Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Use las utilidades del cuaderno para ejecutar un cuaderno, ejecutar varios cuadernos en paralelo o salir de un cuaderno con un valor. Ejecute el siguiente comando para obtener información general sobre los métodos disponibles:
notebookutils.notebook.help()
En la tabla siguiente se enumeran los métodos de ejecución y orquestación de cuadernos disponibles:
| Método | Signature | Descripción |
|---|---|---|
run |
run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str |
Ejecuta un cuaderno y devuelve su valor de salida. |
runMultiple |
runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] |
Ejecuta varios cuadernos simultáneamente con compatibilidad con las relaciones de dependencia. |
validateDAG |
validateDAG(dag: Any): bool |
Valida si una definición de DAG está estructurada correctamente. |
exit |
exit(value: str): None |
Sale del cuaderno actual con un valor. |
Para las operaciones CRUD de cuadernos (crear, obtener, actualizar, eliminar, enumerar), consulte Administración de artefactos de cuadernos.
Nota:
El parámetro config en runMultiple() solo está disponible en Python. Scala y R no admiten este parámetro.
Nota:
Las utilidades de cuaderno no son aplicables a las definiciones de trabajo de Apache Spark (SJD).
Referencia a un cuaderno
El run() método hace referencia a un cuaderno y devuelve su valor de salida. Puede ejecutar llamadas de funciones anidadas en un notebook de manera interactiva o en una canalización. El cuaderno al que se hace referencia se ejecuta en el grupo de Spark del cuaderno que invoca esta función.
notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)
Por ejemplo:
notebookutils.notebook.run("Sample1", 90, {"input": 20 })
Valor de retorno
El run() método devuelve la cadena exacta que se pasa al notebookutils.notebook.exit(value) bloc de notas secundario. Si exit() no se llama en el notebook secundario, se devuelve una cadena vacía ("").
Los cuadernos de Fabric también admiten la referencia a cuadernos entre áreas de trabajo especificando el identificador del área de trabajo.
notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
Abra el enlace de captura en la salida de la celda para revisar la ejecución de referencia. La instantánea captura los resultados de ejecución y le ayuda a depurar el cuaderno al que se hace referencia.
Configura cuadernos hijos para recibir parámetros
Al crear un cuaderno secundario al que se llama a través de run() o runMultiple(), configure una celda de parámetros para que el cuaderno pueda recibir argumentos del cuaderno principal.
- Cree una celda de código con valores de parámetro predeterminados.
- Marque la celda como celda de parámetro seleccionando Marcar celda como parámetro en la interfaz de usuario del notebook.
- Durante la ejecución, los valores de celda del parámetro se reemplazan con los argumentos pasados desde el elemento principal.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"
Sugerencia
Los valores de salida siempre son cadenas. Si necesita un valor numérico en el cuaderno primario, convierta el resultado después de la recuperación (por ejemplo, int(result)).
Consideraciones
- El cuaderno de referencia entre áreas de trabajo es compatible con la versión 1.2 y posteriores del tiempo de ejecución.
- Si utiliza los archivos del Recurso del Cuaderno, use
notebookutils.nbResPathen el cuaderno de referencia para asegurarse de que apunta a la misma carpeta que en la ejecución interactiva. - La ejecución de referencia permite que los cuadernos secundarios se ejecuten solo si usan el mismo lakehouse que el principal, heredan el lakehouse del principal, o ninguno de los dos define uno. La ejecución se bloquea si el elemento secundario especifica una instancia de lakehouse diferente a la del cuaderno primario. Para omitir esta comprobación, establezca
useRootDefaultLakehouse: Trueen los argumentos . - No llames
notebookutils.notebook.exit(value)dentro de untry-catchbloque. La llamada de salida no surtirá efecto cuando esté envuelta en el manejo de excepciones.
Referencia para ejecutar múltiples notebooks en paralelo
Use notebookutils.notebook.runMultiple() para ejecutar varios cuadernos en paralelo o en una estructura topológica predefinida. La API usa una implementación multiproceso dentro de una sesión de Spark, lo que significa que el cuaderno al que se hace referencia comparte recursos de proceso.
Con notebookutils.notebook.runMultiple(), puede:
Ejecute varios cuadernos simultáneamente, sin esperar a que finalice cada uno.
Especifique las dependencias y el orden de ejecución de los cuadernos mediante un formato JSON simple.
Optimice el uso de recursos de proceso de Spark y reduzca el costo de los proyectos de Fabric.
Vea las instantáneas del registro de ejecución de cada cuaderno en la salida y supervise o depure las tareas del cuaderno de manera conveniente.
Obtenga el valor de salida de cada actividad ejecutiva y úselos en tareas descendentes.
Ejecute notebookutils.notebook.help("runMultiple") para ver más ejemplos y detalles de uso.
Ejecución de una lista sencilla de cuadernos
En el ejemplo siguiente se ejecuta una lista de cuadernos en paralelo:
El resultado de la ejecución del cuaderno raíz es el siguiente:
Valor de retorno
El runMultiple() método devuelve un diccionario donde cada clave es el nombre de la actividad y cada valor es un diccionario con las claves siguientes:
-
exitVal: la cadena devuelta por la llamada del bloc de notas secundarioexit()o una cadena vacía si no se llamó aexit(). -
exception: objeto de error si se produjo un error en la actividad oNonesi se realizó correctamente.
Ejecutar cuadernos con una estructura DAG
En el ejemplo siguiente se ejecutan cuadernos en una estructura DAG mediante notebookutils.notebook.runMultiple().
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "Process_1", # activity name, must be unique
"path": "NotebookSimple", # notebook item name
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
"workspace":"WorkspaceName" # both name and id are supported
},
{
"name": "Process_2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200},
"workspace":"id" # both name and id are supported
},
{
"name": "Process_1.1",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["Process_1"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
El resultado de la ejecución del cuaderno raíz es el siguiente:
Referencia de parámetros DAG
En la tabla siguiente se describe cada campo que puede usar en la definición de DAG:
| Campo | Level | Obligatorio | Descripción |
|---|---|---|---|
activities |
Raíz | Sí | Lista de objetos de actividad que definen los cuadernos que se van a ejecutar. |
timeoutInSeconds |
Raíz | No | Tiempo de espera máximo para todo el DAG. El valor predeterminado es 43200 (12 horas). |
concurrency |
Raíz | No | Número máximo de cuadernos que se van a ejecutar simultáneamente. El valor predeterminado es 3 veces el número de núcleos de CPU disponibles. Establezca este valor explícitamente si necesita un control más estricto o use 0 para la simultaneidad ilimitada. |
name |
Actividad | Sí | Un nombre único para la actividad. Se usa para identificar los resultados y definir dependencias. |
path |
Actividad | Sí | Nombre o ruta de acceso del elemento del cuaderno a ejecutar. |
timeoutPerCellInSeconds |
Actividad | No | Tiempo de espera máximo para cada celda del cuaderno secundario. El valor predeterminado es de 90 segundos. |
args |
Actividad | No | Diccionario de parámetros que se van a pasar al cuaderno secundario. |
workspace |
Actividad | No | Nombre o identificador del área de trabajo donde reside el cuaderno. De forma predeterminada, el cuaderno secundario se ejecuta en la misma área de trabajo que el autor de la llamada. |
retry |
Actividad | No | Número de reintentos si se produce un error en la actividad. El valor predeterminado es 0. |
retryIntervalInSeconds |
Actividad | No | Tiempo de espera en segundos entre los intentos de reintento. El valor predeterminado es 0. |
dependencies |
Actividad | No | Lista de nombres de actividad que deben completarse antes de que se inicie esta actividad. |
Valores de salida de referencia entre actividades
Puede hacer referencia al valor de salida de una actividad de dependencia en el args campo mediante la @activity() expresión . Este patrón le permite pasar datos entre cuadernos de un DAG.
DAG = {
"activities": [
{
"name": "Extract",
"path": "ExtractData",
"timeoutPerCellInSeconds": 120,
"args": {"source": "prod_db"}
},
{
"name": "Transform",
"path": "TransformData",
"timeoutPerCellInSeconds": 180,
"args": {
"data_path": "@activity('Extract').exitValue()"
},
"dependencies": ["Extract"]
}
]
}
results = notebookutils.notebook.runMultiple(DAG)
Sugerencia
Use la @activity('activity_name').exitValue() expresión en el args campo para pasar los resultados de una actividad a otra dentro de un DAG.
Creación de un DAG dinámico
Puede generar estructuras DAG mediante programación para escenarios como el procesamiento de distribución ramificada en varias particiones:
def create_fan_out_dag(partitions):
activities = []
for partition in partitions:
activities.append({
"name": f"Process_{partition}",
"path": "ProcessPartition",
"timeoutPerCellInSeconds": 180,
"args": {"partition": partition}
})
activities.append({
"name": "Aggregate",
"path": "AggregateResults",
"timeoutPerCellInSeconds": 120,
"dependencies": [f"Process_{p}" for p in partitions]
})
return {"activities": activities, "concurrency": 25}
partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)
results = notebookutils.notebook.runMultiple(dag)
Validar un DAG
Use validateDAG() para comprobar que la estructura DAG es válida antes de la ejecución. Detecta problemas como nombres de actividad duplicados, dependencias que faltan y referencias circulares.
Valor de retorno
El validateDAG() método devuelve True si la estructura DAG es válida o genera una excepción si se produce un error en la validación.
Sugerencia
Llame siempre a validateDAG() antes de runMultiple() en los flujos de trabajo de producción para detectar los errores estructurales a tiempo.
Manejo de fallos de runMultiple
El runMultiple() método devuelve un diccionario donde cada clave es el nombre de la actividad y cada valor contiene una exitVal (cadena) y un exception (objeto de error o None). Puede inspeccionar los resultados parciales incluso cuando se produce un error en algunas actividades:
from notebookutils.common.exceptions import RunMultipleFailedException
try:
results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
results = ex.result
for activity_name, result in results.items():
if result["exception"]:
print(f"{activity_name} failed: {result['exception']}")
else:
print(f"{activity_name} succeeded: {result['exitVal']}")
Consideraciones
- El grado de paralelismo de la ejecución de varios cuadernos está restringido al recurso de proceso total disponible de una sesión de Spark.
- El número predeterminado de cuadernos simultáneos es 3 veces el número de núcleos de CPU disponibles. Puede personalizar este valor, pero el paralelismo excesivo podría provocar problemas de estabilidad y rendimiento debido al uso elevado de recursos de proceso. Si surgen problemas, considere dividir los cuadernos en múltiples llamadas de
runMultipleo disminuir la simultaneidad ajustando el campo de simultaneidad en el parámetro DAG. - El tiempo de espera predeterminado para todo el DAG es de 12 horas y el tiempo de espera predeterminado para cada celda de un cuaderno secundario es de 90 segundos. Puede cambiar el tiempo de expiración definiendo los campos timeoutInSeconds y timeoutPerCellInSeconds en el parámetro DAG.
- Configure
retryyretryIntervalInSecondspara las actividades que podrían producir errores debido a problemas transitorios, como tiempos de espera de red o falta de disponibilidad temporal del servicio. - Los cuadernos paralelos comparten recursos de proceso dentro de una sola sesión de Spark. Supervisar el uso de recursos para evitar la sobrecarga de memoria y la contención de CPU.
Salir de un cuaderno
El exit() método sale de un cuaderno con un valor . Puede ejecutar llamadas de funciones anidadas en un notebook de manera interactiva o en una canalización.
Cuando se llama a la función
exit()desde un cuaderno de forma interactiva, el cuaderno de Fabric lanza una excepción, omite la ejecución de celdas posteriores y mantiene activa la sesión de Spark.Al orquestar un cuaderno en una canalización que llama a la función
exit(), la actividad del cuaderno devuelve un valor de salida. Esto completa la ejecución del pipeline y detiene la sesión de Spark.Cuando se llama a una
exit()función de un cuaderno al que se hace referencia, Fabric Spark detiene la ejecución posterior del cuaderno al que se hace referencia y continúa ejecutando las celdas siguientes del cuaderno principal que llama a larun()función. Por ejemplo: Notebook1 tiene tres celdas y llama a una funciónexit()en la segunda celda. Notebook2 tiene cinco celdas y ejecutarun(notebook1)en la tercera celda. Al ejecutar Notebook2, Notebook1 se detiene en la segunda celda al presionar laexit()función. Notebook2 sigue ejecutando su cuarta y quinta celda.
Comportamiento de retorno
El exit() método no devuelve un valor. Finaliza el bloc de notas actual y pasa la cadena proporcionada al bloc de notas o la canalización que lo llamó.
Nota:
La exit() función sobrescribe la salida de la celda actual. Para evitar perder la salida de otras instrucciones de código, llame a notebookutils.notebook.exit() en una celda independiente.
Importante
No llames notebookutils.notebook.exit() dentro de un try-catch bloque. La salida no surtirá efecto cuando se encapsula en el control de excepciones. La exit() llamada debe estar en el nivel superior del código para que funcione correctamente.
Por ejemplo:
El cuaderno Sample1 tiene las dos celdas siguientes:
La celda 1 define un parámetro de entrada con el valor predeterminado establecido en 10.
La celda 2 sale del cuaderno de notas con entrada como valor de salida.
Puede ejecutar Sample1 en otro cuaderno con los valores predeterminados:
exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)
Salida:
10
Puede ejecutar Sample1 en otro cuaderno y establecer el valor de input en 20:
exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Salida:
20