Compartir a través de


Carga y procesamiento de datos de forma incremental con flujos de canalizaciones declarativas de Spark de Lakeflow

Los datos se procesan en canalizaciones a través de flujos. Cada flujo consta de una consulta y, normalmente, un destino. El flujo procesa la consulta, ya sea por lotes o de manera incremental como un flujo de datos hacia el destino. Un flujo reside dentro de una canalización en las Canalizaciones Declarativas de Lakeflow Spark.

Normalmente, los flujos se definen automáticamente al crear una consulta en una canalización que actualiza un destino, pero también puede definir explícitamente flujos adicionales para un procesamiento más complejo, como anexar a un único destino de varios orígenes.

Actualizaciones

Se ejecuta un flujo cada vez que se actualiza la canalización que lo define. El flujo creará o actualizará tablas con los datos más recientes disponibles. Según el tipo de flujo y el estado de los cambios en los datos, la actualización puede realizar una actualización incremental, que procesa solo registros nuevos o realizar una actualización completa, que vuelve a procesar todos los registros del origen de datos.

Creación de un flujo predeterminado

Al crear una canalización, normalmente se define una tabla o una vista junto con la consulta que lo admite. Por ejemplo, en esta consulta SQL, se crea una tabla de streaming denominada customers_silver leyendo desde la tabla denominada customers_bronze.

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

También puede crear la misma tabla de streaming en Python. En Python, se utilizan canalizaciones al crear una función de consulta que devuelve un dataframe, con decoradores para añadir la funcionalidad de Lakeflow Spark Declarative Pipelines.

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

En este ejemplo, ha creado una tabla de streaming. También puede crear vistas materializadas con una sintaxis similar en SQL y Python. Para obtener más información, consulte tablas en streaming y vistas materializadas.

En este ejemplo se crea un flujo predeterminado junto con la tabla de streaming. El flujo predeterminado de una tabla de streaming es un flujo de adición, que agrega nuevas filas con cada desencadenador. Esta es la manera más común de usar canalizaciones: cree un flujo y el destino en un solo paso. Puede usar este estilo para ingerir datos o transformar datos.

Los flujos de anexión también admiten el procesamiento que requiere leer datos de varios orígenes de streaming para actualizar un único destino. Por ejemplo, puede utilizar la funcionalidad de flujo de adición cuando disponga de una tabla y un flujo de streaming existentes y desee incorporar un nuevo origen de streaming que escriba en esta tabla de streaming existente.

Uso de varios flujos para escribir en un único destino

En el ejemplo anterior, creó un flujo y una tabla de streaming en un solo paso. También puede crear flujos para una tabla creada anteriormente. En este ejemplo, puede ver la creación de una tabla y el flujo asociado a ella en pasos independientes. Este código tiene resultados idénticos a la creación de un flujo predeterminado, incluido el uso del mismo nombre para la tabla de streaming y el flujo.

Pitón

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

La creación de un flujo independientemente del destino significa que también puede crear varios flujos que anexan datos al mismo destino.

Use el @dp.append_flow decorador en la interfaz de Python o en la CREATE FLOW...INSERT INTO cláusula de la interfaz SQL para crear un nuevo flujo, por ejemplo, para tener como destino una tabla de streaming desde varios orígenes de streaming. Use flujo de anexar para procesar tareas como las siguientes:

  • Agregue orígenes de streaming que anexe datos a una tabla de streaming existente sin necesidad de una actualización completa. Por ejemplo, puede tener una tabla que combine datos regionales de cada región en la que opera. A medida que se implementan nuevas regiones, puede agregar los nuevos datos de región a la tabla sin realizar una actualización completa. Para ver un ejemplo de cómo agregar fuentes de streaming a una tabla de streaming existente, consulte Ejemplo: Escribir en una tabla de streaming desde múltiples tópicos de Kafka.
  • Actualice una tabla de streaming anexando datos históricos que faltan (reposición). Puede usar la sintaxis INSERT INTO ONCE para crear un complemento de relleno histórico que se ejecute una sola vez. Por ejemplo, tiene una tabla de streaming existente escrita en un tema de Apache Kafka. También tiene datos históricos almacenados en una tabla que necesita insertar exactamente una vez en la tabla de streaming y no puede transmitir los datos porque el procesamiento incluye realizar una agregación compleja antes de insertar los datos. Para obtener un ejemplo de reposición, consulte Reposición de datos históricos con canalizaciones.
  • Combine datos de varios orígenes y escriba en una sola tabla de transmisión en lugar de usar la cláusula UNION en una consulta. El uso del proceso de adición en lugar de UNION le permite actualizar la tabla de destino de forma incremental sin ejecutar una actualización completa. Para obtener un ejemplo de una unión realizada de esta manera, vea Ejemplo: Uso del procesamiento de flujo de anexión en lugar de UNION.

El destino de los registros que genera el procesamiento del flujo de anexión puede ser una tabla existente o una tabla nueva. Para las consultas de Python, use la función create_streaming_table() para crear una tabla de destino.

En el ejemplo siguiente se agregan dos flujos para el mismo destino, creando una unión de las dos tablas de origen:

Pitón

from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

Importante

  • Si necesita definir restricciones de calidad de datos con expectativas, defina las expectativas en la tabla de destino como parte de la create_streaming_table() función o en una definición de tabla existente. No se pueden definir expectativas en la @append_flow definición.
  • Los flujos se identifican mediante un nombre de flujo y este nombre se usa para identificar los puntos de control de streaming. El uso del nombre del flujo para identificar el punto de control significa lo siguiente:
    • Si se cambia el nombre de un flujo existente en una canalización, el punto de control no se transfiere y el flujo renombrado se considera efectivamente un flujo completamente nuevo.
    • No se puede reutilizar un nombre de flujo en una canalización, ya que el punto de control existente no coincidirá con la nueva definición de flujo.

Tipos de flujos

Los flujos predeterminados para las tablas de streaming y las vistas materializadas son flujos anexados. También puede crear flujos para leer desde fuentes de datos de captura de datos de cambios. En la tabla siguiente se describen los distintos tipos de flujos.

Tipo de flujo Description
Append Los flujos de anexión son el tipo de flujo más común, donde los nuevos registros del origen se escriben en el destino con cada actualización. Corresponden al modo de adición en streaming estructurado. Puede agregar la ONCE marca , que indica una consulta por lotes cuyos datos se deben insertar en el destino una sola vez, a menos que el destino se actualice por completo. Cualquier número de flujos de anexión puede escribir en un destino determinado.
Los flujos predeterminados (creados con la tabla de streaming de destino o la vista materializada) tendrán el mismo nombre que el destino. Otros destinos no tienen flujos predeterminados.
CDC automático (anteriormente aplicar cambios) Un flujo CDC automático ingiere una consulta que contiene datos de captura de datos de cambio (CDC). Los flujos CDC automáticos solo pueden dirigirse a tablas de streaming y el origen debe ser un origen de streaming (incluso en el caso de los flujos ONCE). Varios flujos CDC automáticos pueden tener como destino una sola tabla de streaming. Una tabla de streaming que actúa como destino para un flujo CDC automático solo puede ser dirigida por otros flujos CDC automáticos.
Para obtener más información sobre los datos de CDC, consulte Las API de AUTO CDC: Simplifique la captura de datos de cambios con canalizaciones.

Información adicional

Para obtener más información sobre los flujos y su uso, consulte los temas siguientes: