Compartir a través de


AUTO CDC APIs: Simplifica la captura de datos de cambios con canalizaciones

Las canalizaciones declarativas de Lakeflow Spark (SDP) simplifican la captura de cambios en datos (CDC) con las APIs AUTO CDC y AUTO CDC FROM SNAPSHOT.

Nota:

Las AUTO CDC API reemplazan a las APPLY CHANGES API y tienen la misma sintaxis. Las APPLY CHANGES API siguen estando disponibles, pero Databricks recomienda usar las AUTO CDC API en su lugar.

La interfaz que use depende del origen de los datos modificados:

  • Use AUTO CDC para procesar los cambios de un flujo de datos de cambios (CDF).
  • Use AUTO CDC FROM SNAPSHOT (versión preliminar pública y solo disponible para Python) para procesar los cambios en las instantáneas de base de datos.

Anteriormente, la MERGE INTO instrucción se usaba normalmente para procesar registros CDC en Azure Databricks. Sin embargo, MERGE INTO puede producir resultados incorrectos debido a registros fuera de secuencia o requiere una lógica compleja para volver a ordenar los registros.

La AUTO CDC API es compatible con las interfaces de pipeline SQL y Python. La AUTO CDC FROM SNAPSHOT API se admite en la interfaz de Python. Las AUTO CDC API no son compatibles con las canalizaciones declarativas de Apache Spark.

Tanto AUTO CDC como AUTO CDC FROM SNAPSHOT admiten la actualización de tablas utilizando SCD tipo 1 y tipo 2.

  • Use el tipo 1 de SCD para actualizar los registros directamente. El historial no se conserva para los registros actualizados.
  • Use el tipo SCD 2 para conservar un historial de registros, ya sea en todas las actualizaciones o en las actualizaciones de un conjunto de columnas especificado.

Para obtener sintaxis y otras referencias, consulte AUTO CDC para canalizaciones (SQL),AUTO CDC para canalizaciones (Python) y AUTO CDC FROM SNAPSHOT para canalizaciones (Python).

Nota:

En este artículo se describe cómo actualizar las tablas de las canalizaciones en función de los cambios en los datos de origen. Para obtener información sobre cómo registrar y consultar información de cambios de nivel de fila para tablas Delta, consulte Uso de la fuente de datos de cambios de Delta Lake en Azure Databricks.

Requisitos

Para usar las API de CDC, la canalización debe configurarse para usar SDP sin servidor o Pro las Advancedediciones SDP.

¿Cómo se implementa CDC con la API de AUTO CDC?

Mediante el control automático de registros fuera de secuencia, la API DE AUTO CDC garantiza el procesamiento correcto de registros CDC y elimina la necesidad de desarrollar lógica compleja para controlar los registros fuera de secuencia. Debe especificar una columna en los datos de origen en los que se van a secuenciar registros, que las API interpretan como una representación monotonicamente creciente de la ordenación adecuada de los datos de origen. Las canalizaciones manejan automáticamente los datos que llegan fuera de secuencia. Para los cambios de tipo 2 de SCD, las canalizaciones propagan los valores de secuenciación adecuados a las columnas __START_AT y __END_AT de la tabla de destino. Debe haber una actualización distinta por clave en cada valor de secuenciación y no se admiten los valores de secuenciación NULL.

Para realizar el procesamiento CDC con AUTO CDC, primero debe crear una tabla de streaming y, a continuación, usar la instrucción AUTO CDC ... INTO en SQL o la función create_auto_cdc_flow() en Python para especificar el origen, las claves y la secuenciación del flujo de cambios. Para crear la tabla de streaming de destino, use la CREATE OR REFRESH STREAMING TABLE instrucción en SQL o la create_streaming_table() función en Python. Consulte los ejemplos de procesamiento de tipo 1 y tipo 2 de SCD .

Para más información sobre la sintaxis, consulte la referencia de SQL de canalizaciones o la referencia de Python.

¿Cómo se implementa CDC con la AUTO CDC FROM SNAPSHOT API?

Importante

La AUTO CDC FROM SNAPSHOT API está en versión preliminar pública.

AUTO CDC FROM SNAPSHOT es una API declarativa que determina eficazmente los cambios en los datos de origen comparando una serie de instantáneas en orden y, a continuación, ejecuta el procesamiento necesario para el procesamiento CDC de los registros en las instantáneas. AUTO CDC FROM SNAPSHOT solo es compatible con la interfaz de canalizaciones de Python.

AUTO CDC FROM SNAPSHOT admite la ingesta de instantáneas de varios tipos de origen:

  • Utiliza la ingesta periódica de instantáneas para ingerir instantáneas desde una tabla o vista existente. AUTO CDC FROM SNAPSHOT tiene una interfaz sencilla y simplificada para admitir la ingesta periódica de instantáneas desde un objeto de base de datos existente. Se incorpora una nueva instantánea con cada actualización de la canalización y el tiempo de ingesta se utiliza como la versión de la instantánea. Cuando se ejecuta una tubería en modo continuo, se ingieren múltiples instantáneas cada vez que se actualiza la tubería en un período determinado por la configuración del intervalo de desencadenador para el flujo que incluye el procesamiento AUTO CDC FROM SNAPSHOT.
  • Use la ingesta de instantáneas históricas para procesar archivos que contienen instantáneas de base de datos, como instantáneas generadas a partir de una base de datos Oracle o MySQL o un almacenamiento de datos.

Para realizar el procesamiento CDC desde cualquier tipo de origen con AUTO CDC FROM SNAPSHOT, primero debe crear una tabla de flujo y, a continuación, usar la función create_auto_cdc_from_snapshot_flow() en Python para especificar la instantánea, las claves y otros argumentos necesarios para implementar el procesamiento. Consulte los ejemplos de ingesta de instantáneas periódicas e ingesta de instantáneas históricas .

Las instantáneas pasadas a la API deben estar en orden ascendente por versión. Si SDP detecta una instantánea desordenda, se produce un error.

Para obtener detalles sobre la sintaxis, consulte la referencia de Python de las canalizaciones.

Uso de varias columnas para la secuenciación

Puede secuenciar varias columnas (por ejemplo, una marca de tiempo y un identificador para romper los vínculos), puede usar un STRUCT para combinarlas: ordena por el primer campo del STRUCT primero y, en caso de un empate, tiene en cuenta el segundo campo, etc.

Ejemplo en SQL:

SEQUENCE BY STRUCT(timestamp_col, id_col)

Ejemplo en Python:

sequence_by = struct("timestamp_col", "id_col")

Limitaciones

La columna utilizada para la secuenciación debe ser un tipo de datos ordenable.

Ejemplo: procesamiento de SCD tipo 1 y SCD tipo 2 con datos de origen de CDF

En las secciones siguientes se proporcionan ejemplos de consultas de tipo 1 y tipo 2 de SCD que actualizan las tablas de destino basadas en eventos de origen de una fuente de distribución de datos modificados que:

  1. Crea nuevos registros de usuario.
  2. Elimina un registro de usuario.
  3. Actualiza los registros de usuario. En el ejemplo del tipo 1 de SCD, las últimas UPDATE operaciones llegan tarde y se quitan de la tabla de destino, lo que muestra el control de eventos desordenados.

En los ejemplos siguientes se supone que está familiarizado con la configuración y actualización de canalizaciones. Consulte Tutorial: Compilación de una canalización de ETL mediante la captura de datos modificados.

Para ejecutar estos ejemplos, debe comenzar creando un conjunto de datos de ejemplo. Consulte Generación de datos de prueba.

A continuación se muestran los registros de entrada para estos ejemplos:

userId nombre city operation secuenciaNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancún INSERT 2
123 nulo nulo DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Si descomenta la fila final de los datos de ejemplo, se insertará el registro siguiente que especifica dónde se deben truncar los registros.

userId nombre city operation secuenciaNum
nulo nulo nulo TRUNCAR 3

Nota:

Todos los ejemplos siguientes incluyen opciones para especificar las DELETE operaciones y TRUNCATE , pero cada una es opcional.

Procesar actualizaciones del tipo 1 de SCD

En el ejemplo siguiente se muestra cómo procesar las actualizaciones del tipo 1 de SCD:

Pitón

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flowname AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Después de ejecutar el ejemplo del tipo 1 de SCD, la tabla de destino contiene los siguientes registros:

userId nombre city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancún

Después de ejecutar el ejemplo de SCD tipo 1 con el registro adicional TRUNCATE , los registros 124 y 126 se truncan debido a la TRUNCATE operación en sequenceNum=3y la tabla de destino contiene el registro siguiente:

userId nombre city
125 Mercedes Guadalajara

Procesar actualizaciones del tipo 2 de SCD

En el ejemplo siguiente se muestra el procesamiento de actualizaciones del tipo 2 de SCD:

Pitón

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Después de ejecutar el ejemplo 2 de SCD, la tabla de destino contiene los siguientes registros:

userId nombre city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 nulo
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 nulo
126 Lily Cancún 2 nulo

Una consulta de tipo 2 de SCD también puede especificar un subconjunto de columnas de salida a las que se va a realizar un seguimiento del historial en la tabla de destino. Los cambios en otras columnas se actualizan directamente en lugar de generar nuevos registros de historial. En el ejemplo siguiente se muestra la exclusión de la city columna del seguimiento:

En el ejemplo siguiente se muestra el uso del historial de seguimiento con el tipo 2 de SCD:

Pitón

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Después de ejecutar este ejemplo sin el registro adicional TRUNCATE , la tabla de destino contiene los siguientes registros:

userId nombre city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 nulo
125 Mercedes Guadalajara 2 nulo
126 Lily Cancún 2 nulo

Generación de datos de prueba

El código siguiente se proporciona para generar un conjunto de datos de ejemplo para su uso en las consultas de ejemplo presentes en este tutorial. Suponiendo que tiene las credenciales adecuadas para crear un nuevo esquema y crear una nueva tabla, puede ejecutar estas instrucciones con un cuaderno o Databricks SQL. El código siguiente no está diseñado para ejecutarse como parte de una definición de canalización:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Ejemplo: Procesamiento periódico de instantáneas

En el ejemplo siguiente se muestra el procesamiento del tipo 2 de SCD que ingiere instantáneas de una tabla almacenada en mycatalog.myschema.mytable. Los resultados del procesamiento se escriben en una tabla denominada target.

mycatalog.myschema.mytable registros en la marca de tiempo 2024-01-01 00:00:00

Key Importancia
1 a1
2 a2

mycatalog.myschema.mytable registros en la marca de tiempo 2024-01-01 12:00:00

Key Importancia
2 b2
3 a3
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Después de procesar las instantáneas, la tabla de destino contiene los siguientes registros:

Key Importancia __START_AT __END_AT
1 a1 01-01-2024 00:00 2024-01-01 12:00:00
2 a2 01-01-2024 00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 nulo
3 a3 2024-01-01 12:00:00 nulo

Ejemplo: Procesamiento histórico de instantáneas

En el ejemplo siguiente se muestra el procesamiento del tipo 2 de SCD que actualiza una tabla de destino basada en eventos de origen de dos instantáneas almacenadas en un sistema de almacenamiento en la nube:

Instantánea en timestamp, almacenada en /<PATH>/filename1.csv

Key Columna de seguimiento ColumnaSinSeguimiento
1 a1 b1
2 a2 b2
4 a4 b4

Instantánea en timestamp + 5, almacenada en /<PATH>/filename2.csv

Key Columna de seguimiento ColumnaSinSeguimiento
2 a2_new b2
3 a3 b3
4 a4 b4_new

En el ejemplo de código siguiente se muestra cómo procesar las actualizaciones del tipo 2 de SCD con estas instantáneas:

from pyspark import pipelines as dp

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dp.create_streaming_live_table("target")

dp.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Después de procesar las instantáneas, la tabla de destino contiene los siguientes registros:

Key Columna de seguimiento ColumnaSinSeguimiento __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 nulo
3 a3 b3 2 nulo
4 a4 b4_new 1 nulo

Agregar, cambiar o eliminar datos en una tabla de streaming de destino

Si la canalización publica tablas en el Catálogo de Unity, puede usar instrucciones de lenguaje de manipulación de datos (DML), incluidas las instrucciones insert, update, delete y merge, para modificar las tablas de streaming de destino creadas por AUTO CDC ... INTO.

Nota:

  • No se admiten instrucciones DML que modifican el esquema de tabla de una tabla de streaming. Asegúrese de que las instrucciones DML no intenten evolucionar el esquema de la tabla.
  • Las instrucciones DML que actualizan una tabla de streaming solo se pueden ejecutar en un clúster compartido de Unity Catalog o en un almacenamiento de SQL mediante Databricks Runtime 13.3 LTS y versiones posteriores.
  • Dado que el streaming requiere orígenes de datos de solo anexión, si el procesamiento requiere streaming desde una tabla de streaming de origen con cambios (por ejemplo, por instrucciones de DML), establezca la marca skipChangeCommits al leer la tabla de streaming de origen. Cuando se establezca skipChangeCommits, se omitirán las transacciones que eliminen o modifiquen registros de la tabla de origen. Si el procesamiento no requiere una tabla de streaming, puede usar una vista materializada (que no tiene la restricción de solo anexión) como tabla de destino.

Dado que Lakeflow Spark Declarative Pipelines usa una columna especificada SEQUENCE BY y propaga los valores de secuenciación adecuados a las __START_AT columnas y __END_AT de la tabla de destino (para el tipo SCD 2), debe asegurarse de que las instrucciones DML usan valores válidos para estas columnas para mantener el orden adecuado de los registros. Vea ¿Cómo se implementa CDC con la AUTO CDC API?.

Para obtener más información sobre el uso de instrucciones DML con tablas de streaming, vea Agregar, cambiar o eliminar datos en una tabla de streaming.

En el ejemplo siguiente se inserta un registro activo con una secuencia de inicio de 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Leer un flujo de datos de cambios desde una tabla de destino CDC AUTOMÁTICO

En Databricks Runtime 15.2 y versiones posteriores, puede leer una alimentación de datos de cambios de una tabla de transmisión que sea el destino de consultas AUTO CDC o AUTO CDC FROM SNAPSHOT, de la misma manera que lee un cambio de alimentación de datos de otras tablas Delta. A continuación se requieren para leer el flujo de datos de cambios de una tabla de streaming de destino:

  • La tabla de streaming de destino debe publicarse en el catálogo de Unity. Consulte Uso del catálogo de Unity con canalizaciones.
  • Para leer el flujo de cambios de datos desde la tabla de transmisión de destino, debe usar Databricks Runtime 15.2 o superior. Para leer el flujo de datos de cambios en una canalización diferente, la canalización debe configurarse para usar Databricks Runtime 15.2 o superior.

Lea el flujo de datos de cambios de una tabla de transmisión de destino que fue creada en las Lakeflow Spark Declarative Pipelines de la misma manera en que se lee un flujo de datos de cambios de otras tablas Delta. Para obtener más información sobre el uso de la funcionalidad de flujo de datos de cambios Delta, incluidos ejemplos en Python y SQL, consulte Uso del flujo de datos de cambios de Delta Lake en Azure Databricks.

Nota:

El registro de fuente de distribución de datos modificados incluye metadatos que identifican el tipo de evento de cambio. Cuando se actualiza un registro en una tabla, los metadatos de los registros de cambios asociados normalmente incluyen _change_type valores establecidos en update_preimage y eventos update_postimage.

Sin embargo, los _change_type valores son diferentes si se realizan actualizaciones en la tabla de streaming de destino que incluyen el cambio de valores de clave principal. Cuando los cambios incluyen actualizaciones de las claves principales, los _change_type campos de metadatos se establecen en eventos de insert y delete. Los cambios en las claves principales pueden ocurrir cuando se realizan actualizaciones manuales en uno de los campos clave con una instrucción UPDATE o MERGE, o, para las tablas de tipo SCD 2, cuando el campo __start_at cambia para reflejar un valor de secuencia de inicio anterior.

La AUTO CDC consulta determina los valores de clave principal, que difieren para el procesamiento de tipo 1 y SCD 2:

  • Para el procesamiento del tipo 1 de SCD y la interfaz de canalizaciones de Python, la clave principal es el valor del parámetro keys en la función create_auto_cdc_flow(). Para la interfaz SQL, la clave principal es las columnas definidas por la KEYS cláusula en la AUTO CDC ... INTO instrucción .
  • Para el tipo SCD 2, la clave principal es el keys parámetro o KEYS cláusula más el valor devuelto de la coalesce(__START_AT, __END_AT) operación, donde __START_AT y __END_AT son las columnas correspondientes de la tabla de streaming de destino.

Obtención de datos sobre los registros procesados por una consulta de CDC en tuberías

Nota:

Las siguientes métricas solo las capturan las AUTO CDC consultas y no las AUTO CDC FROM SNAPSHOT consultas.

Las siguientes métricas son capturadas por las consultas AUTO CDC.

  • num_upserted_rows: número de filas de salida actualizadas en el conjunto de datos durante una actualización.
  • num_deleted_rows: número de filas de salida existentes eliminadas del conjunto de datos durante una actualización.

La métrica num_output_rows, de salida para los flujos que no son CDC, no se captura para las consultas AUTO CDC.

¿Qué objetos de datos se usan para el procesamiento de CDC en un pipeline?

Nota:

  • Estas estructuras de datos se aplican solo al procesamiento de AUTO CDC, no al procesamiento de AUTO CDC FROM SNAPSHOT.
  • Estas estructuras de datos solo se aplican cuando la tabla de destino se publica en el metastore de Hive. Si un flujo de trabajo se publica en Unity Catalog, los usuarios no pueden acceder a las tablas de respaldo internas.

Al declarar la tabla de destino en el metastore de Hive, se crean dos estructuras de datos:

  • Vista con el nombre asignado a la tabla de destino.
  • Tabla interna de respaldo usada por la canalización para administrar el procesamiento de CDC. Esta tabla se denomina anteponiendo __apply_changes_storage_ al nombre de la tabla de destino.

Por ejemplo, si declara una tabla de destino denominada dp_cdc_target, verá una vista denominada dp_cdc_target y una tabla denominada __apply_changes_storage_dp_cdc_target en el metastore. La creación de una vista permite que las canalizaciones declarativas de Spark de Lakeflow filtren la información adicional (por ejemplo, las lápidas y las versiones) necesarias para controlar los datos desordenados. Para ver los datos procesados, realice una consulta a la vista de destino. Dado que el esquema de la __apply_changes_storage_ tabla puede cambiar para admitir futuras características o mejoras, no debe consultar la tabla para su uso en producción. Si agrega datos manualmente a la tabla, se supone que los registros vienen antes de otros cambios porque faltan las columnas de versión.

Recursos adicionales