Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Область применения:
Databricks SQL
Это важно
Эта функция доступна в бета-версии. Требуется Databricks Runtime 17.3 и более поздней версии.
FLOW AUTO CDC Используйте предложение для CREATE STREAMING TABLE обработки записей отслеживания измененных данных (CDC) из источника в потоковую таблицу.
Ранее инструкция MERGE INTO часто использовалась для обработки записей CDC в Azure Databricks.
MERGE INTO Однако может привести к неправильным результатам из-за неупорядоченных записей или требуется сложная логика для повторного упорядочивания записей.
AUTO CDC упрощает CDC путем автоматической обработки записей вне порядка. Ключи для идентификации записей, столбца последовательности для упорядочивания и хранения результатов в формате SCD типа 1 (прямые обновления) или SCD типа 2 (отслеживание журнала).
Синтаксис
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Поведение по умолчанию для INSERT и UPDATE событий заключается в том, чтобы обновить все строки в целевой таблице, соответствующие указанным ключам, или вставить новую строку, если соответствующая запись не существует в целевой таблице. Обработку событий DELETE можно задать с условием APPLY AS DELETE WHEN.
Параметры
sourceИсточник данных. Источник должен быть источником потоковой передачи. Используйте ключевое слово
STREAMдля применения семантики потоковой передачи при чтении из источника. Если чтение сталкивается с изменением или удалением существующей записи, возникает ошибка. Самое безопасное — читать из статических или источников только для добавления.Дополнительные сведения о потоковой передаче данных см. в разделе Преобразование данных с помощью конвейеров.
KEYSСтолбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Значения в этих столбцах используются для определения того, какие события CDC применяются к определенным записям в целевой таблице.
Чтобы определить сочетание столбцов, используйте разделенный запятыми список столбцов.
Это предложение является обязательным.
IGNORE NULL UPDATESПозволяет получать обновления, содержащие подмножество целевых столбцов. Если событие CDC соответствует существующей строке и
IGNORE NULL UPDATESуказано, столбцы соnullзначением сохраняют существующие значения в целевом объекте. Это также относится к вложенным столбцам со значениемnull.Это предложение является необязательным.
Значение по умолчанию — перезаписать существующие столбцы со значениями
null.APPLY AS DELETE WHENУказывает, когда событие CDC следует рассматривать как
DELETEвместо upsert.Для источников SCD типа 2 для обработки данных, поступающих не по порядку, удаленная строка временно сохраняется как метка удаления в базовой таблице Delta, а представление создается в хранилище метаданных, которое фильтрует эти метки удаления. Интервал хранения можно настроить с помощью
pipelines.cdc.tombstoneGCThresholdInSecondsсвойства таблицы.Это предложение является необязательным.
APPLY AS TRUNCATE WHENУказывает, когда событие CDC должно рассматриваться как полная таблица
TRUNCATE. Поскольку эта команда активирует полную очистку целевой таблицы, её следует использовать только для конкретных случаев, требующих этой функции.Предложение
APPLY AS TRUNCATE WHENподдерживается только для SCD типа 1. SCD типа 2 не поддерживает операцию усечения.Это предложение является необязательным.
SEQUENCE BYИмя столбца, указывающее логический порядок событий CDC в исходных данных. Обработка конвейера использует эту последовательность для обработки событий изменений, поступающих не по порядку.
Если для упорядочивания требуется несколько столбцов, используйте
STRUCTвыражение: сначала оно будет упорядочивать по первому полю структуры, затем, если значения одинаковы, по второму полю и так далее.Указанные столбцы должны быть сортируемыми типами данных.
Это предложение является обязательным.
COLUMNSУказывает подмножество столбцов для включения в целевую таблицу. Вы можете сделать одно из двух:
- Укажите полный список столбцов, которые необходимо включить:
COLUMNS (userId, name, city) - Укажите список столбцов, которые следует исключить:
COLUMNS * EXCEPT (operation, sequenceNum)
Это предложение является необязательным.
По умолчанию все столбцы включаются в целевую таблицу, если не указано условие
COLUMNS.- Укажите полный список столбцов, которые необходимо включить:
STORED ASСледует ли хранить записи как SCD типа 1 или SCD типа 2.
Это предложение является необязательным.
Значение по умолчанию — SCD тип 1.
TRACK HISTORY ONЗадает подмножество выходных столбцов для создания записей журнала при наличии изменений в указанных столбцах. Вы можете сделать одно из двух:
- Укажите полный список столбцов для отслеживания:
COLUMNS (userId, name, city) - Укажите список столбцов, которые следует исключить из отслеживания:
COLUMNS * EXCEPT (operation, sequenceNum)
Это предложение является необязательным. По умолчанию ведется история изменений для всех выходных столбцов при любом изменении, что эквивалентно
TRACK HISTORY ON *.- Укажите полный список столбцов для отслеживания:
Примеры
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);