Lakeflow Declarative Pipelines event log

The Lakeflow Declarative Pipelines event log contains all information related to a pipeline, including audit logs, data quality checks, pipeline progress, and data lineage. You can use the event log to track, understand, and monitor the state of your data pipelines.

You can view event log entries in the Lakeflow Declarative Pipelines user interface, retrieve them with the Lakeflow Declarative Pipelines API, or query the event log directly. This section focuses on querying the event log directly.

You can also use event hooks to define custom actions that run when events are logged, for example, to send alerts.

Important

Do not delete the event log or the parent catalog or schema where the event log is published. Deleting the event log might result in your pipeline failing to update during future runs.

For full details of the event log schema, see Lakeflow Declarative Pipelines event log schema.

Query the event log

Note

This section describes the default behavior and syntax for working with event logs for pipelines configured with Unity Catalog and the default publishing mode.

By default, Lakeflow Declarative Pipelines writes the event log to a hidden Delta table in the default catalog and schema configured for the pipeline. While hidden, the table can still be queried by sufficiently privileged users; by default, only the owner of the pipeline has the privileges to query the event log table.

To query the event log as the owner, use the pipeline ID:

SELECT * FROM event_log(<pipelineId>);

By default, the name for the hidden event log is formatted as event_log_{pipeline_id}, where the pipeline ID is the system-assigned UUID with dashes replaced by underscores.
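
For example, a pipeline with the hypothetical ID ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749 has a hidden event log table named event_log_ec2a0ff4_d2a5_4c8c_bf1d_d9f12f10e749, and the owner can pass that ID to the event_log function as a string:

SELECT * FROM event_log('ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749');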

You can publish the event log by editing the Advanced settings for your pipeline and selecting Publish event log to metastore. For details, see Pipeline setting for event log. When you publish an event log, you specify the name for the event log and can optionally specify a catalog and schema, as in the following example:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}
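
After publishing, users with the appropriate privileges can query the event log by its fully qualified name. For example, using the placeholder names from the configuration above:

SELECT * FROM catalog_name.schema_name.event_log_table_name;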

The event log location also serves as the schema location for any Auto Loader queries in the pipeline. Databricks recommends creating a view over the event log table before modifying the privileges, as some compute settings might allow users to gain access to schema metadata if the event log table is shared directly.

The following example syntax creates a view on an event log table and is used in the example event log queries included in this article. Replace <catalog_name>.<schema_name>.<event_log_table_name> with the fully qualified table name of your pipeline event log. If you've published the event log, use the name specified when publishing. Otherwise, use event_log(<pipelineId>), where pipelineId is the ID of the pipeline you want to query.

CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
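
To confirm the view resolves and to see which event types your pipeline has emitted, you can run a quick aggregation over it, for example:

SELECT event_type, COUNT(*) AS num_events
FROM event_log_raw
GROUP BY event_type
ORDER BY num_events DESC;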

In Unity Catalog, views support streaming queries. The following example uses Structured Streaming to query a view defined on top of an event log table:

df = spark.readStream.table("event_log_raw")

The owner of the pipeline can publish the event log as a public Delta table by toggling the Publish event log to metastore option in the Advanced section of the pipeline configuration. You can optionally specify a new table name, catalog, and schema for the event log.

Basic query examples

The following examples show how to query the event log to get general information about pipelines, and to help debug common scenarios.

Monitor pipeline updates by querying previous updates

The following example queries the updates (or runs) of your pipeline, showing the update ID, status, start time, completion time, and duration. This gives you an overview of runs for the pipeline.

This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

Debug materialized view incremental refresh issues

This example queries all flows from the most recent update of a pipeline. It shows whether each flow was updated incrementally, along with other planning information that is useful for debugging why an incremental refresh doesn't happen.

This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

Query the cost of a pipeline update

This example shows how to query the DBU usage for a pipeline, as well as the users that its updates ran as, from the system billing tables. Replace :pipeline_id with the ID of the pipeline you want to query.

SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;
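
To estimate the list price of that usage, you can join against the system.billing.list_prices table. The following is a sketch, assuming your workspace has access to the billing system tables; list prices are effective-dated, so the join matches each usage record to the price in effect when the usage occurred:

SELECT
  u.usage_date,
  u.sku_name,
  SUM(u.usage_quantity) AS dbus,
  -- pricing.default is the list price per DBU for the SKU
  SUM(u.usage_quantity * p.pricing.default) AS approx_list_cost
FROM system.billing.usage u
JOIN system.billing.list_prices p
  ON u.sku_name = p.sku_name
  AND u.cloud = p.cloud
  -- match each usage record to the price in effect at that time
  AND u.usage_start_time >= p.price_start_time
  AND (p.price_end_time IS NULL OR u.usage_start_time < p.price_end_time)
WHERE u.usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY ALL;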

Advanced queries

The following examples show how to query the event log to handle less common or more advanced scenarios.

Query metrics for all flows in a pipeline

This example shows how to query detailed information about every flow in a pipeline. It shows the flow name, update duration, data quality metrics, and information about the rows processed (output rows, deleted, upserted, and dropped records).

This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array

  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations

  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,

  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

Query data quality or expectations metrics

If you define expectations on data sets in your pipeline, the metrics for the number of records that passed and failed an expectation are stored in the details:flow_progress.data_quality.expectations object. The metric for the number of dropped records is stored in the details:flow_progress.data_quality object. Events containing information about data quality have the event type flow_progress.

Data quality metrics might not be available for some data sets. See the expectation limitations.

The following data quality metrics are available:

dropped_records: The number of records that were dropped because they failed one or more expectations.
passed_records: The number of records that passed the expectation criteria.
failed_records: The number of records that failed the expectation criteria.

The following example queries the data quality metrics for the last pipeline update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

WITH latest_update AS (
  SELECT origin.update_id AS id
  FROM event_log_raw
  WHERE event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
)
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;

Query lineage information

Events containing information about lineage have the event type flow_definition. The details:flow_definition object contains the output_dataset and input_datasets defining each relationship in the graph.

Use the following query to extract the input and output data sets to see lineage information. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

Monitor cloud file ingestion with Auto Loader

Lakeflow Declarative Pipelines generates events when Auto Loader processes files. For Auto Loader events, the event_type is operation_progress and the details:operation_progress:type is either AUTO_LOADER_LISTING or AUTO_LOADER_BACKFILL. The details:operation_progress object also includes status, duration_ms, auto_loader_details:source_path, and auto_loader_details:num_files_listed fields.

The following example queries Auto Loader events for the latest update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

Monitor data backlog for optimizing streaming duration

Lakeflow Declarative Pipelines tracks how much data is present in the backlog in the details:flow_progress.metrics.backlog_bytes object. Events containing backlog metrics have the event type flow_progress. The following example queries backlog metrics for the last pipeline update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;

Note

The backlog metrics might not be available depending on the pipeline's data source type and Databricks Runtime version.

Monitor autoscaling events for optimizing classic compute

For Lakeflow Declarative Pipelines that use classic compute (that is, pipelines that do not use serverless compute), the event log captures cluster resizes when enhanced autoscaling is enabled. Events containing information about enhanced autoscaling have the event type autoscale. The cluster resize request information is stored in the details:autoscale object.

The following example queries the enhanced autoscaling cluster resize requests for the last pipeline update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id;

Monitor compute resource utilization for classic compute

cluster_resources events provide metrics on the number of task slots in the cluster, how much those task slots are used, and how many tasks are waiting to be scheduled.

When enhanced autoscaling is enabled, cluster_resources events also contain metrics for the autoscaling algorithm, including latest_requested_num_executors and optimal_num_executors. The events also report the state of the algorithm, such as CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS, and BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. You can view this information together with the autoscaling events to get an overall picture of enhanced autoscaling.

The following example queries the task queue size history, utilization history, executor count history, and other metrics and state for autoscaling in the last pipeline update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;

Audit Lakeflow Declarative Pipelines

You can use Lakeflow Declarative Pipelines event log records and other Azure Databricks audit logs to get a complete picture of how data is being updated in Lakeflow Declarative Pipelines.

Lakeflow Declarative Pipelines uses the credentials of the pipeline owner to run updates. You can change the credentials used by updating the pipeline owner. Lakeflow Declarative Pipelines records the user for actions on the pipeline, including pipeline creation, edits to configuration, and triggering updates.
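
To see pipeline actions at the workspace level, you can combine the event log with the audit system table. The following is a sketch, assuming system tables are enabled in your account; the deltaPipelines service name and the request_params key are assumptions to verify against the audit log reference for your workspace:

SELECT
  event_time,
  user_identity.email AS user_email,
  action_name,
  request_params['pipeline_id'] AS pipeline_id -- key name is an assumption; inspect request_params for your events
FROM system.access.audit
WHERE service_name = 'deltaPipelines' -- assumed service name for pipeline-related audit events
ORDER BY event_time DESC;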

See Unity Catalog events for a reference of Unity Catalog audit events.

Query user actions in the event log

You can use the event log to audit events such as user actions. Events containing information about user actions have the event type user_action.

Information about the action is stored in the user_action object in the details field. Use the following query to construct an audit log of user events. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp                     action  user_name
2021-05-20T19:36:03.517+0000  START   user@company.com
2021-05-20T19:35:59.913+0000  CREATE  user@company.com
2021-05-27T00:35:51.971+0000  START   user@company.com

Runtime information

You can view runtime information for a pipeline update, for example, the Databricks Runtime version for the update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0