

Asset definitions

Prefer videos? Check out our explainer and demo videos to get a quick look at asset definitions.

An asset is an object in persistent storage, such as a table, file, or persisted machine learning model. An asset definition is a description, in code, of an asset that should exist and how to produce and update that asset.

Asset definitions enable a declarative approach to data management, in which code is the source of truth on what data assets should exist and how those assets are computed.

An asset definition includes the following:

  • An AssetKey, which is a handle for referring to the asset.

  • A set of upstream asset keys, which refer to assets that the contents of the asset definition are derived from.

  • A Python function, which is responsible for computing the contents of the asset from its upstream dependencies and storing the results.

    Note: Behind the scenes, the Python function is an op. Ops are an advanced topic that isn't required to get started with Dagster. A crucial distinction between asset definitions and ops is that asset definitions know about their dependencies, while ops do not. Ops aren't connected to dependencies until they're placed inside a graph.

Materializing an asset is the act of running its function and saving the results to persistent storage. You can initiate materializations from the Dagster UI or by invoking Python APIs.


Relevant APIs

Name             | Description
---------------- | -----------
@asset           | A decorator used to define assets.
AssetsDefinition | A class that represents one or more asset definitions, usually backed by a single function that materializes or observes them all together.

Defining assets

Basic asset definitions

The easiest way to create an asset definition is with the @asset decorator.

import json
import os

from dagster import asset


@asset
def my_asset():
    os.makedirs("data", exist_ok=True)
    with open("data/my_asset.json", "w") as f:
        json.dump([1, 2, 3], f)

By default, the name of the decorated function, my_asset, is used as the asset key. The decorated function is responsible for producing the asset's contents. The asset in this example doesn't depend on any other assets.

Assets with dependencies

Asset definitions can depend on other asset definitions. In this section, we'll show you how to define basic dependencies and dependencies across code locations.

Defining basic dependencies

You can define a dependency between two assets by passing the upstream asset to the deps parameter in the downstream asset's @asset decorator.

In this example, the asset sugary_cereals creates a new table (sugary_cereals) by selecting records from the cereals table. Then the asset shopping_list creates a new table (shopping_list) by selecting records from sugary_cereals:

from dagster import asset


def execute_query(query: str) -> None:
    # Hypothetical helper: replace with a call to your own database client.
    ...


@asset
def sugary_cereals() -> None:
    execute_query(
        "CREATE TABLE sugary_cereals AS SELECT * FROM cereals WHERE sugar_grams > 10"
    )


@asset(deps=[sugary_cereals])
def shopping_list() -> None:
    execute_query("CREATE TABLE shopping_list AS SELECT * FROM sugary_cereals")

Defining asset dependencies across code locations

Assets can depend on assets in different code locations. Consider this example for code_location_1:

# code_location_1.py
import json

from dagster import Definitions, asset


@asset
def code_location_1_asset():
    with open("/data/code_location_1_asset.json", "w+") as f:
        json.dump(5, f)


defs = Definitions(assets=[code_location_1_asset])

In code_location_2, we can then reference it via its asset key:

# code_location_2.py
import json

from dagster import Definitions, asset


@asset(deps=["code_location_1_asset"])
def code_location_2_asset():
    with open("/data/code_location_1_asset.json", "r") as f:
        x = json.load(f)

    with open("/data/code_location_2_asset.json", "w+") as f:
        json.dump(x + 6, f)


defs = Definitions(assets=[code_location_2_asset])

Graph-backed assets and multi-assets

If you'd like to define more complex assets, Dagster offers augmented asset definition abstractions such as graph-backed assets and multi-assets.

Asset configuration

Assets in Dagster can specify a config schema. This allows you to provide values to assets at run time. The configuration system is explained in detail in the Config schema documentation.

Asset functions can specify an annotated config parameter for the asset's configuration. The config class, which subclasses Config (which in turn inherits from pydantic.BaseModel), specifies the configuration schema for the asset.

For example, the following downstream asset queries an API endpoint defined through configuration:

import requests
from dagster import Config, asset


class MyDownstreamAssetConfig(Config):
    api_endpoint: str


@asset
def my_downstream_asset(config: MyDownstreamAssetConfig):
    data = requests.get(f"{config.api_endpoint}/data").json()
    ...

Refer to the Config schema documentation for more configuration info and examples.

Asset context

When writing an asset, you can optionally declare context as its first parameter. When you do, Dagster passes an AssetExecutionContext object into the body of the asset, which provides access to system information like loggers and the current run ID.

For example, to access the logger and log an info message:

from dagster import AssetExecutionContext, asset


@asset
def context_asset(context: AssetExecutionContext):
    context.log.info(f"My run ID is {context.run.run_id}")
    ...

Asset code versions

Assets may be assigned a code_version. Versions help Dagster track which assets haven't been re-materialized since their code changed, so you can avoid redundant computation.

import json

from dagster import asset


@asset(code_version="1")
def asset_with_version():
    with open("data/asset_with_version.json", "w") as f:
        json.dump(100, f)

When an asset with a code version is materialized, the generated AssetMaterialization is tagged with the version. The UI will indicate when an asset has a different code version than the code version used for its most recent materialization.

Defining metadata and tags

Dagster offers several ways to provide useful information and documentation alongside your data pipelines, including metadata and tags. For example, you can attach metadata to an asset recording how many records it processed during each run, and then view that data as a plot in the Dagster UI!

Check out the Metadata & tags documentation to get started.

Retrying failed assets

If an exception occurs during asset execution, you can use a RetryPolicy to automatically retry the asset within the same run.

In the following example, we've specified the number of times to retry and how long to wait between retries:

from dagster import (
    AssetExecutionContext,
    Backoff,
    Jitter,
    RetryPolicy,
    asset,
)


@asset(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.2,  # 200ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    )
)
def retried_asset(context: AssetExecutionContext):
    context.log.info("Retry me!")

Viewing and materializing assets in the UI

Once you've defined a set of assets, you can load them into the webserver, view them in the UI, and materialize them from the UI.

Loading assets into the webserver

To view and materialize assets in the UI, you can point the underlying webserver at a module that contains asset definitions or lists of asset definitions as module-level attributes:

dagster dev -m module_with_assets

If you want the UI to show both assets and jobs that target the assets, you can place the assets and jobs together inside a Definitions object. For example:

defs = Definitions(
    assets=[asset_1, asset_2],
    jobs=[asset_job],
)

A Definitions object defines a code location, which is a collection of assets, jobs, resources, and schedules. Refer to the Code locations documentation for more info.

Viewing assets in the UI

Asset catalog

To view a list of all your assets, click Assets in the top navigation. This opens the Asset catalog:

[Image: Asset catalog page in the UI]

Materializing assets in the UI

In the UI, you can launch runs that materialize assets by:

  • Navigating to the Asset details page for the asset and clicking the Materialize button in the upper right corner.
  • Navigating to the graph view of the Asset catalog page and clicking the Materialize button in the upper right corner. You can also click on individual assets to collect a subset to materialize.

Building jobs that materialize assets

Jobs that target assets can materialize a fixed selection of assets each time they run and be placed on schedules and sensors. Refer to the Asset jobs documentation for more info and examples.


Grouping assets

To help keep your assets tidy, you can organize them into groups. Grouping assets by project, concept, and so on simplifies keeping track of them in the UI. Each asset is assigned to a single group, which by default is called "default".

Assigning assets to groups

In Dagster, there are two ways to assign assets to groups: on individual assets, or from assets in a sub-module.

By default, assets that aren't assigned to a group will be placed in a group named default. Use the UI to view these assets.

On individual assets

You can assign an individual asset to a group by passing the group_name argument to the @asset decorator:

from dagster import asset


@asset(group_name="cereal_assets")
def nabisco_cereals():
    execute_query(
        "CREATE TABLE nabisco_cereals AS SELECT * FROM cereals WHERE manufacturer = 'Nabisco'"
    )

From assets in a sub-module

This recommended approach constructs a group of assets from a specified module in your project. Using the load_assets_from_package_module function, you can import all assets in a module and apply a grouping:

from dagster import load_assets_from_package_module

from my_package import cereal

cereal_assets = load_assets_from_package_module(
    cereal,
    group_name="cereal_assets",
)

If any of the assets in the module already has a group_name explicitly set on it, you'll encounter a Group name already exists on assets error.

Viewing asset groups in the UI

To view your asset groups in the UI, open the left navigation by clicking the menu icon in the top left corner. Because asset groups are organized by code location, you may need to open a code location to view its asset groups.

Click the asset group to open a dependency graph for all assets in the group. For example, in the following image, the dependency graph for the activity_analytics asset group is currently displayed:

[Image: Dependency graph for an asset group]

Testing

When writing unit tests, you can treat the function decorated by @asset as a regular Python function.

Consider a simple asset with no upstream dependencies:

@asset
def my_simple_asset():
    return [1, 2, 3]

When writing a unit test, you can directly invoke the decorated function:

def test_my_simple_asset():
    result = my_simple_asset()
    assert result == [1, 2, 3]

If you have an asset with managed-loading upstream dependencies:

@asset
def more_complex_asset(my_simple_asset):
    return my_simple_asset + [4, 5, 6]

You can manually provide values for those dependencies in your unit test. This allows you to test assets in isolation from one another:

def test_more_complex_asset():
    result = more_complex_asset([0])
    assert result == [0, 4, 5, 6]

If you use config or resources in your asset, Dagster provides them automatically during execution. When writing unit tests, you can pass them directly when invoking the asset function:

from typing import Any, Dict

import requests
from dagster import Config, ConfigurableResource, asset


class MyConfig(Config):
    api_url: str


class MyAPIResource(ConfigurableResource):
    def query(self, url) -> Dict[str, Any]:
        return requests.get(url).json()


@asset
def uses_config_and_resource(config: MyConfig, my_api: MyAPIResource):
    return my_api.query(config.api_url)


class FakeAPIResource(MyAPIResource):
    # Test double: returns canned data instead of making a network request.
    def query(self, url) -> Dict[str, Any]:
        return {"foo": "bar"}


def test_uses_resource() -> None:
    result = uses_config_and_resource(
        config=MyConfig(api_url="https://dagster.io"), my_api=FakeAPIResource()
    )
    assert result == {"foo": "bar"}

If you use a context object in your function, you can use build_asset_context to generate the context object.

Consider the following asset that uses a context object:

from dagster import AssetExecutionContext, asset


@asset
def uses_context(context: AssetExecutionContext):
    context.log.info(context.run.run_id)
    return "bar"

When writing a unit test, use build_asset_context to mock the context and provide values for testing:

from dagster import build_asset_context


def test_uses_context():
    context = build_asset_context()
    result = uses_context(context)
    assert result == "bar"

Examples

Multi-component asset keys

Assets are often objects in systems with hierarchical namespaces, like filesystems. Because of this, it often makes sense for an asset key to be a list of strings, instead of just a single string. To define an asset with a multi-part asset key, use the key_prefix argument with a list of strings. The full asset key is formed by prepending the key_prefix to the asset name (which defaults to the name of the decorated function).

from dagster import AssetIn, asset


@asset(key_prefix=["one", "two", "three"])
def upstream_asset():
    return [1, 2, 3]


@asset(ins={"upstream_asset": AssetIn(key_prefix=["one", "two", "three"])})
def downstream_asset(upstream_asset):
    return upstream_asset + [4]

See it in action

For more examples of asset definitions, check out these examples: