Compartir a través de


Ejemplos de flujos en las canalizaciones declarativas de Lakeflow Spark

Ejemplo: Escribir en una tabla de flujo continuo desde varios tópicos de Kafka

En los ejemplos siguientes se crea una tabla de streaming denominada kafka_target y se escribe en esa tabla de streaming desde dos tópicos de Kafka.

Pitón

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Para obtener más información sobre la read_kafka() función con valores de tabla usada en las consultas SQL, consulte read_kafka en la referencia del lenguaje SQL.

En Python, puede crear mediante programación varios flujos que tienen como destino una sola tabla. En el ejemplo siguiente se muestra este patrón para obtener una lista de temas de Kafka.

Nota:

Este patrón tiene los mismos requisitos que el uso de un for bucle para crear tablas. Debe pasar explícitamente un valor de Python a la función que define el flujo. Consulte Creación de tablas en un for bucle.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Ejemplo: Ejecutar un relleno de datos único

Si desea ejecutar una consulta para anexar datos a una tabla de streaming existente, use append_flow.

Después de anexar un conjunto de datos existentes, tiene varias opciones:

  • Si desea que la consulta anexe nuevos datos si llegan al directorio de relleno, mantenga la consulta en su lugar.
  • Si quiere que sea una reposición única y nunca vuelva a ejecutarse, elimine la consulta después de ejecutar la canalización una vez.
  • Si desea que la consulta se ejecute una vez y solo se ejecute de nuevo en los casos en los que los datos se actualizan por completo, establezca el once parámetro True en el flujo de anexar. En SQL, use INSERT INTO ONCE.

En los ejemplos siguientes se ejecuta una consulta para anexar datos históricos a una tabla de streaming:

Pitón

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Para obtener un ejemplo más detallado, consulte Reposición de datos históricos con canalizaciones.

Ejemplo: Uso del procesamiento de flujo de anexión en lugar de UNION

En lugar de usar una consulta con una UNION cláusula, puede utilizar consultas de flujo de apéndice para combinar varios orígenes y escribir en una sola tabla de transmisión. El uso de consultas de flujo de anexión en lugar de UNION permite anexar a una tabla de streaming desde varios orígenes sin ejecutar una actualización completa.

En el ejemplo de Python siguiente se incluye una consulta que combina varios orígenes de datos con una UNION cláusula :

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

En los siguientes ejemplos, se reemplaza la consulta UNION por consultas de flujo de datos agregadas.

Pitón

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );