Compartir a través de


NotebookUtils ejecución del notebook y orquestación

Use las utilidades del cuaderno para ejecutar un cuaderno, ejecutar varios cuadernos en paralelo o salir de un cuaderno con un valor. Ejecute el siguiente comando para obtener información general sobre los métodos disponibles:

notebookutils.notebook.help()

En la tabla siguiente se enumeran los métodos de ejecución y orquestación de cuadernos disponibles:

Método Signature Descripción
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str Ejecuta un cuaderno y devuelve su valor de salida.
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] Ejecuta varios cuadernos simultáneamente con compatibilidad con las relaciones de dependencia.
validateDAG validateDAG(dag: Any): bool Valida si una definición de DAG está estructurada correctamente.
exit exit(value: str): None Sale del cuaderno actual con un valor.

Para las operaciones CRUD de cuadernos (crear, obtener, actualizar, eliminar, enumerar), consulte Administración de artefactos de cuadernos.

Nota:

El parámetro config en runMultiple() solo está disponible en Python. Scala y R no admiten este parámetro.

Nota:

Las utilidades de cuaderno no son aplicables a las definiciones de trabajo de Apache Spark (SJD).

Referencia a un cuaderno

El run() método hace referencia a un cuaderno y devuelve su valor de salida. Puede ejecutar llamadas de funciones anidadas en un notebook de manera interactiva o en una canalización. El cuaderno al que se hace referencia se ejecuta en el grupo de Spark del cuaderno que invoca esta función.

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

Por ejemplo:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Valor de retorno

El run() método devuelve la cadena exacta que se pasa al notebookutils.notebook.exit(value) bloc de notas secundario. Si exit() no se llama en el notebook secundario, se devuelve una cadena vacía ("").

Los cuadernos de Fabric también admiten la referencia a cuadernos entre áreas de trabajo especificando el identificador del área de trabajo.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Abra el enlace de captura en la salida de la celda para revisar la ejecución de referencia. La instantánea captura los resultados de ejecución y le ayuda a depurar el cuaderno al que se hace referencia.

Captura de pantalla del resultado de la ejecución de referencia.

Captura de pantalla de un ejemplo de instantánea.

Configura cuadernos hijos para recibir parámetros

Al crear un cuaderno secundario al que se llama a través de run() o runMultiple(), configure una celda de parámetros para que el cuaderno pueda recibir argumentos del cuaderno principal.

  1. Cree una celda de código con valores de parámetro predeterminados.
  2. Marque la celda como celda de parámetro seleccionando Marcar celda como parámetro en la interfaz de usuario del notebook.
  3. Durante la ejecución, los valores de celda del parámetro se reemplazan con los argumentos pasados desde el elemento principal.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

Sugerencia

Los valores de salida siempre son cadenas. Si necesita un valor numérico en el cuaderno primario, convierta el resultado después de la recuperación (por ejemplo, int(result)).

Consideraciones

  • El cuaderno de referencia entre áreas de trabajo es compatible con la versión 1.2 y posteriores del tiempo de ejecución.
  • Si utiliza los archivos del Recurso del Cuaderno, use notebookutils.nbResPath en el cuaderno de referencia para asegurarse de que apunta a la misma carpeta que en la ejecución interactiva.
  • La ejecución de referencia permite que los cuadernos secundarios se ejecuten solo si usan el mismo lakehouse que el principal, heredan el lakehouse del principal, o ninguno de los dos define uno. La ejecución se bloquea si el elemento secundario especifica una instancia de lakehouse diferente a la del cuaderno primario. Para omitir esta comprobación, establezca useRootDefaultLakehouse: True en los argumentos .
  • No llames notebookutils.notebook.exit(value) dentro de un try-catch bloque. La llamada de salida no surtirá efecto cuando esté envuelta en el manejo de excepciones.

Referencia para ejecutar múltiples notebooks en paralelo

Use notebookutils.notebook.runMultiple() para ejecutar varios cuadernos en paralelo o en una estructura topológica predefinida. La API usa una implementación multiproceso dentro de una sesión de Spark, lo que significa que el cuaderno al que se hace referencia comparte recursos de proceso.

Con notebookutils.notebook.runMultiple(), puede:

  • Ejecute varios cuadernos simultáneamente, sin esperar a que finalice cada uno.

  • Especifique las dependencias y el orden de ejecución de los cuadernos mediante un formato JSON simple.

  • Optimice el uso de recursos de proceso de Spark y reduzca el costo de los proyectos de Fabric.

  • Vea las instantáneas del registro de ejecución de cada cuaderno en la salida y supervise o depure las tareas del cuaderno de manera conveniente.

  • Obtenga el valor de salida de cada actividad ejecutiva y úselos en tareas descendentes.

Ejecute notebookutils.notebook.help("runMultiple") para ver más ejemplos y detalles de uso.

Ejecución de una lista sencilla de cuadernos

En el ejemplo siguiente se ejecuta una lista de cuadernos en paralelo:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

El resultado de la ejecución del cuaderno raíz es el siguiente:

Captura de pantalla de una lista de cuadernos.

Valor de retorno

El runMultiple() método devuelve un diccionario donde cada clave es el nombre de la actividad y cada valor es un diccionario con las claves siguientes:

  • exitVal: la cadena devuelta por la llamada del bloc de notas secundario exit() o una cadena vacía si no se llamó a exit().
  • exception: objeto de error si se produjo un error en la actividad o None si se realizó correctamente.

Ejecutar cuadernos con una estructura DAG

En el ejemplo siguiente se ejecutan cuadernos en una estructura DAG mediante notebookutils.notebook.runMultiple().

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

El resultado de la ejecución del cuaderno raíz es el siguiente:

Captura de pantalla de una lista de referencia de cuadernos con parámetros.

Referencia de parámetros DAG

En la tabla siguiente se describe cada campo que puede usar en la definición de DAG:

Campo Level Obligatorio Descripción
activities Raíz Lista de objetos de actividad que definen los cuadernos que se van a ejecutar.
timeoutInSeconds Raíz No Tiempo de espera máximo para todo el DAG. El valor predeterminado es 43200 (12 horas).
concurrency Raíz No Número máximo de cuadernos que se van a ejecutar simultáneamente. El valor predeterminado es 3 veces el número de núcleos de CPU disponibles. Establezca este valor explícitamente si necesita un control más estricto o use 0 para la simultaneidad ilimitada.
name Actividad Un nombre único para la actividad. Se usa para identificar los resultados y definir dependencias.
path Actividad Nombre o ruta de acceso del elemento del cuaderno a ejecutar.
timeoutPerCellInSeconds Actividad No Tiempo de espera máximo para cada celda del cuaderno secundario. El valor predeterminado es de 90 segundos.
args Actividad No Diccionario de parámetros que se van a pasar al cuaderno secundario.
workspace Actividad No Nombre o identificador del área de trabajo donde reside el cuaderno. De forma predeterminada, el cuaderno secundario se ejecuta en la misma área de trabajo que el autor de la llamada.
retry Actividad No Número de reintentos si se produce un error en la actividad. El valor predeterminado es 0.
retryIntervalInSeconds Actividad No Tiempo de espera en segundos entre los intentos de reintento. El valor predeterminado es 0.
dependencies Actividad No Lista de nombres de actividad que deben completarse antes de que se inicie esta actividad.

Valores de salida de referencia entre actividades

Puede hacer referencia al valor de salida de una actividad de dependencia en el args campo mediante la @activity() expresión . Este patrón le permite pasar datos entre cuadernos de un DAG.

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

Sugerencia

Use la @activity('activity_name').exitValue() expresión en el args campo para pasar los resultados de una actividad a otra dentro de un DAG.

Creación de un DAG dinámico

Puede generar estructuras DAG mediante programación para escenarios como el procesamiento de distribución ramificada en varias particiones:

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

Validar un DAG

Use validateDAG() para comprobar que la estructura DAG es válida antes de la ejecución. Detecta problemas como nombres de actividad duplicados, dependencias que faltan y referencias circulares.

notebookutils.notebook.validateDAG(DAG)

Valor de retorno

El validateDAG() método devuelve True si la estructura DAG es válida o genera una excepción si se produce un error en la validación.

Sugerencia

Llame siempre a validateDAG() antes de runMultiple() en los flujos de trabajo de producción para detectar los errores estructurales a tiempo.

Manejo de fallos de runMultiple

El runMultiple() método devuelve un diccionario donde cada clave es el nombre de la actividad y cada valor contiene una exitVal (cadena) y un exception (objeto de error o None). Puede inspeccionar los resultados parciales incluso cuando se produce un error en algunas actividades:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

Consideraciones

  • El grado de paralelismo de la ejecución de varios cuadernos está restringido al recurso de proceso total disponible de una sesión de Spark.
  • El número predeterminado de cuadernos simultáneos es 3 veces el número de núcleos de CPU disponibles. Puede personalizar este valor, pero el paralelismo excesivo podría provocar problemas de estabilidad y rendimiento debido al uso elevado de recursos de proceso. Si surgen problemas, considere dividir los cuadernos en múltiples llamadas de runMultiple o disminuir la simultaneidad ajustando el campo de simultaneidad en el parámetro DAG.
  • El tiempo de espera predeterminado para todo el DAG es de 12 horas y el tiempo de espera predeterminado para cada celda de un cuaderno secundario es de 90 segundos. Puede cambiar el tiempo de expiración definiendo los campos timeoutInSeconds y timeoutPerCellInSeconds en el parámetro DAG.
  • Configure retry y retryIntervalInSeconds para las actividades que podrían producir errores debido a problemas transitorios, como tiempos de espera de red o falta de disponibilidad temporal del servicio.
  • Los cuadernos paralelos comparten recursos de proceso dentro de una sola sesión de Spark. Supervisar el uso de recursos para evitar la sobrecarga de memoria y la contención de CPU.

Salir de un cuaderno

El exit() método sale de un cuaderno con un valor . Puede ejecutar llamadas de funciones anidadas en un notebook de manera interactiva o en una canalización.

  • Cuando se llama a la función exit() desde un cuaderno de forma interactiva, el cuaderno de Fabric lanza una excepción, omite la ejecución de celdas posteriores y mantiene activa la sesión de Spark.

  • Al orquestar un cuaderno en una canalización que llama a la función exit(), la actividad del cuaderno devuelve un valor de salida. Esto completa la ejecución del pipeline y detiene la sesión de Spark.

  • Cuando se llama a una exit() función de un cuaderno al que se hace referencia, Fabric Spark detiene la ejecución posterior del cuaderno al que se hace referencia y continúa ejecutando las celdas siguientes del cuaderno principal que llama a la run() función. Por ejemplo: Notebook1 tiene tres celdas y llama a una función exit() en la segunda celda. Notebook2 tiene cinco celdas y ejecuta run(notebook1) en la tercera celda. Al ejecutar Notebook2, Notebook1 se detiene en la segunda celda al presionar la exit() función. Notebook2 sigue ejecutando su cuarta y quinta celda.

notebookutils.notebook.exit("value string")

Comportamiento de retorno

El exit() método no devuelve un valor. Finaliza el bloc de notas actual y pasa la cadena proporcionada al bloc de notas o la canalización que lo llamó.

Nota:

La exit() función sobrescribe la salida de la celda actual. Para evitar perder la salida de otras instrucciones de código, llame a notebookutils.notebook.exit() en una celda independiente.

Importante

No llames notebookutils.notebook.exit() dentro de un try-catch bloque. La salida no surtirá efecto cuando se encapsula en el control de excepciones. La exit() llamada debe estar en el nivel superior del código para que funcione correctamente.

Por ejemplo:

El cuaderno Sample1 tiene las dos celdas siguientes:

  • La celda 1 define un parámetro de entrada con el valor predeterminado establecido en 10.

  • La celda 2 sale del cuaderno de notas con entrada como valor de salida.

Captura de pantalla que muestra un cuaderno de muestra de la función exit.

Puede ejecutar Sample1 en otro cuaderno con los valores predeterminados:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Salida:

10

Puede ejecutar Sample1 en otro cuaderno y establecer el valor de input en 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Salida:

20