Custom loggers

You may want to supplement or replace the built-in loggers so that Dagster logs integrate with the rest of your log aggregation and monitoring infrastructure.

For example, you may be operating in a containerized environment where container stdout is aggregated by a tool such as Logstash. In this kind of environment, where logs will be aggregated and parsed by machine, the multi-line output from the default colored console logger is unhelpful. Instead, we'd much prefer to see single-line, structured log messages like:

{"orig_message": "Hello, world!", "log_message_id": "49854579-e4d1-4289-8453-b3e177b20056", ...}

Dagster includes a logger that prints JSON-formatted single-line messages like this to the console (json_console_logger).


Defining custom loggers

Loggers are defined internally using the LoggerDefinition class, but, following a common pattern in the Dagster codebase, the @logger decorator exposes a simpler API for the common use case and is typically what you'll use to define your own loggers.

The decorated function should take a single argument, the init_context available during logger initialization, and return a logging.Logger:

import json
import logging

from dagster import Field, OpExecutionContext, job, logger, op


@logger(
    {
        "log_level": Field(str, is_required=False, default_value="INFO"),
        "name": Field(str, is_required=False, default_value="dagster"),
    },
    description="A JSON-formatted console logger",
)
def json_console_logger(init_context):
    level = init_context.logger_config["log_level"]
    name = init_context.logger_config["name"]

    klass = logging.getLoggerClass()
    logger_ = klass(name, level=level)

    handler = logging.StreamHandler()

    class JsonFormatter(logging.Formatter):
        def format(self, record):
            return json.dumps(
                {
                    k: v
                    for k, v in record.__dict__.items()
                    # values for these keys are not directly JSON-serializable
                    if k not in ["dagster_event", "dagster_meta"]
                }
            )

    handler.setFormatter(JsonFormatter())
    logger_.addHandler(handler)

    return logger_


@op
def hello_logs(context: OpExecutionContext):
    context.log.info("Hello, world!")


@job(logger_defs={"my_json_logger": json_console_logger})
def demo_job():
    hello_logs()
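To see what the formatter does in isolation, here is a minimal sketch that uses only the standard library. It applies the same filtering to a hand-built log record and shows why the two Dagster keys are skipped. The record contents are made up for illustration, and a plain `object()` stands in for a non-serializable Dagster value. The keys of a real Dagster record (such as `orig_message`) will differ from those of this stand-in.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render all attributes of a log record as a single-line JSON object."""

    # Values for these keys are not directly JSON-serializable.
    SKIPPED_KEYS = ("dagster_event", "dagster_meta")

    def format(self, record):
        payload = {
            k: v for k, v in record.__dict__.items() if k not in self.SKIPPED_KEYS
        }
        return json.dumps(payload)


# Illustrative record: object() is a stand-in for a non-serializable value.
record = logging.makeLogRecord(
    {"msg": "Hello, world!", "levelname": "INFO", "dagster_meta": object()}
)

# Without the filter, serialization fails on the non-serializable value.
try:
    json.dumps(record.__dict__)
    unfiltered_ok = True
except TypeError:
    unfiltered_ok = False

# With the filter, the record becomes one line of valid JSON.
line = JsonFormatter().format(record)
parsed = json.loads(line)
```

Because `json.dumps` emits no newlines by default, each record stays on one line, which is what line-oriented log aggregators expect.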

To use the logger, refer to it in the run config by the key it was given in logger_defs (here, my_json_logger). Its config block holds the values that users can pass to the logger, as declared in the logger's config schema. For example:

loggers:
  my_json_logger:
    config:
      log_level: INFO
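Both fields declared in the config schema above can be set this way. As an illustration, the following fragment overrides both of them. The values `DEBUG` and `my_pipeline_logger` are arbitrary choices for this sketch, not part of the original example:

```yaml
loggers:
  my_json_logger:
    config:
      log_level: DEBUG
      name: my_pipeline_logger
```

Any field that is left out falls back to the `default_value` given in its `Field` definition.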

And when the job is executed:

dagster job execute -f custom_logger.py -c config_custom_logger.yaml

The job now logs through the custom JSON logger, which is also visible in the Dagster UI:

[Figure: terminal output of the custom logger]

Testing custom loggers

You can unit test the initialization method of a logger by invoking it:

def test_init_json_console_logger():
    # Passing None initializes the logger with its default config values.
    logger_ = json_console_logger(None)
    assert logger_.level == 20  # logging.INFO
    assert logger_.name == "dagster"

If you need to provide config to the initialization of the logger, use the build_init_logger_context function:

from dagster import build_init_logger_context


def test_init_json_console_logger_with_context():
    logger_ = json_console_logger(
        build_init_logger_context(logger_config={"name": "my_logger"})
    )
    # log_level is not supplied, so it falls back to its default, INFO (20).
    assert logger_.level == 20
    assert logger_.name == "my_logger"
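Beyond initialization, it can be useful to assert on what a logger actually emits. The sketch below is one possible approach, not an official Dagster testing utility. `emit_and_parse` is a hypothetical helper that temporarily redirects the logger's first `StreamHandler` to an in-memory buffer. A small standard-library logger stands in for the Dagster one so that the example is self-contained. In a real test you would presumably pass the result of `json_console_logger(None)` instead.

```python
import io
import json
import logging


def emit_and_parse(logger_, message):
    """Log `message` at INFO and return the emitted lines parsed as JSON.

    Hypothetical helper: assumes the logger's first handler is a
    logging.StreamHandler whose formatter produces one JSON object per line.
    """
    handler = logger_.handlers[0]
    buffer = io.StringIO()
    previous = handler.setStream(buffer)  # returns the stream being replaced
    try:
        logger_.info(message)
    finally:
        handler.setStream(previous)  # restore the original stream
    return [json.loads(line) for line in buffer.getvalue().splitlines()]


class _StandInJsonFormatter(logging.Formatter):
    """Simplified formatter used only for this demonstration."""

    def format(self, record):
        return json.dumps(
            {"msg": record.getMessage(), "levelname": record.levelname}
        )


# Stand-in logger, built the same way as in the example above.
stand_in = logging.getLoggerClass()("stand_in", level="INFO")
_handler = logging.StreamHandler()
_handler.setFormatter(_StandInJsonFormatter())
stand_in.addHandler(_handler)


def test_logger_emits_single_line_json():
    records = emit_and_parse(stand_in, "Hello, world!")
    assert len(records) == 1
    assert records[0]["msg"] == "Hello, world!"
    assert records[0]["levelname"] == "INFO"
```

Redirecting the handler's stream rather than patching `sys.stderr` keeps the test independent of however the test runner captures output.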