Поделиться через


Журнал событий конвейера

Журнал событий конвейера содержит все сведения, связанные с конвейером, включая журналы аудита, проверки качества данных, ход выполнения конвейера и происхождения данных. С помощью журнала событий можно отслеживать, анализировать и контролировать состояние конвейеров данных.

Записи журнала событий можно просматривать в пользовательском интерфейсе мониторинга конвейера, REST API Конвейеров или напрямую запрашивая журнал событий. В этом разделе основное внимание уделяется запросу журнала событий напрямую.

Можно также определить пользовательские действия для выполнения при регистрации событий, например, отправки оповещений, используя перехватчики событий.

Это важно

Не удаляйте журнал событий или родительский каталог или схему, в которой публикуется журнал событий. Удаление журнала событий может привести к сбою обновления конвейера во время будущих запусков.

Полные сведения о схеме журнала событий см. в разделе "Схема журнала событий конвейера".

запрос журнала событий

Замечание

В этом разделе описывается поведение и синтаксис по умолчанию для работы с журналами событий для конвейеров, настроенных с каталогом Unity и режимом публикации по умолчанию.

По умолчанию конвейер записывает журнал событий в скрытую таблицу Delta в каталоге по умолчанию и схеме, настроенной для конвейера. Несмотря на скрытие, таблица по-прежнему может запрашиваться всеми достаточно привилегированными пользователями. По умолчанию только владелец конвейера может запрашивать таблицу журнала событий.

Чтобы запросить журнал событий в качестве владельца, используйте идентификатор конвейера:

SELECT * FROM event_log(<pipelineId>);

По умолчанию имя скрытого журнала событий отформатировано как event_log_{pipeline_id}, где идентификатор конвейера является назначенным системой идентификатором UUID с дефисами, замененными символами подчеркивания. Таблица журнала событий появилась в system.information_schema.tables, но не видна в обозревателе каталогов или других страницах интерфейса рабочей области. Доступ к нему необходимо получить с помощью event_log() функции.

Журнал событий можно опубликовать, изменив дополнительные параметры для конвейера. Дополнительные сведения см. в разделе "Параметры конвейера" для журнала событий. При публикации журнала событий укажите имя журнала событий и при необходимости укажите каталог и схему, как показано в следующем примере:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

Расположение журнала событий также служит местоположением схемы для любых запросов Auto Loader в потоке данных. Databricks рекомендует создать представление по таблице журнала событий перед изменением привилегий, так как некоторые параметры вычислений могут позволить пользователям получить доступ к метаданным схемы, если таблица журнала событий предоставляется напрямую. В следующем примере синтаксиса создается представление таблицы журнала событий и используется в примерах запросов журнала событий, включенных в эту статью. Замените <catalog_name>.<schema_name>.<event_log_table_name> на полное имя таблицы журнала событий вашего конвейера. Если вы опубликовали журнал событий, используйте имя, указанное при публикации. В противном случае используйте event_log(<pipelineId>) там, где pipelineId — это идентификатор конвейера, который требуется запросить.

CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;

В каталоге Unity представления поддерживают потоковые запросы. В следующем примере используется структурированная потоковая передача для запроса представления, определенного в верхней части таблицы журнала событий:

df = spark.readStream.table("event_log_raw")

Примеры базовых запросов

В следующих примерах показано, как запросить журнал событий, чтобы получить общие сведения о конвейерах и помочь отладке распространенных сценариев.

Мониторинг обновлений конвейера путем запроса предыдущих обновлений

В следующем примере выполняется запрос обновлений (или запусков) конвейера с идентификатором обновления, состоянием, временем начала, временем завершения и длительностью. Это дает обзор запусков для конвейера.

Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в разделе "Запрос журнала событий".

with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

Отладка проблем с инкрементальным обновлением в материализованном представлении

Этот пример запрашивает все потоки из последнего обновления конвейера. Он показывает, были ли они постепенно обновлены или нет, а также другие соответствующие сведения о планировании, которые полезны для отладки, почему добавочное обновление не происходит.

Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в разделе "Запрос журнала событий".

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

Запрос затрат на обновление конвейера

В этом примере показано, как запрашивать использование DBU для конвейера, а также пользователя для заданного запуска конвейера.

SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;

Усложненные запросы

В следующих примерах показано, как запросить журнал событий для обработки менее распространенных или более сложных сценариев.

Метрики запросов для всех потоков в конвейере

В этом примере показано, как запрашивать подробные сведения о каждом потоке в конвейере. В нем показаны имя потока, длительность обновления, метрики качества данных и информация о записях, обработанных (выходные записи, удаленные, обновленные и вставленные, и отброшенные записи).

Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в разделе "Запрос журнала событий".

WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array

  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations

  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,

  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

Запрос метрик качества данных или ожиданий

Если вы определяете ожидания для наборов данных в конвейере, метрики для количества переданных и неудачных записей хранятся в объекте details:flow_progress.data_quality.expectations . Метрика для количества удаленных записей хранится в объекте details:flow_progress.data_quality . События, содержащие сведения о качестве данных, имеют тип события flow_progress.

Метрики качества данных могут быть недоступны для некоторых наборов данных. См. ограничения ожидания.

Доступны следующие метрики качества данных:

Единица измерения Description
dropped_records Количество записей, которые были удалены из-за сбоя одного или нескольких ожиданий.
passed_records Количество записей, которые прошли критерии ожидания.
failed_records Количество записей, которые не выполнили критерии ожидания.

В следующем примере запрашивается метрика качества данных для последнего обновления конвейера. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;

Сведения о происхождении запросов

События, содержащие сведения о происхождении, имеют тип события flow_definition. Объект details:flow_definition содержит output_dataset и input_datasets, определяющие каждую связь в графе.

Используйте следующий запрос, чтобы извлечь входные и выходные наборы данных, чтобы просмотреть сведения о происхождении. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

Мониторинг приема облачных файлов с помощью автозагрузчика

Конвейеры данных генерируют события при обработке файлов с помощью Auto Loader. Для событий загрузчика, event_typeoperation_progress, а details:operation_progress:type — это либо AUTO_LOADER_LISTING, либо AUTO_LOADER_BACKFILL. Объект details:operation_progress также включает поля status, duration_ms, auto_loader_details:source_pathи auto_loader_details:num_files_listed.

В следующем примере выполняется запрос событий автозагрузчика для последнего обновления. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

Отслеживание очереди данных для оптимизации времени потоковой передачи

Каждый конвейер отслеживает объем данных, находящихся в очереди задач в объекте details:flow_progress.metrics.backlog_bytes. События, содержащие метрики невыполненной работы, имеют тип события flow_progress. В следующем примере запрашиваются метрики отставания для последнего обновления конвейера. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;

Замечание

Метрики невыполненной работы могут быть недоступны в зависимости от типа источника данных конвейера и версии Databricks Runtime.

Мониторинг событий автомасштабирования для оптимизации классических вычислений

Для конвейеров, использующих классические вычисления (иными словами, не используйте бессерверные вычисления), журнал событий фиксирует изменения размера кластера при включении расширенного автомасштабирования в конвейерах. События, содержащие сведения о расширенном автомасштабировании, имеют тип события autoscale. Сведения об изменении размера кластера хранятся в объекте details:autoscale.

В следующем примере выполняется запрос на изменение размеров в увеличенном кластере автомасштабирования для последнего обновления конвейера. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Мониторинг использования вычислительных ресурсов для классических вычислений

cluster_resources события предоставляют метрики по количеству слотов задач в кластере, количеству используемых слотов задач и количеству задач, ожидающих планирования.

Если включен расширенный автомасштабирование, cluster_resources события также содержат метрики для алгоритма автомасштабирования, включая latest_requested_num_executorsи optimal_num_executors. События также показывают состояние алгоритма в виде различных состояний, таких как CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSи BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Эти сведения можно просматривать вместе с событиями автомасштабирования, чтобы обеспечить общую картину расширенного автомасштабирования.

В следующем примере выполняется запрос истории размера очереди задач, истории использования, истории количества исполнителей и других метрик и состояний для автомасштабирования в рамках последнего обновления конвейера. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;

Мониторинг метрик поточной обработки конвейера

Вы можете просмотреть метрики о ходе выполнения потока в конвейере. Запрос событий stream_progress для получения событий, похожих на метрики StreamingQueryListener , созданные структурированной потоковой передачей, со следующими исключениями:

  • Следующие метрики присутствуют в StreamingQueryListener, но не в stream_progress: numInputRows, inputRowsPerSecond и processedRowsPerSecond.
  • Для потоков Kafka и Kinesis поля startOffset, endOffset и latestOffset могут быть слишком большими и усечены. Для каждого из этих полей добавляется дополнительное поле ...Truncated, startOffsetTruncated, endOffsetTruncated, и latestOffsetTruncated с логическим значением, указывающим на то, усечены ли данные.

Для запроса stream_progress событий можно использовать запрос, например следующий:

SELECT
  parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';

Ниже приведен пример события в формате JSON:

{
  "id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
  "sequence": {
    "control_plane_seq_no": 1234567890123456
  },
  "origin": {
    "cloud": "<cloud>",
    "region": "<region>",
    "org_id": 0123456789012345,
    "pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
    "pipeline_type": "WORKSPACE",
    "pipeline_name": "<pipeline name>",
    "update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
    "request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
  },
  "timestamp": "2025-06-17T03:18:14.018Z",
  "message": "Completed a streaming update of 'flow_name'."
  "level": "INFO",
  "details": {
    "stream_progress": {
      "progress": {
        "id": "abcdef12-abcd-3456-7890-abcd1234ef56",
        "runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
        "name": "silverTransformFromBronze",
        "timestamp": "2022-11-01T18:21:29.500Z",
        "batchId": 4,
        "durationMs": {
          "latestOffset": 62,
          "triggerExecution": 62
        },
        "stateOperators": [],
        "sources": [
          {
            "description": "DeltaSource[dbfs:/path/to/table]",
            "startOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "endOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "latestOffset": null,
            "metrics": {
              "numBytesOutstanding": "0",
              "numFilesOutstanding": "0"
            }
          }
        ],
        "sink": {
          "description": "DeltaSink[dbfs:/path/to/sink]",
          "numOutputRows": -1
        }
      }
    }
  },
  "event_type": "stream_progress",
  "maturity_level": "EVOLVING"
}

В этом примере показаны неусеченные записи в источнике Kafka с полями, установленными в ...Truncatedfalse.

{
  "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
  "startOffsetTruncated": false,
  "startOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706380
    }
  },
  "endOffsetTruncated": false,
  "endOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "latestOffsetTruncated": false,
  "latestOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "numInputRows": 292,
  "inputRowsPerSecond": 13.65826278123392,
  "processedRowsPerSecond": 14.479817514628582,
  "metrics": {
    "avgOffsetsBehindLatest": "0.0",
    "estimatedTotalBytesBehindLatest": "0.0",
    "maxOffsetsBehindLatest": "0",
    "minOffsetsBehindLatest": "0"
  }
}

Аудит конвейеров

Вы можете использовать записи журнала событий и другие журналы аудита Azure Databricks, чтобы получить полное представление о том, как данные обновляются в конвейере.

Декларативные конвейеры Spark Lakeflow используют учетные данные владельца конвейера для выполнения обновлений. Вы можете изменить используемые учетные данные, обновив владельца конвейера. Журнал аудита регистрирует действия пользователя в конвейере, включая создание конвейера, изменения конфигурации и запуск обновлений.

См. события каталога Unity для справки о событиях аудита каталога Unity.

Запрос действий пользователя в журнале событий

Можно использовать журнал событий для аудита событий, например действий пользователя. События, содержащие сведения о действиях пользователя, имеют тип события user_action.

Сведения о действии хранятся в объекте user_action в поле details. Используйте следующий запрос, чтобы создать журнал аудита событий пользователя. Предполагается, что вы создали event_log_raw представление для интересующего конвейера, как описано в журнале событий запроса.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

Сведения о среде выполнения

Вы можете просмотреть информацию о среде выполнения обновления конвейера, например, версию Databricks Runtime для данного обновления. Предполагается, что вы уже создали event_log_raw представление, связанное с интересующим вас конвейером, как это описано в разделе "Запрос журнала событий".

SELECT origin.update_id, details:runtime_details:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'runtime_details'
update_id dbr_version
1234abcd-ef56-7890-abcd-ef1234abcd56 18,0