Поделиться через


ETL в Databricks SQL

При работе с большими объемами данных требуется конвейер, который может обрабатывать только новые и измененные записи вместо повторной обработки всего набора данных. Это называется добавочным ETL. В Databricks SQL можно создавать добавочные конвейеры ETL с помощью потоковых таблиц и материализованных представлений без написания процедурного кода или планирования обновлений вручную.

В этом руководстве описан общий шаблон: отслеживание изменений продукта с течением времени. Вы создаете исходную таблицу, фиксируете события изменений, создаете таблицу измерений, которая сохраняет полную историю каждого продукта и добавляет статистический уровень отчетов сверху.

Ключевым компонентом этого руководства является AUTO CDC. В традиционном хранилище вам пришлось бы писать сложные MERGE INTO инструкции для согласования событий вставки, обновления и удаления в целевую таблицу. Этот подход подвержен ошибкам, особенно когда события приходят в неправильном порядке. AUTO CDC обрабатывает это для вас. Вы указываете бизнес-ключ, столбец последовательности, а также выбираете, хотите ли использовать SCD Type 1 (только последнее значение) или SCD Type 2 (полная история), и Azure Databricks автоматически применяет правильную логику слияния. Общие сведения о CDC см. в api-интерфейсах AUTO CDC: упрощение отслеживания измененных данных с помощью конвейеров.

К концу этого руководства вы будете иметь следующее:

  1. Создана исходная таблица, которая отслеживает изменения с помощью потока данных об изменениях.
  2. Проверьте необработанные данные об изменениях, чтобы понять поток событий CDC.
  3. Используется AUTO CDC для создания таблицы измерений SCD Типа 2 из этих событий.
  4. События удаления обрабатываются поэтапно через конвейер.
  5. Создано материализованное представление, которое пошагово обновляет сводный отчет.
  6. Настроено SCHEDULE REFRESH EVERY 1 DAY , чтобы изменения распространялись автоматически через конвейер.

Требования

Чтобы завершить работу с этим руководством, необходимо выполнить следующие требования:

Шаг 1. Настройка каталога и схемы

Откройте редактор SQL Databricks и задайте рабочий каталог и схему. У вас должно быть разрешение на USE выбор каталога и схемы:

USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;

Шаг 2. Создание исходной таблицы и загрузка данных

Создайте таблицу products с включенным каналом данных Use Delta Lake change feed on Azure Databricks (CDF). CDF — это функция Delta Lake, которая записывает каждую вставку, обновление и удаление в виде запрашиваемого журнала изменений. Это похоже на поток CDC из исходной системы транзакций, за исключением того, что изменения фиксируются непосредственно в таблице Delta, а не из внешнего журнала. Здесь вы используете CDF для создания событий изменений, которые будет использоваться подчиненным конвейером.

  1. Создайте таблицу и загрузите начальные записи:

    CREATE OR REPLACE TABLE products (
      product_id INT,
      product_name STRING,
      category STRING,
      warehouse STRING
    )
    TBLPROPERTIES (delta.enableChangeDataFeed = true);
    
    INSERT INTO products VALUES
      (1, 'Spoon', 'Cutlery', 'Seattle'),
      (2, 'Fork', 'Cutlery', 'Portland'),
      (3, 'Knife', 'Cutlery', 'Denver'),
      (4, 'Chair', 'Furniture', 'Austin'),
      (5, 'Table', 'Furniture', 'Chicago'),
      (6, 'Lamp', 'Lighting', 'Boston'),
      (7, 'Mug', 'Kitchenware', 'Seattle'),
      (8, 'Plate', 'Kitchenware', 'Atlanta'),
      (9, 'Bowl', 'Kitchenware', 'Dallas'),
      (10, 'Glass', 'Kitchenware', 'Phoenix');
    
  2. Имитируйте изменения вышестоящего потока, включая новые продукты, перемещение склада и переназначение категории:

    INSERT INTO products VALUES
      (11, 'Napkin', 'Dining', 'San Francisco'),
      (12, 'Coaster', 'Dining', 'New York');
    
    UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1;
    UPDATE products SET category = 'Dining' WHERE product_id = 2;
    

Шаг 3. Запрос потока измененных данных

Прежде чем создавать низовой конвейер, полезно просмотреть события сырого изменения, чтобы понять, что AUTO CDC будет обрабатываться. Функция table_changes() считывает журнал CDF и возвращает каждую записанную операцию вместе со столбцами метаданных:

SELECT
  product_id, product_name, warehouse,
  _change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;

Например, у Ложки есть три события: одно insert (Сиэтл), другое update_preimage (Сиэтл) и еще одно update_postimage (Лос-Анджелес).

Обратите внимание, что одно логическое изменение (например, перемещение ложки в другое хранилище) создает несколько событий: предварительное создание и запись. В традиционном хранилище вы напишете MERGE инструкцию, чтобы примирить все эти события в целевую таблицу, обрабатывать вставки, обновления и удаления с отдельной логикой и убедиться, что события применяются в правильном порядке. Это именно сложность, которая AUTO CDC устраняется на следующем шаге.

Шаг 4. Создание измерения SCD Типа 2 с помощью AUTO CDC

Это важно

AUTO CDC находится в бета-версии. Требуется Databricks Runtime 17.3 или более поздней версии.

Потоковая таблица обрабатывает данные постепенно. При каждом обновлении он считывает только новые строки с момента последнего запуска, поэтому не требуется повторно обрабатывать полный набор данных. Это делает его хорошо подходящим для большого объема или часто меняющихся источников.

AUTO CDC добавляет обработку отслеживания измененных данных поверх потоковой таблицы. Вместо написания инструкции MERGE INTO, которая вручную обрабатывает вставки, обновления и удаления, вы объявляете бизнес-ключ и последовательный столбец и позволяет Azure Databricks применить правильную логику. AUTO CDC также обрабатывает события вне порядка автоматически, что является распространенной проблемой при обработке MERGE INTO событий, поступающих из распределенных систем или пакетной загрузки с перекрывающимися метками времени.

Следующая инструкция создает таблицу SCD Type 2, которая сохраняет полную историю версий каждого продукта. Каждая версия получает __START_AT и __END_AT метки времени. NULL в __END_AT помечает текущую версию.

CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
  • SCHEDULE REFRESH EVERY 1 DAY: обновляет таблицу по ежедневному расписанию.
  • FLOW AUTO CDC: обозначает это как поток CDC. Azure Databricks автоматически применяет семантику вставки, обновления и удаления.
  • KEYS (product_id): бизнес-ключ. События с тем же ключом объединяются в версионные строки.
  • APPLY AS DELETE WHEN _change_type = 'delete': закрывает текущую версию при поступлении события удаления. Это позволяет определить условие, определяющее событие удаления.
  • SEQUENCE BY _commit_timestamp: устанавливает порядок событий. Правильно обрабатывает поступления вне порядка.
  • STORED AS SCD TYPE 2: сохраняет полную историю. AUTO CDC поддерживает как SCD Type 1, так и SCD Type 2.

Запросите таблицу измерений:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
  • Ложка: две версии. Сиэтл (закрыт, набор __END_AT) и Лос-Анджелес (действующий, __END_AT = NULL).
  • Вилка: две версии. Категория столовых приборов (закрытая) и категория "Столовая" (текущая).
  • Салфетка и Подставка: по одной версии каждой (только что вставленные, __END_AT = NULL).
  • Все остальные продукты: одна версия каждой (__END_AT = NULL).

Шаг 5. Процесс удаления через конвейер

Теперь имитируйте два прекращенных продукта, удалив их из исходной таблицы:

DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;

Эти события удаления записываются в журнал CDF, но стриминговая таблица еще не видела их. Обновите таблицу потоковой передачи, чтобы обработать новые события:

REFRESH STREAMING TABLE products_history;

Запросите таблицу измерений, чтобы убедиться, что были применены удаления:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;

Чаша и бокал теперь считаются закрытыми с комплектом __END_AT, отметив их как снятые с производства. Все остальные текущие продукты остаются неизменными. Таблица потоковой передачи обрабатывает только новые события удаления без повторной обработки вставок и обновлений из предыдущего обновления.

Шаг 6. Создание статистического материализованного представления

Теперь, когда у вас есть таблица измерений, которая остается текущей с изменениями источника, можно добавить слой отчетов сверху.

Материализованное представление сохраняет предварительно вычисляемые результаты запроса в виде физической таблицы. В отличие от обычного представления, которое повторно выполняет запрос при каждом чтении из него, материализованное представление сохраняет результаты и пересчитывает только строки, затронутые изменениями в исходных данных, при каждом обновлении. Он хорошо подходит для панелей мониторинга и отчетов, где важна производительность запросов.

CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
  category,
  COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;

SCHEDULE REFRESH EVERY 1 DAY означает, что это представление обновляется по ежедневному расписанию. В сочетании с тем же расписанием в таблице потоковой передачи теперь есть трехэтапный конвейер, в котором изменения в исходной таблице каскадируются через измерение и агрегируются в каждом цикле обновления. Нет возможности выполнить обновление вручную.

SELECT * FROM products_by_category ORDER BY active_products DESC;

Шаг 7. Проверка сквозного каскада

Чтобы проверить полный каскад конвейера, внесите изменения в исходную таблицу:

UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;

Нож перемещается из Денвера в Сиэтл. Это одно изменение DML активирует полный каскад конвейера, демонстрируя, как три этапа работают вместе:

  1. products записывает событие изменения через CDF.
  2. products_history обрабатывает событие и добавляет новую версию для ножа.
  3. products_by_category пересчитывает только затронутую строку столовых приборов.

Проверить.

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;

SELECT * FROM products_by_category ORDER BY active_products DESC;

Очистка

Чтобы очистить ресурсы, созданные этим руководством, используйте следующий SQL:

DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;

Дополнительные ресурсы