Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Ejemplo: Escribir en una tabla de flujo continuo desde varios tópicos de Kafka
En los ejemplos siguientes se crea una tabla de streaming denominada kafka_target y se escribe en esa tabla de streaming desde dos tópicos de Kafka.
Pitón
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Para obtener más información sobre la read_kafka() función con valores de tabla usada en las consultas SQL, consulte read_kafka en la referencia del lenguaje SQL.
En Python, puede crear mediante programación varios flujos que tienen como destino una sola tabla. En el ejemplo siguiente se muestra este patrón para obtener una lista de temas de Kafka.
Nota:
Este patrón tiene los mismos requisitos que el uso de un for bucle para crear tablas. Debe pasar explícitamente un valor de Python a la función que define el flujo. Consulte Creación de tablas en un for bucle.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Ejemplo: Ejecutar un relleno de datos único
Si desea ejecutar una consulta para anexar datos a una tabla de streaming existente, use append_flow.
Después de anexar un conjunto de datos existentes, tiene varias opciones:
- Si desea que la consulta anexe nuevos datos si llegan al directorio de relleno, mantenga la consulta en su lugar.
- Si quiere que sea una reposición única y nunca vuelva a ejecutarse, elimine la consulta después de ejecutar la canalización una vez.
- Si desea que la consulta se ejecute una vez y solo se ejecute de nuevo en los casos en los que los datos se actualizan por completo, establezca el
onceparámetroTrueen el flujo de anexar. En SQL, useINSERT INTO ONCE.
En los ejemplos siguientes se ejecuta una consulta para anexar datos históricos a una tabla de streaming:
Pitón
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Para obtener un ejemplo más detallado, consulte Reposición de datos históricos con canalizaciones.
Ejemplo: Uso del procesamiento de flujo de anexión en lugar de UNION
En lugar de usar una consulta con una UNION cláusula, puede utilizar consultas de flujo de apéndice para combinar varios orígenes y escribir en una sola tabla de transmisión. El uso de consultas de flujo de anexión en lugar de UNION permite anexar a una tabla de streaming desde varios orígenes sin ejecutar una actualización completa.
En el ejemplo de Python siguiente se incluye una consulta que combina varios orígenes de datos con una UNION cláusula :
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
En los siguientes ejemplos, se reemplaza la consulta UNION por consultas de flujo de datos agregadas.
Pitón
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);