The dagster-aws integration library provides the PipesECSClient resource, which can be used to launch AWS ECS tasks from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from tasks launched with this client. Using it requires minimal code changes on the task side.
Call open_dagster_pipes in the ECS task script to create a context that can be used to send messages to Dagster:
from dagster_pipes import (
    PipesEnvVarParamsLoader,
    PipesS3ContextLoader,
    open_dagster_pipes,
)


def main():
    # Open the Pipes context; messages sent through it reach Dagster.
    with open_dagster_pipes() as pipes:
        pipes.log.info("Hello from AWS ECS task!")
        pipes.report_asset_materialization(
            metadata={"some_metric": {"raw_value": 0, "type": "int"}},
            data_version="alpha",
        )


if __name__ == "__main__":
    main()
Step 3: Create an asset using the PipesECSClient to launch the task
In the Dagster asset/op code, use the PipesECSClient resource to launch the task:
import os

import boto3
from dagster_aws.pipes import PipesECSClient

from dagster import AssetExecutionContext, asset


@asset
def ecs_pipes_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient):
    # Launch the ECS task and turn the messages it reports into a materialization result.
    return pipes_ecs_client.run(
        context=context,
        run_task_params={
            "taskDefinition": "my-task",
            "count": 1,
        },
    ).get_materialize_result()
This will launch the AWS ECS task and wait until it reaches "STOPPED" status. If any of the task's containers fail, the Dagster process will raise an exception. If the Dagster process is interrupted while the task is still running, the task will be terminated.
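The failure rule described above can be pictured with a small sketch. This is not the PipesECSClient implementation; it only assumes a response shaped like the one returned by boto3's ECS `describe_tasks` call, and the helper name `find_failed_containers` is hypothetical.

```python
from typing import Any


def find_failed_containers(describe_tasks_response: dict[str, Any]) -> list[str]:
    """Return names of containers that stopped with a non-zero exit code.

    Hypothetical helper: the input is assumed to look like the response of
    boto3's ECS ``describe_tasks`` call.
    """
    failed = []
    for task in describe_tasks_response.get("tasks", []):
        for container in task.get("containers", []):
            exit_code = container.get("exitCode")
            # A container without an exit code is skipped here; only a
            # present, non-zero code is counted as a failure.
            if exit_code not in (None, 0):
                failed.append(container.get("name", "<unnamed>"))
    return failed


# Example response for a stopped task with one failing container.
response = {
    "tasks": [
        {
            "lastStatus": "STOPPED",
            "containers": [
                {"name": "main", "exitCode": 1},
                {"name": "sidecar", "exitCode": 0},
            ],
        }
    ]
}

failed = find_failed_containers(response)
if failed:
    print(f"Containers failed: {failed}")
```

In this sketch a single failing container is enough to flag the whole task, which mirrors the behavior described above, where one failed container causes the Dagster process to raise.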
Dagster can now launch the AWS ECS task from the ecs_pipes_asset asset and receive logs and events from it. With the default message_reader, PipesCloudwatchLogReader, logs are read from the CloudWatch log group specified in the "logConfiguration" field of the container definition. Logs from all containers in the task are read.
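For the default reader to find any logs, the container definition needs a "logConfiguration" that points at CloudWatch. Below is a minimal sketch of such a definition, assuming the `awslogs` log driver. The helper name, image, log group, and region are placeholders chosen for illustration, not values from this guide.

```python
from typing import Any


def container_definition_with_logs(
    name: str, image: str, log_group: str, region: str
) -> dict[str, Any]:
    """Build one ECS container definition that ships logs to CloudWatch.

    Hypothetical helper; every argument value used below is a placeholder.
    """
    return {
        "name": name,
        "image": image,
        "essential": True,
        "logConfiguration": {
            "logDriver": "awslogs",
            "options": {
                "awslogs-group": log_group,
                "awslogs-region": region,
                # The stream prefix is set to the container name here.
                "awslogs-stream-prefix": name,
            },
        },
    }


container = container_definition_with_logs(
    name="main",
    image="my-image:latest",
    log_group="/ecs/my-task",
    region="us-east-1",
)
```

A dictionary like this would go into the `containerDefinitions` list passed to boto3's `register_task_definition` call when the task definition is created.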