Ask AI

You are viewing an unreleased or outdated version of the documentation

Orchestration on Celery + Docker

APIs

dagster_celery_docker.celery_docker_executor ExecutorDefinition[source]

Config Schema:
broker (Union[dagster.StringSource, None], optional):

The URL of the Celery broker. Default: ‘pyamqp://guest@{os.getenv(‘DAGSTER_CELERY_BROKER_HOST’,’localhost’)}//’.

backend (Union[dagster.StringSource, None], optional):

The URL of the Celery results backend. Default: ‘rpc://’.

Default Value: ‘rpc://’

include (List[String], optional):

List of modules every worker should import

config_source (Union[permissive dict, None], optional):

Additional settings for the Celery app.

retries (selector, optional):

Whether retries are enabled or not. By default, retries are enabled.

Default Value:
{
    "enabled": {}
}
Config Schema:
enabled (strict dict, optional):
Default Value:
{}
disabled (strict dict, optional):
Default Value:
{}
docker (strict dict):

The configuration for interacting with docker in the celery worker.

Config Schema:
image (dagster.StringSource, optional):

The docker image to be used for step execution.

registry (strict dict, optional):

Information for using a non local/public docker registry

Config Schema:
url (dagster.StringSource):

username (dagster.StringSource):

password (dagster.StringSource):

env_vars (List[String], optional):

The list of environment variables names to forward from the celery worker in to the docker container

network (String, optional):

Name of the network this container will be connected to at creation time

container_kwargs (permissive dict, optional):

Additional keyword args for the docker container

Celery-based executor which launches tasks in docker containers.

The Celery executor exposes config settings for the underlying Celery app under the config_source key. This config corresponds to the “new lowercase settings” introduced in Celery version 4.0 and the object constructed from config will be passed to the celery.Celery constructor as its config_source argument. (See https://docs.celeryq.dev/en/stable/userguide/configuration.html for details.)

The executor also exposes the broker, backend, and include arguments to the celery.Celery constructor.

In the most common case, you may want to modify the broker and backend (e.g., to use Redis instead of RabbitMQ). We expect that config_source will be less frequently modified, but that when op executions are especially fast or slow, or when there are different requirements around idempotence or retry, it may make sense to execute jobs with variations on these settings.

To use the celery_docker_executor, set it as the executor_def when defining a job:

from dagster import job
from dagster_celery_docker.executor import celery_docker_executor

@job(executor_def=celery_docker_executor)
def celery_enabled_job():
    pass

Then you can configure the executor as follows:

execution:
  config:
    docker:
      image: 'my_repo.com/image_name:latest'
      registry:
        url: 'my_repo.com'
        username: 'my_user'
        password: {env: 'DOCKER_PASSWORD'}
      env_vars: ["DAGSTER_HOME"] # environment vars to pass from celery worker to docker
      container_kwargs: # keyword args to be passed to the container. example:
        volumes: ['/home/user1/:/mnt/vol2','/var/www:/mnt/vol1']

    broker: 'pyamqp://guest@localhost//'  # Optional[str]: The URL of the Celery broker
    backend: 'rpc://' # Optional[str]: The URL of the Celery results backend
    include: ['my_module'] # Optional[List[str]]: Modules every worker should import
    config_source: # Dict[str, Any]: Any additional parameters to pass to the
        #...       # Celery workers. This dict will be passed as the `config_source`
        #...       # argument of celery.Celery().

Note that the YAML you provide here must align with the configuration with which the Celery workers on which you hope to run were started. If, for example, you point the executor at a different broker than the one your workers are listening to, the workers will never be able to pick up tasks for execution.

In deployments where the celery_docker_job_executor is used all appropriate celery and dagster_celery commands must be invoked with the -A dagster_celery_docker.app argument.