Поделиться через


Использование потока изменений данных Delta Lake в Azure Databricks

Поток изменений данных позволяет Azure Databricks отслеживать изменения на уровне строк между версиями таблицы Delta. Когда эта функция включена в таблице Delta, среда выполнения регистрирует события изменений для всех данных, записываемых в таблицу. Сюда входят данные строк вместе с метаданными, указывающими, была ли соответствующая строка вставлена, удалена или обновлена.

Вы можете использовать канал изменений данных для использования распространенных вариантов использования данных, включая следующие:

  • Конвейеры ETL: добавочно обрабатываются только строки, которые изменились с момента последнего запуска конвейера.
  • Следы аудита: отслеживайте изменения данных для требований соответствия требованиям и управлению.
  • Репликация данных. Синхронизация изменений в подчиненных таблицах, кэшах или внешних системах.

Внимание

Поток данных изменений работает в тандеме с историей таблиц для предоставления сведений об изменениях. Поскольку клонирование таблицы Delta создает отдельную историю, поток данных изменений в клонированных таблицах не соответствует потоку исходной таблицы.

Включение канала изменений данных

Канал изменений данных должен быть явно включен в таблицах, из которых требуется прочитать. Используйте один из следующих методов.

Новая таблица

Задайте свойство delta.enableChangeDataFeed = true таблицы в команде CREATE TABLE .

CREATE TABLE student (id INT, name STRING, age INT)
  TBLPROPERTIES (delta.enableChangeDataFeed = true)

Существующая таблица

Задайте свойство delta.enableChangeDataFeed = true таблицы в команде ALTER TABLE .

ALTER TABLE myDeltaTable
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Все новые таблицы в сеансе

Задайте конфигурацию Spark, чтобы включить поток данных изменений для всех новых таблиц, создаваемых в сеансе.

SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

Внимание

Записываются только изменения, внесенные после включения канала данных об изменениях. Прошлые изменения таблицы не записываются.

Изменение схемы канала данных

При чтении схемы из канала измененных данных для таблицы используется схема последней версии таблицы. Azure Databricks полностью поддерживает большинство операций изменения схемы и эволюции, но таблицы с включенным сопоставлением столбцов имеют ограничения. Сведения об ограничениях канала изменений для таблиц с сопоставлением столбцов.

Помимо столбцов данных из схемы таблицы Delta, канал изменений содержит столбцы метаданных, определяющие тип события изменения:

Имя столбца Тип Значения
_change_type Строка insert, update_preimage, update_postimage, delete(1)
_commit_version Длинный Журнал Delta или версия таблицы, содержащая изменение.
_commit_timestamp Метка времени Метка времени, связанная с созданием коммита.

(1)preimage — это значение перед обновлением, postimage это значение после обновления.

Невозможно включить веб-канал изменений в таблице, если схема содержит столбцы с теми же именами, что и эти столбцы метаданных. Переименуйте столбцы в таблице, чтобы устранить этот конфликт, прежде чем включить веб-канал изменений.

Инкрементная обработка данных об изменениях

Databricks рекомендует использовать поток изменений в сочетании со структурированной потоковой передачей для постепенной обработки изменений из таблиц Delta. Чтобы автоматически отслеживать версии канала изменений данных вашей таблицы, необходимо использовать структурированную потоковую обработку в Azure Databricks. Сведения об обработке CDC с таблицами типа 1 или типа 2 см. в API AUTO CDC: упрощение захвата измененных данных с помощью конвейеров.

Установите параметр readChangeFeed в true при настройке потока для чтения канала данных изменений таблицы, как показано в следующем примере синтаксиса.

Питон

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

язык программирования Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Поведение по умолчанию

При первом запуске потока возвращается последний моментальный снимок таблицы в виде INSERT записей, а затем возвращает будущие изменения в виде измененных данных. Изменения данных фиксируются в рамках транзакции Delta Lake и становятся доступными одновременно с фиксацией новых данных в таблице.

Дополнительные параметры

При необходимости можно указать начальную версию (см. раздел "Указать начальную версию") или использовать пакетное выполнение (см. раздел "Чтение изменений в пакетных запросах"). Azure Databricks также поддерживает ограничения скорости (maxFilesPerTrigger, maxBytesPerTrigger) и excludeRegex при чтении измененных данных.

Для версий, отличных от начального моментального снимка, ограничение скорости применяется атомарно к целым фиксациям: каждая фиксация либо включается в текущий пакет, либо откладывается до следующего пакета.

Указание начальной версии

Чтобы считывать изменения из определенной точки, укажите начальную версию с помощью метки времени или номера версии. Начальные версии данных необходимы для пакетных операций чтения. При необходимости можно указать конечную версию, чтобы ограничить диапазон. Для получения дополнительной информации об истории таблиц Delta Lake см. «Что такое путешествие во времени?».

При настройке структурированной потоковой обработки рабочих нагрузок с использованием канала изменений важно понять, как указание начальной версии влияет на обработку.

  • Новые конвейеры обработки данных обычно используют поведение по умолчанию, которое записывает все существующие записи в таблице как INSERT операции при первом запуске потока.
  • Если целевая таблица уже содержит все записи с соответствующими изменениями до определенной точки, укажите начальную версию, чтобы избежать обработки состояния исходной таблицы в виде INSERT событий.

В следующем примере показан синтаксис восстановления после сбоя потоковой передачи, в котором контрольная точка повреждена. В этом примере предполагается следующее:

  1. Канал изменений данных включен в исходной таблице при создании таблицы.
  2. Целевая конечная таблица обработала все изменения включительно до версии 75.
  3. Журнал версий исходной таблицы доступен для версий 70 и выше.

Питон

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

язык программирования Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

В этом примере также необходимо указать новое расположение контрольной точки.

Внимание

Если указать начальную версию, поток не может начинаться с новой контрольной точки, если начальная версия больше не присутствует в журнале таблиц. Delta Lake автоматически очищает исторические версии, что означает, что все указанные начальные версии в конечном итоге удаляются.

См. историю таблицы реплея.

Чтение изменений в пакетных запросах

С помощью синтаксиса пакетного запроса можно считывать все изменения, начиная с конкретной версии или считывать изменения в заданном диапазоне версий.

  • Укажите версии в виде целых чисел и меток времени в виде строк в формате yyyy-MM-dd[ HH:mm:ss[.SSS]].
  • Начальные и конечные версии являются включающими.
  • Чтобы прочитать исходную версию до последней версии, укажите только начальную версию.
  • Указание версии перед включением веб-канала измененных данных вызывает ошибку.

В следующих примерах синтаксиса показано использование параметров начальной и конечной версии с пакетными считываниями:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name,
-- with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Питон

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

язык программирования Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Обработка версий вне диапазона

По умолчанию при указании версии или метки времени, превышающей последнюю фиксацию, возникает ошибка timestampGreaterThanLatestCommit. В Databricks Runtime 11.3 LTS и более поздних версиях можно включить отказоустойчивость для устаревших версий:

SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Если этот параметр включен, выполните следующие действия.

  • Начальная версия и метка времени за пределами последней фиксации: возвращает пустой результат.
  • Конечная версия и метка времени за пределами последней фиксации: возвращает все изменения с начала последней фиксации.

Запись изменений данных

Delta Lake записывает изменения данных эффективно и может использовать другие функции Delta Lake для оптимизации представления хранилища.

Рекомендации по хранению

  • Затраты на хранение. Включение канала данных изменений может привести к небольшому увеличению затрат на хранение, так как изменения могут записываться в отдельных файлах.
  • Операции без файлов данных изменений: некоторые операции (только вставка, полносекционные удаления) не создают файлы изменений данных. Azure Databricks рассчитывает поток изменений непосредственно из журнала транзакций.
  • Хранение. Изменение файлов данных следует политике хранения таблицы. Команда VACUUM удаляет их и изменяет журнал транзакций после хранения контрольных точек.

Не пытайтесь восстановить канал измененных данных, напрямую запрашивая файлы измененных данных. Всегда используйте API Delta Lake.

Журнал таблиц воспроизведения

Поток данных изменений не предназначен для постоянного архива всех изменений в таблице. Он записывает только изменения, возникающие после включения, и вы можете запустить новое потоковое чтение для записи текущей версии и всех последующих изменений.

Записи в потоке измененных данных являются временными и доступны только в течение указанного периода хранения. Журнал транзакций Delta Lake удаляет версии таблиц и соответствующие версии потоков изменений данных через регулярные интервалы. При удалении версии вы больше не сможете прочитать веб-канал измененных данных для этой версии.

Архивирование данных об изменениях для постоянного журнала

Если ваш сценарий использования требует сохранения постоянной истории всех изменений в таблице, используйте инкрементальную логику для записи данных из канала изменения данных в новую таблицу. В следующем примере показано использование trigger.AvailableNow для обработки доступных данных в виде пакетной рабочей нагрузки для аудита или полной воспроизведения:

Питон

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

язык программирования Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Изменение ограничений канала данных для таблиц с сопоставлением столбцов

С включенным сопоставлением столбцов в таблице Delta можно удалять или переименовать столбцы без перезаписи файлов данных. Однако поток изменений данных имеет ограничения после неаддитивных изменений схемы, таких как переименование или удаление столбцов, изменение типов данных или изменение допустимости NULL.

  • Пакетная семантика: Вы не можете считывать поток данных изменений для транзакции или диапазона, в котором происходит изменение неаддитивной схемы.
  • Databricks Runtime 12.2 LTS и более ранние версии: Таблицы с включенным сопоставлением столбцов, которые имеют изменения схемы, не являющиеся добавлением, не поддерживают потоковую обработку операций чтения при использовании канала изменений данных. См. сопоставление столбцов и потоковую передачу.
  • Databricks Runtime 11.3 LTS и ниже: невозможно прочитать канал данных изменений для таблиц, где включено сопоставление столбцов и имело место переименование или удаление столбцов.

В Databricks Runtime 12.2 LTS и более поздних версиях вы можете выполнять пакетное чтение канала данных изменений для таблиц с включенным сопоставлением столбцов, в которых произошли недобавочные изменения схемы. Операции чтения используют схему конечной версии, указанной в запросе, а не последнюю версию таблицы. Запросы по-прежнему завершаются ошибкой, если диапазон версий охватывает неаддитивное изменение схемы.