Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Azure Databricks обеспечивает встроенную поддержку сериализации и десериализации между структурами Apache Spark и буферами протокола (protobuf). Поддержка Protobuf реализована как преобразователь кадра данных Apache Spark и может использоваться с структурированной потоковой передачей или для пакетных операций.
Десериализация и сериализация буферов протокола
В Databricks Runtime 12.2 LTS и более поздних версиях можно использовать from_protobuf и to_protobuf функции для сериализации и десериализации данных. Сериализация Protobuf обычно используется в стриминговых рабочих нагрузках.
Базовый синтаксис функций protobuf аналогичен функциям чтения и записи. Прежде чем использовать эти функции, необходимо импортировать эти функции.
from_protobuf выполняет конвертацию двоичного столбца в структуру, а to_protobuf выполняет конвертацию столбца структуры в двоичный формат. Необходимо указать реестр схем, указанный аргументом options , или файл дескриптора, определенный descFilePath аргументом.
Питон
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
язык программирования Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
В следующих примерах показана обработка двоичных записей protobuf с помощью from_protobuf() и преобразование структуры Spark SQL в двоичный protobuf с to_protobuf().
Используйте protobuf с реестром схем Confluent
Azure Databricks поддерживает использование реестра схем Confluent для определения Protobuf.
Питон
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
язык программирования Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Аутентификация во внешнем реестре схем Confluent
Чтобы выполнить проверку подлинности во внешнем реестре схем Confluent, обновите параметры реестра схем, чтобы включить учетные данные проверки подлинности и ключи API.
Питон
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
язык программирования Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Использование файлов truststore и хранилища ключей в томах каталога Unity
В Databricks Runtime 14.3 LTS и более поздних версиях можно использовать файлы truststore и keystore в разделах каталога Unity для аутентификации в реестр схем Confluent. Обновите параметры реестра схем в соответствии со следующим примером:
Питон
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
язык программирования Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Использование Protobuf с дескрипторным файлом
Вы также можете ссылаться на файл дескриптора protobuf, доступный для вычислительного кластера. Убедитесь, что у вас есть правильные разрешения на чтение файла в зависимости от его расположения.
Питон
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
язык программирования Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Поддерживаемые параметры в функциях Protobuf
Следующие параметры поддерживаются в функциях Protobuf.
режим. Определяет способ обработки ошибок при десериализации записей Protobuf. Ошибки могут быть вызваны различными типами неправильных записей, включая несоответствие между фактической схемой записи и ожидаемой схемой, предоставленной в
from_protobuf().-
Значения:
-
FAILFAST(по умолчанию): ошибка возникает при обнаружении неправильно сформированной записи, и задача завершается сбоем. -
PERMISSIVE: значение NULL возвращается для неправильно сформированных записей. Используйте этот параметр тщательно, так как это может привести к удалению множества записей. Это полезно, если небольшая доля записей в источнике неверны.
-
-
Значения:
recursive.fields.max.depth: добавляет поддержку рекурсивных полей. Схемы SQL Spark не поддерживают рекурсивные поля. Если этот параметр не указан, рекурсивные поля не допускаются. Чтобы поддерживать рекурсивные поля в Protobufs, их необходимо расширить до указанной глубины.
Значения:
-1 (по умолчанию): не допускаются рекурсивные поля.
0. Рекурсивные поля удаляются.
1. Разрешает один уровень рекурсии.
[2-10]: укажите пороговое значение для нескольких рекурсий до 10 уровней.
Установка значения больше 0 позволяет расширять вложенные поля до заданной глубины, обеспечивая рекурсивность. Значения, превышающие 10, не допускаются, чтобы избежать непреднамеренного создания очень больших схем. Если сообщение Protobuf имеет глубину, превышающую заданное ограничение, возвращаемая структура Spark будет усечена после достижения этого лимита рекурсии.
Пример. Рассмотрим protobuf со следующим рекурсивным полем:
message Person { string name = 1; Person friend = 2; }Ниже перечислены конечные схемы с различными значениями для этого параметра:
- Для параметра задано значение 1:
STRUCT<name: STRING> - Для параметра задано значение 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>> - Для параметра задано значение 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Для параметра задано значение 1:
convert.any.fields.to.json. Этот параметр позволяет преобразовать поля Protobuf Any в JSON. Эту функцию следует тщательно включить. Преобразование и обработка JSON неэффективны. Кроме того, строковое поле JSON теряет безопасность схемы Protobuf, что делает обработку нижестоящей подверженной ошибкам.
Значения:
- False (по умолчанию): во время выполнения такие поля подстановочных знаков могут содержать произвольные сообщения Protobuf в виде двоичных данных. По умолчанию такие поля обрабатываются как обычное сообщение Protobuf. В нем есть два поля со схемой
(STRUCT<type_url: STRING, value: BINARY>). По умолчанию двоичноеvalueполе не интерпретируется каким-либо образом. Но двоичные данные могут оказаться не удобными на практике для работы в некоторых приложениях. - True. При задании этого значения значение true позволяет преобразовывать
Anyполя в строки JSON во время выполнения. С помощью этого параметра двоичный файл анализируется, и сообщение Protobuf десериализировано в строку JSON.
- False (по умолчанию): во время выполнения такие поля подстановочных знаков могут содержать произвольные сообщения Protobuf в виде двоичных данных. По умолчанию такие поля обрабатываются как обычное сообщение Protobuf. В нем есть два поля со схемой
Пример. Рассмотрим два типа Protobuf, определенные следующим образом:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }Если этот параметр включен, схема
from_protobuf("col", messageName ="ProtoWithAny")будет:STRUCT<event_name: STRING, details: STRING>Во время выполнения, если
detailsполе содержитPersonсообщение Protobuf, возвращаемое значение выглядит следующим образом:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')Требования.
- Определения всех возможных типов Protobuf, используемых в
Anyполях, должны быть доступны в файле дескриптора Protobuf, переданном вfrom_protobuf(). - Если
AnyProtobuf не обнаружен, это приведет к ошибке в записи. - Эта функция в настоящее время не поддерживается в реестре схем.
- Определения всех возможных типов Protobuf, используемых в
emit.default.values: Позволяет отрисовывать поля с нулевыми значениями при десериализации Protobuf в структуру Spark. Этот параметр следует использовать экономно. Обычно не рекомендуется зависеть от таких более тонких различий в семантике.
Значения
- False (по умолчанию): если поле пусто в сериализованном Protobuf, результирующее поле структуры Spark по умолчанию равно NULL. Проще не включить этот параметр и рассматривать
nullкак значение по умолчанию. - True: если этот параметр включен, такие поля заполняются соответствующими значениями по умолчанию.
- False (по умолчанию): если поле пусто в сериализованном Protobuf, результирующее поле структуры Spark по умолчанию равно NULL. Проще не включить этот параметр и рассматривать
Пример. Рассмотрим следующий Protobuf, который построен следующим образом
Person(age=0, middle_name=""):syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }- Если для этого параметра задано значение False, то после вызова
from_protobuf()все значения Spark будут null:{"name": null, "age": null, "middle_name": "", "salary": null}Несмотря на то, что в двух полях (ageиmiddle_name) заданы значения, Protobuf не включает их в wire-формат, так как они являются значениями по умолчанию. - Если для этого параметра установлено значение True, структура Spark после вызова
from_protobuf()станет{"name": "", "age": 0, "middle_name": "", "salary": null}. Полеsalaryостается пустым, так как оно явно объявленоoptional, и оно не задано во входной записи.
- Если для этого параметра задано значение False, то после вызова
enums.as.ints: при включении поля перечислений в Protobuf отображаются как целочисленные поля в Spark.
Значения
- False (по умолчанию)
- Верно: При включении поля перечислений в Protobuf отображаются как целочисленные поля в Spark.
Пример. Рассмотрим следующий protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }Дано сообщение Protobuf, например
Person(job = ENGINEER):- Если этот параметр отключен, соответствующая структура Spark будет
{"job": "ENGINEER"}. - Если этот параметр включен, соответствующая структуру Spark будет иметь значение
{"job": 1}.
Обратите внимание, что схема этих полей отличается в каждом случае (целое число, а не строка по умолчанию). Такое изменение может повлиять на схему подчиненных таблиц.
- Если этот параметр отключен, соответствующая структура Spark будет
Параметры реестра схем
При использовании реестра схем с функциями Protobuf используются следующие параметры реестра схем.
-
schema.registry.subject
- Обязательное поле
- Указывает тему схемы в реестре схем, например "client-event"
-
schema.registry.address
- Обязательное поле
- URL-адрес реестра схем, например
https://schema-registry.example.com:8081
-
schema.registry.protobuf.name
- Необязательно
- По умолчанию:
<NONE>. - Запись в реестре схем для объекта может содержать несколько определений Protobuf, как и один
protoфайл. Если этот параметр не указан, первый Protobuf используется для схемы. Укажите имя сообщения Protobuf, если он не первый в записи. Например, рассмотрим запись с двумя определениями Protobuf: Person и Location в этом порядке. Если поток соответствует "Location", а не "Person", задайте для этого параметра значение Location (или полное имя, включая пакет com.example.protos.Location).
-
schema.registry.schema.evolution.mode
- Значение по умолчанию: "перезапустить".
- Поддерживаемые режимы:
- "перезапустить"
- "нет"
- Этот параметр задает режим эволюции схемы для
from_protobuf(). В начале запроса Spark записывает последний идентификатор схемы для заданной темы. Это определяет схему дляfrom_protobuf(). После запуска запроса новая схема может быть опубликована в реестре схем. Когда новый идентификатор схемы заметен во входящей записи, он указывает на изменение схемы. Этот параметр определяет, как обрабатывается такое изменение схемы:-
перезапуск (по умолчанию): активирует
UnknownFieldExceptionпри появлении нового идентификатора схемы. Это завершает запрос. Databricks рекомендует настроить задания для перезапуска при сбое запроса для получения изменений схемы. - нет: изменения идентификатора схемы игнорируются. Записи с более новым идентификатором схемы анализируются с той же схемой, которая наблюдалась в начале запроса. Ожидается, что новые определения Protobuf будут обратно совместимы, а новые поля игнорируются.
-
перезапуск (по умолчанию): активирует
-
confluent.schema.registry.
<schema-registy-client-option>- Необязательно
- Реестр схем подключается к реестру схем Confluent с помощью клиента Confluent Schema Registry. Любые параметры конфигурации, поддерживаемые клиентом, можно указать с префиксом confluent.schema.registry. Например, следующие две настройки предоставляют учетные данные для аутентификации USER_INFO.
- "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
- "confluent.schema.registry.basic.auth.user.info": "
<KEY>:<SECRET>"