Compartilhar via


create_sink

Importante

A API do pipeline create_sink está em Visualização Pública.

A função create_sink() grava em um serviço de streaming de eventos, como Apache Kafka ou Hubs de Eventos do Azure ou em uma tabela Delta de um pipeline declarativo. Depois de criar um coletor com a função create_sink(), use o coletor em um fluxo de acréscimo para gravar dados no coletor. o fluxo de acréscimo é o único tipo de fluxo com suporte na função create_sink(). Não há suporte para outros tipos de fluxo, como create_auto_cdc_flow. Para obter detalhes sobre outros tipos de coletores no Lakeflow Spark Declarative Pipelines, consulte Sinks in Lakeflow Spark Declarative Pipelines.

O coletor Delta dá suporte às tabelas externas e gerenciadas do Catálogo do Unity e às tabelas gerenciadas do metastore do Hive. Os nomes de tabela devem ser totalmente qualificados. Por exemplo, as tabelas do Catálogo do Unity devem usar um identificador de três camadas: <catalog>.<schema>.<table>. As tabelas do metastore do Hive devem usar <schema>.<table>.

Observação

  • A execução de uma atualização completa não limpa os dados dos coletores. Todos os dados reprocessados serão acrescentados ao coletor e os dados existentes não serão alterados.
  • Não há suporte para expectativas com a sink API.

Sintaxe

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Parâmetros

Parâmetro Tipo Description
name str Obrigatório Uma cadeia de caracteres que identifica o coletor e é usada para referenciar e gerenciar o coletor. Os nomes dos "sinks" devem ser exclusivos dentro do pipeline, incluindo todos os arquivos de código-fonte que são parte do pipeline.
format str Obrigatório Uma cadeia de caracteres que define o formato de saída, kafka ou delta.
options dict Uma lista de opções de coletor, formatada como {"key": "value"}, em que a chave e o valor são cadeias de caracteres. Todas as opções do Databricks Runtime compatíveis com os coletores Kafka e Delta têm suporte.

Exemplos

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)