The Lakeflow Declarative Pipelines event log contains all information related to a pipeline, including audit logs, data quality checks, pipeline progress, and data lineage. You can use the event log to track, understand, and monitor the state of your data pipelines.
You can view event log entries in the Lakeflow Declarative Pipelines user interface, retrieve them through the Lakeflow Declarative Pipelines API, or query the event log directly. This section focuses on querying the event log directly.
You can also use event hooks to define custom actions, such as sending alerts, that run when events are logged.
Important
Do not delete the event log or the parent catalog or schema where the event log is published. Deleting the event log might result in your pipeline failing to update during future runs.
For full details of the event log schema, see Lakeflow Declarative Pipelines event log schema.
Query the event log
Note
This section describes the default behavior and syntax for working with event logs for pipelines configured with Unity Catalog and the default publishing mode.
- For behavior for Unity Catalog pipelines that use legacy publishing mode, see Work with event log for Unity Catalog legacy publishing mode pipelines.
- For behavior and syntax for Hive metastore pipelines, see Work with event log for Hive metastore pipelines.
By default, Lakeflow Declarative Pipelines writes the event log to a hidden Delta table in the default catalog and schema configured for the pipeline. While hidden, the table can still be queried by all sufficiently privileged users. By default, only the owner of the pipeline can query the event log table.
To query the event log as the owner, use the pipeline ID:
SELECT * FROM event_log(<pipelineId>);
By default, the name for the hidden event log is formatted as event_log_{pipeline_id}, where the pipeline ID is the system-assigned UUID with dashes replaced by underscores.
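For example, using the sample pipeline ID shown in the configuration later in this section (replace it with your own pipeline ID), the call looks like the following:
SELECT * FROM event_log('ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749');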
You can publish the event log by editing the Advanced settings for your pipeline and selecting Publish event log to metastore. For details, see Pipeline setting for event log. When you publish an event log, you specify the name for the event log and can optionally specify a catalog and schema, as in the following example:
{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}
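Once published, the event log can be queried like any other table by its fully qualified name. For example, using the catalog, schema, and table names from the configuration above (substitute your own values):
SELECT * FROM catalog_name.schema_name.event_log_table_name LIMIT 10;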
The event log location also serves as the schema location for any Auto Loader queries in the pipeline. Databricks recommends creating a view over the event log table before modifying the privileges, because some compute settings might allow users to gain access to schema metadata if the event log table is shared directly. The following example syntax creates a view on an event log table and is used in the example event log queries included in this article. Replace <catalog_name>.<schema_name>.<event_log_table_name> with the fully qualified table name of your pipeline event log. If you've published the event log, use the name specified when publishing. Otherwise, use event_log(<pipelineId>), where pipelineId is the ID of the pipeline you want to query.
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
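After creating the view, you can grant access on the view rather than on the underlying event log table. The following is a sketch only; the group name is illustrative, so adjust the principal and the grant statement for your workspace:
GRANT SELECT ON VIEW event_log_raw TO `data-engineers`;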
In Unity Catalog, views support streaming queries. The following example uses Structured Streaming to query a view defined on top of an event log table:
df = spark.readStream.table("event_log_raw")
The owner of the pipeline can publish the event log as a public Delta table by toggling the Publish event log to metastore option in the Advanced section of the pipeline configuration. You can optionally specify a new table name, catalog, and schema for the event log.
Basic query examples
The following examples show how to query the event log to get general information about pipelines, and to help debug common scenarios.
Monitor pipeline updates by querying previous updates
The following example queries the updates (or runs) of your pipeline, showing the update ID, status, start time, completion time, and duration. This gives you an overview of runs for the pipeline.
This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
WITH last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
Debug materialized view incremental refresh issues
This example queries all flows from the most recent update of a pipeline. It shows whether they were incrementally updated or not, as well as other relevant planning information that is useful for debugging why an incremental refresh doesn't happen.
This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
Query the cost of a pipeline update
This example shows how to query the DBU usage for a pipeline, as well as the users associated with its runs.
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
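To convert DBUs into an approximate list-price cost, one option is to join the usage records with the system.billing.list_prices table. The following is a sketch only; it assumes the documented schemas for the system.billing tables and reports cost in the list price currency:
SELECT
  u.usage_date,
  u.sku_name,
  SUM(u.usage_quantity) AS dbus,
  SUM(u.usage_quantity * lp.pricing.default) AS estimated_list_cost
FROM system.billing.usage u
JOIN system.billing.list_prices lp
  ON u.sku_name = lp.sku_name
  AND u.cloud = lp.cloud
  AND u.usage_unit = lp.usage_unit
  AND u.usage_end_time >= lp.price_start_time
  AND (lp.price_end_time IS NULL OR u.usage_end_time < lp.price_end_time)
WHERE u.usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY ALL;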
Advanced queries
The following examples show how to query the event log to handle less common or more advanced scenarios.
Query metrics for all flows in a pipeline
This example shows how to query detailed information about every flow in a pipeline. It shows the flow name, update duration, data quality metrics, and information about the records processed (output, upserted, deleted, and dropped records).
This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
Query data quality or expectations metrics
If you define expectations on data sets in your pipeline, the metrics for the number of records that passed and failed an expectation are stored in the details:flow_progress.data_quality.expectations object. The metric for the number of dropped records is stored in the details:flow_progress.data_quality object. Events containing information about data quality have the event type flow_progress.
Data quality metrics might not be available for some data sets. See the expectation limitations.
The following data quality metrics are available:
| Metric | Description |
|---|---|
| dropped_records | The number of records that were dropped because they failed one or more expectations. |
| passed_records | The number of records that passed the expectation criteria. |
| failed_records | The number of records that failed the expectation criteria. |
The following example queries the data quality metrics for the last pipeline update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
)
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.latest_update_id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
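As an optional follow-up, you can express the same metrics as a failure rate per expectation. The following variation of the query above aggregates across all updates; add the latest_update filter back if you only want the most recent run:
SELECT
  row_expectations.dataset AS dataset,
  row_expectations.name AS expectation,
  SUM(row_expectations.passed_records) AS passing_records,
  SUM(row_expectations.failed_records) AS failing_records,
  ROUND(
    100 * SUM(row_expectations.failed_records) /
    NULLIF(SUM(row_expectations.passed_records + row_expectations.failed_records), 0),
    2
  ) AS failure_rate_pct
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw
    WHERE
      event_type = 'flow_progress'
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;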
Query lineage information
Events containing information about lineage have the event type flow_definition. The details:flow_definition object contains the output_dataset and input_datasets defining each relationship in the graph.
Use the following query to extract the input and output data sets to see lineage information. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
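If you prefer one row per edge in the lineage graph, the following sketch additionally parses and explodes the input dataset list. It assumes input_datasets is a JSON array of dataset names, as in the query above:
SELECT
  explode(from_json(details:flow_definition.input_datasets, 'array<string>')) as input_dataset,
  details:flow_definition.output_dataset as output_dataset
FROM event_log_raw
WHERE event_type = 'flow_definition'
  AND details:flow_definition.input_datasets IS NOT NULL;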
Monitor cloud file ingestion with Auto Loader
Lakeflow Declarative Pipelines generates events when Auto Loader processes files. For Auto Loader events, the event_type is operation_progress and the details:operation_progress:type is either AUTO_LOADER_LISTING or AUTO_LOADER_BACKFILL. The details:operation_progress object also includes the status, duration_ms, auto_loader_details:source_path, and auto_loader_details:num_files_listed fields.
The following example queries Auto Loader events for the latest update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
Monitor data backlog for optimizing streaming duration
Lakeflow Declarative Pipelines tracks how much data is present in the backlog in the details:flow_progress.metrics.backlog_bytes object. Events containing backlog metrics have the event type flow_progress. The following example queries backlog metrics for the last pipeline update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
Note
The backlog metrics might not be available depending on the pipeline's data source type and Databricks Runtime version.
Monitor autoscaling events for optimizing classic compute
For Lakeflow Declarative Pipelines that use classic compute (in other words, do not use serverless compute), the event log captures cluster resizes when enhanced autoscaling is enabled in your pipelines. Events containing information about enhanced autoscaling have the event type autoscale. The cluster resizing request information is stored in the details:autoscale object.
The following example queries the enhanced autoscaling cluster resize requests for the last pipeline update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details:autoscale.status = 'RESIZING' then details:autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details:autoscale.status = 'SUCCEEDED' then details:autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details:autoscale.status = 'PARTIALLY_SUCCEEDED' then details:autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details:autoscale.status = 'FAILED' then details:autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id;
Monitor compute resource utilization for classic compute
cluster_resources events provide metrics on the number of task slots in the cluster, how much those task slots are used, and how many tasks are waiting to be scheduled.
When enhanced autoscaling is enabled, cluster_resources events also contain metrics for the autoscaling algorithm, including latest_requested_num_executors and optimal_num_executors. The events also show the status of the algorithm as different states, such as CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS, and BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION.
This information can be viewed in conjunction with the autoscaling events to provide an overall picture of enhanced autoscaling.
The following example queries the task queue size history, utilization history, executor count history, and other metrics and state for autoscaling in the last pipeline update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details:cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
Audit Lakeflow Declarative Pipelines
You can use Lakeflow Declarative Pipelines event log records and other Azure Databricks audit logs to get a complete picture of how data is being updated in Lakeflow Declarative Pipelines.
Lakeflow Declarative Pipelines uses the credentials of the pipeline owner to run updates. You can change the credentials used by updating the pipeline owner. Lakeflow Declarative Pipelines records the user for actions on the pipeline, including pipeline creation, edits to configuration, and triggering updates.
See Unity Catalog events for a reference of Unity Catalog audit events.
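If system tables are enabled for your account, pipeline-related audit events can also be explored through the system.access.audit table. The following is a sketch only; the service name filter is an assumption and might need adjusting for your audit log schema:
SELECT event_time, action_name, user_identity.email, request_params
FROM system.access.audit
WHERE service_name = 'deltaPipelines'
ORDER BY event_time DESC
LIMIT 100;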
Query user actions in the event log
You can use the event log to audit events, for example, user actions. Events containing information about user actions have the event type user_action.
Information about the action is stored in the user_action object in the details field. Use the following query to construct an audit log of user events. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
SELECT timestamp, details:user_action:action, details:user_action:user_name
FROM event_log_raw
WHERE event_type = 'user_action'
| timestamp | action | user_name |
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | START | user@company.com |
| 2021-05-20T19:35:59.913+0000 | CREATE | user@company.com |
| 2021-05-27T00:35:51.971+0000 | START | user@company.com |
Runtime information
You can view runtime information for a pipeline update, for example, the Databricks Runtime version for the update. This assumes that you have created the event_log_raw view for the pipeline you are interested in, as described in Query the event log.
SELECT details:create_update:runtime_version:dbr_version
FROM event_log_raw
WHERE event_type = 'create_update'
| dbr_version |
|---|
| 11.0 |
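As a follow-up, you can check whether earlier updates ran on a different runtime version, for example with a small aggregation over all create_update events:
SELECT
  details:create_update:runtime_version:dbr_version AS dbr_version,
  COUNT(*) AS update_count
FROM event_log_raw
WHERE event_type = 'create_update'
GROUP BY ALL
ORDER BY update_count DESC;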