
Airflow Federation Tutorial: Observing multiple Airflow instances#

At this point, we should have completed the setup step: the example code is downloaded, a fresh virtual environment is created, and our two Airflow instances are running locally. Now we can start writing Dagster code.

Observing the Airflow instances#

We'll start by creating asset representations of our DAGs in Dagster.

Open a new shell and navigate to the root of the tutorial directory. You will need to install the dagster-airlift package, along with dagster-webserver and dagster, in your Dagster environment:

source .venv/bin/activate
uv pip install 'dagster-airlift[core]' dagster-webserver dagster
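
To confirm the installation before moving on, you can run an optional sanity check (not part of the original setup) that verifies the CLI works and the package imports cleanly:

dagster --version
python -c "import dagster_airlift.core"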

Observing the warehouse Airflow instance#

Next, we'll declare a reference to our warehouse Airflow instance, which is running at http://localhost:8081.

from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

Now, we can use the load_airflow_dag_asset_specs function to create asset representations of the DAGs in the warehouse Airflow instance:

from dagster_airlift.core import load_airflow_dag_asset_specs

assets = load_airflow_dag_asset_specs(
    airflow_instance=warehouse_airflow_instance,
)

Now, let's add these assets to a Definitions object:

from dagster import Definitions

defs = Definitions(assets=assets)

Let's set up some environment variables (DAGSTER_HOME is where Dagster stores instance state, such as sensor cursors), and then point Dagster at the definitions file so it can load the asset created from our Airflow instance:

# Set up environment variables to point to the airlift-federation-tutorial directory on your machine
export TUTORIAL_EXAMPLE_DIR=$(pwd)
export DAGSTER_HOME="$TUTORIAL_EXAMPLE_DIR/.dagster_home"
dagster dev -f airlift_federation_tutorial/dagster_defs/definitions.py

If we navigate to the Dagster UI (running at http://localhost:3000), we should see the assets created from the warehouse Airflow instance.

Assets from the warehouse Airflow instance in the Dagster UI

There are a lot of DAGs in this instance, and we only want to focus on the load_customers DAG. Let's filter the assets to only include the load_customers DAG:

load_customers = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)

Let's update our Definitions object to include only this asset:

defs = Definitions(assets=[load_customers])

Now, our Dagster environment only includes the load_customers DAG from the warehouse Airflow instance.

Assets from the warehouse Airflow instance in the Dagster UI
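
If you'd like to confirm the filter worked before reloading the UI, a quick check from a Python shell is sketched below; the exact asset key format depends on your dagster-airlift version, so treat the printed value as illustrative:

# Run in a Python shell with the tutorial virtual environment active.
# `load_customers` is the single AssetSpec selected above; printing its
# key confirms that only the load_customers DAG was picked up.
print(load_customers.key)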

Finally, we'll use a sensor to poll the warehouse Airflow instance for new runs. This way, whenever we get a successful run of the load_customers DAG, we'll see a materialization in the Dagster UI:

from dagster_airlift.core import build_airflow_polling_sensor

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers],
    airflow_instance=warehouse_airflow_instance,
)

Now, we can add this sensor to our Definitions object:

defs = Definitions(assets=[load_customers], sensors=[warehouse_sensor])

You can test this by navigating to the Airflow UI at http://localhost:8081 and triggering a run of the load_customers DAG. When the run completes, you should see a materialization in the Dagster UI.

Materialization of the load_customers DAG in the Dagster UI
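
If you prefer the command line, you can also trigger the run through Airflow's stable REST API. This is a sketch that assumes the tutorial's default basic-auth credentials (admin/admin) and that the API is enabled on the warehouse instance:

# Trigger a run of load_customers on the warehouse instance via the REST API.
curl -X POST "http://localhost:8081/api/v1/dags/load_customers/dagRuns" \
  -H "Content-Type: application/json" \
  -u admin:admin \
  -d '{}'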

Observing the metrics Airflow instance#

We can repeat the same process for the customer_metrics DAG in the metrics Airflow instance, which runs at http://localhost:8082. We'll leave this as an exercise to test your understanding.

When complete, your code should look like this:

from dagster import Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
)

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)

defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)

Adding lineage between load_customers and customer_metrics#

Now that we have both DAGs loaded into Dagster, we can observe the cross-DAG lineage between them. To do this, we'll use the replace_attributes function to declare the load_customers asset as an upstream dependency of the customer_metrics asset:

from dagster._core.definitions.asset_spec import replace_attributes

customer_metrics_dag_asset = replace_attributes(
    customer_metrics_dag_asset,
    deps=[load_customers_dag_asset],
)

Now, after adding the updated customer_metrics_dag_asset to our Definitions object, we should see the lineage between the two DAGs in the Dagster UI.
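
Note that replace_attributes returns a new AssetSpec rather than mutating the existing one, so the reassignment above matters. The Definitions object itself is unchanged from the previous step and simply picks up the updated spec:

defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)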

Lineage between load_customers and customer_metrics in the Dagster UI

Next steps#

Next, we'll federate the execution of our DAGs across both Airflow instances. Follow along in the next section of the tutorial.