Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Журнал событий конвейера содержит все сведения, связанные с конвейером, включая журналы аудита, проверки качества данных, ход выполнения конвейера и происхождения данных. С помощью журнала событий можно отслеживать, анализировать и контролировать состояние конвейеров данных.
Записи журнала событий можно просматривать в пользовательском интерфейсе мониторинга конвейера, REST API Конвейеров или напрямую запрашивая журнал событий. В этом разделе основное внимание уделяется запросу журнала событий напрямую.
Можно также определить пользовательские действия для выполнения при регистрации событий, например, отправки оповещений, используя перехватчики событий.
Это важно
Не удаляйте журнал событий или родительский каталог или схему, в которой публикуется журнал событий. Удаление журнала событий может привести к сбою обновления конвейера во время будущих запусков.
Полные сведения о схеме журнала событий см. в разделе "Схема журнала событий конвейера".
запрос журнала событий
Замечание
В этом разделе описывается поведение и синтаксис по умолчанию для работы с журналами событий для конвейеров, настроенных с каталогом Unity и режимом публикации по умолчанию.
- Сведения о поведении конвейеров каталога Unity, использующих устаревший режим публикации, см. в статье Работа с журналом событий для конвейеров устаревшей публикации каталога Unity.
- Сведения о поведении и синтаксисе конвейеров хранилища метаданных Hive см. в статье Работа с журналом событий для конвейеров хранилища метаданных Hive.
По умолчанию конвейер записывает журнал событий в скрытую таблицу Delta в каталоге по умолчанию и схеме, настроенной для конвейера. Несмотря на скрытие, таблица по-прежнему может запрашиваться всеми достаточно привилегированными пользователями. По умолчанию только владелец конвейера может запрашивать таблицу журнала событий.
Чтобы запросить журнал событий в качестве владельца, используйте идентификатор конвейера:
SELECT * FROM event_log(<pipelineId>);
По умолчанию имя скрытого журнала событий отформатировано как event_log_{pipeline_id}, где идентификатор конвейера является назначенным системой идентификатором UUID с дефисами, замененными символами подчеркивания. Таблица журнала событий появилась в system.information_schema.tables, но не видна в обозревателе каталогов или других страницах интерфейса рабочей области. Доступ к нему необходимо получить с помощью event_log() функции.
Журнал событий можно опубликовать, изменив дополнительные параметры для конвейера. Дополнительные сведения см. в разделе "Параметры конвейера" для журнала событий. При публикации журнала событий укажите имя журнала событий и при необходимости укажите каталог и схему, как показано в следующем примере:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
Расположение журнала событий также служит местоположением схемы для любых запросов Auto Loader в потоке данных. Databricks рекомендует создать представление по таблице журнала событий перед изменением привилегий, так как некоторые параметры вычислений могут позволить пользователям получить доступ к метаданным схемы, если таблица журнала событий предоставляется напрямую. В следующем примере синтаксиса создается представление таблицы журнала событий и используется в примерах запросов журнала событий, включенных в эту статью. Замените <catalog_name>.<schema_name>.<event_log_table_name> на полное имя таблицы журнала событий вашего конвейера. Если вы опубликовали журнал событий, используйте имя, указанное при публикации. В противном случае используйте event_log(<pipelineId>) там, где pipelineId — это идентификатор конвейера, который требуется запросить.
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
В каталоге Unity представления поддерживают потоковые запросы. В следующем примере используется структурированная потоковая передача для запроса представления, определенного в верхней части таблицы журнала событий:
df = spark.readStream.table("event_log_raw")
Примеры базовых запросов
В следующих примерах показано, как запросить журнал событий, чтобы получить общие сведения о конвейерах и помочь отладке распространенных сценариев.
Мониторинг обновлений конвейера путем запроса предыдущих обновлений
В следующем примере выполняется запрос обновлений (или запусков) конвейера с идентификатором обновления, состоянием, временем начала, временем завершения и длительностью. Это дает обзор запусков для конвейера.
Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в разделе "Запрос журнала событий".
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
Отладка проблем с инкрементальным обновлением в материализованном представлении
Этот пример запрашивает все потоки из последнего обновления конвейера. Он показывает, были ли они постепенно обновлены или нет, а также другие соответствующие сведения о планировании, которые полезны для отладки, почему добавочное обновление не происходит.
Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в разделе "Запрос журнала событий".
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
Запрос затрат на обновление конвейера
В этом примере показано, как запрашивать использование DBU для конвейера, а также пользователя для заданного запуска конвейера.
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
Усложненные запросы
В следующих примерах показано, как запросить журнал событий для обработки менее распространенных или более сложных сценариев.
Метрики запросов для всех потоков в конвейере
В этом примере показано, как запрашивать подробные сведения о каждом потоке в конвейере. В нем показаны имя потока, длительность обновления, метрики качества данных и информация о записях, обработанных (выходные записи, удаленные, обновленные и вставленные, и отброшенные записи).
Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в разделе "Запрос журнала событий".
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
Запрос метрик качества данных или ожиданий
Если вы определяете ожидания для наборов данных в конвейере, метрики для количества переданных и неудачных записей хранятся в объекте details:flow_progress.data_quality.expectations . Метрика для количества удаленных записей хранится в объекте details:flow_progress.data_quality . События, содержащие сведения о качестве данных, имеют тип события flow_progress.
Метрики качества данных могут быть недоступны для некоторых наборов данных. См. ограничения ожидания.
Доступны следующие метрики качества данных:
| Единица измерения | Description |
|---|---|
dropped_records |
Количество записей, которые были удалены из-за сбоя одного или нескольких ожиданий. |
passed_records |
Количество записей, которые прошли критерии ожидания. |
failed_records |
Количество записей, которые не выполнили критерии ожидания. |
В следующем примере запрашивается метрика качества данных для последнего обновления конвейера. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
Сведения о происхождении запросов
События, содержащие сведения о происхождении, имеют тип события flow_definition. Объект details:flow_definition содержит output_dataset и input_datasets, определяющие каждую связь в графе.
Используйте следующий запрос, чтобы извлечь входные и выходные наборы данных, чтобы просмотреть сведения о происхождении. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Мониторинг приема облачных файлов с помощью автозагрузчика
Конвейеры данных генерируют события при обработке файлов с помощью Auto Loader. Для событий загрузчика, event_type — operation_progress, а details:operation_progress:type — это либо AUTO_LOADER_LISTING, либо AUTO_LOADER_BACKFILL. Объект details:operation_progress также включает поля status, duration_ms, auto_loader_details:source_pathи auto_loader_details:num_files_listed.
В следующем примере выполняется запрос событий автозагрузчика для последнего обновления. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
Отслеживание очереди данных для оптимизации времени потоковой передачи
Каждый конвейер отслеживает объем данных, находящихся в очереди задач в объекте details:flow_progress.metrics.backlog_bytes. События, содержащие метрики невыполненной работы, имеют тип события flow_progress. В следующем примере запрашиваются метрики отставания для последнего обновления конвейера. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
Замечание
Метрики невыполненной работы могут быть недоступны в зависимости от типа источника данных конвейера и версии Databricks Runtime.
Мониторинг событий автомасштабирования для оптимизации классических вычислений
Для конвейеров, использующих классические вычисления (иными словами, не используйте бессерверные вычисления), журнал событий фиксирует изменения размера кластера при включении расширенного автомасштабирования в конвейерах. События, содержащие сведения о расширенном автомасштабировании, имеют тип события autoscale. Сведения об изменении размера кластера хранятся в объекте details:autoscale.
В следующем примере выполняется запрос на изменение размеров в увеличенном кластере автомасштабирования для последнего обновления конвейера. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Мониторинг использования вычислительных ресурсов для классических вычислений
cluster_resources события предоставляют метрики по количеству слотов задач в кластере, количеству используемых слотов задач и количеству задач, ожидающих планирования.
Если включен расширенный автомасштабирование, cluster_resources события также содержат метрики для алгоритма автомасштабирования, включая latest_requested_num_executorsи optimal_num_executors. События также показывают состояние алгоритма в виде различных состояний, таких как CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSи BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION.
Эти сведения можно просматривать вместе с событиями автомасштабирования, чтобы обеспечить общую картину расширенного автомасштабирования.
В следующем примере выполняется запрос истории размера очереди задач, истории использования, истории количества исполнителей и других метрик и состояний для автомасштабирования в рамках последнего обновления конвейера. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
Мониторинг метрик поточной обработки конвейера
Вы можете просмотреть метрики о ходе выполнения потока в конвейере. Запрос событий stream_progress для получения событий, похожих на метрики StreamingQueryListener , созданные структурированной потоковой передачей, со следующими исключениями:
- Следующие метрики присутствуют в
StreamingQueryListener, но не вstream_progress:numInputRows,inputRowsPerSecondиprocessedRowsPerSecond. - Для потоков Kafka и Kinesis поля
startOffset,endOffsetиlatestOffsetмогут быть слишком большими и усечены. Для каждого из этих полей добавляется дополнительное поле...Truncated,startOffsetTruncated,endOffsetTruncated, иlatestOffsetTruncatedс логическим значением, указывающим на то, усечены ли данные.
Для запроса stream_progress событий можно использовать запрос, например следующий:
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
Ниже приведен пример события в формате JSON:
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}
В этом примере показаны неусеченные записи в источнике Kafka с полями, установленными в ...Truncatedfalse.
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}
Аудит конвейеров
Вы можете использовать записи журнала событий и другие журналы аудита Azure Databricks, чтобы получить полное представление о том, как данные обновляются в конвейере.
Декларативные конвейеры Spark Lakeflow используют учетные данные владельца конвейера для выполнения обновлений. Вы можете изменить используемые учетные данные, обновив владельца конвейера. Журнал аудита регистрирует действия пользователя в конвейере, включая создание конвейера, изменения конфигурации и запуск обновлений.
См. события каталога Unity для справки о событиях аудита каталога Unity.
Запрос действий пользователя в журнале событий
Можно использовать журнал событий для аудита событий, например действий пользователя. События, содержащие сведения о действиях пользователя, имеют тип события user_action.
Сведения о действии хранятся в объекте user_action в поле details. Используйте следующий запрос, чтобы создать журнал аудита событий пользователя. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | START |
user@company.com |
| 2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
| 2021-05-27T00:35:51.971+0000 | START |
user@company.com |
Сведения о среде выполнения
Вы можете просмотреть информацию о среде выполнения обновления конвейера, например, версию Databricks Runtime для данного обновления. Предполагается, что вы уже создали event_log_raw представление, связанное с интересующим вас конвейером, как это описано в разделе "Запрос журнала событий".
SELECT origin.update_id, details:runtime_details:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'runtime_details'
update_id |
dbr_version |
|---|---|
1234abcd-ef56-7890-abcd-ef1234abcd56 |
18,0 |