用于 Fabric 的 NotebookUtils 用户数据功能(UDF)实用工具

notebookutils.udf 模块提供用于将笔记本代码与用户数据函数(UDF)项集成的实用工具。 可以从同一工作区或不同工作区中的 UDF 项访问函数,然后根据需要调用这些函数。 UDF 项可提升代码可重用性、集中维护和团队协作。

使用 UDF 实用工具可以:

  • 函数检索 – 按名称从 UDF 项访问函数。
  • 跨工作区访问 - 使用来自其他工作区中的 UDF 项的函数。
  • 函数发现 – 检查可用的函数及其签名。
  • 灵活的调用 – 调用函数时, 使用符合语言的参数。

注释

需要对目标工作区中的 UDF 项进行读取访问权限才能检索其函数。 UDF 函数中的异常传播到调用笔记本。

下表列出了可用的 UDF 方法:

方法 Signature 说明
getFunctions getFunctions(udf: String, workspaceId: String = ""): UDF 按项目 ID 或名称从 UDF 项检索所有函数。 返回具有可调用函数属性的对象。

返回的对象公开以下属性:

财产 类型 说明
functionDetails 列表 函数元数据字典的列表。 每个字典包括: Name (函数名称)、 Description (函数说明)、 Parameters (参数定义列表)、 FunctionReturnType (返回类型)和 DataSourceConnections (使用的数据源连接)。
itemDetails 字典 UDF 项元数据的字典,其中包含键:Id (工件 ID)、Name (项名称)、WorkspaceId (工作区 ID)和 CapacityId (容量 ID)。
<functionName> Callable UDF 项中的每个函数都将成为返回的对象上的可调用方法。 用 myFunctions.functionName(...) 调用。

小窍门

检索一次 UDF 函数并缓存包装对象。 避免在循环中重复调用 getFunctions() - 缓存结果以最大程度地减少开销。

从 UDF 检索函数

使用 notebookutils.udf.getFunctions() 从 UDF 项目中获取所有函数。 可以为跨工作区访问选择性地指定工作区 ID。

# Get functions from a UDF item in the current workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName')

# Get functions from a UDF item in another workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')

调用函数

从 UDF 项检索函数后,按名称调用它们。 Python 支持位置参数和命名参数。 Scala 和 R 示例使用位置参数。

# Positional parameters
myFunctions.functionName('value1', 'value2')

# Named parameters (recommended for clarity)
myFunctions.functionName(parameter1='value1', parameter2='value2')

默认参数值

Fabric用户数据函数支持默认参数值。 调用通过 notebookutils.udf.getFunctions检索的函数时,可以省略具有已定义默认值的任何参数 — 运行时会自动使用默认值。 还可以提供命名参数来替代特定默认值,同时保持其他默认值不变。

# Assume the UDF item defines a function like:
# def score_customer(customerId: str, startDate: datetime = "2025-01-01T00:00:00Z", isActive: bool = True, maxRecords: int = 100) -> dict
# The datetime defaults are specified as strings in the signature; the runtime parses them to datetime at invocation time.

# 1. Call without optional parameters — defaults are used for startDate, isActive, and maxRecords
result = myFunctions.scoreCustomer(customerId='C001')

# 2. Override one default via a named argument, keep the others at their defaults
result = myFunctions.scoreCustomer(customerId='C001', maxRecords=50)

# 3. Pass a date/time in ISO 8601 format for reliable parsing
result = myFunctions.scoreCustomer(customerId='C001', startDate='2025-12-31T23:59:59Z')

支持的默认输入类型

支持以下类型作为默认参数值:

默认类型 Notes
String 任何 JSON 可序列化字符串。
日期时间字符串 在函数签名中指定为字符串。 运行时在调用时将其分析为 datetime 。 使用一致的格式,例如 ISO 8601(例如 2025-12-31T23:59:59Z)。
布尔 TrueFalse
Integer 任何整数值。
Float 任何浮点值。
列表 必须是 JSON 可序列化的;首选 None 在签名中,并在函数内部分配以避免可变的默认陷阱。
字典 必须是 JSON 可序列化的;首选 None 在签名中,并在函数内部分配。
pandas 数据框架 (DataFrame) 作为 SDK 转换为 pandas 类型的 JSON 对象提供。 fabric-user-data-functions需要版本 1.0.0 或更高版本。
pandas 系列 作为 JSON 对象数组提供,SDK 会将其转换为 pandas 类型。 fabric-user-data-functions需要版本 1.0.0 或更高版本。

限制和指南

默认值必须为 JSON 可序列化(不支持集和元组)。 对于列表或字典默认值,在 None 签名中使用,并在函数中分配实际默认值,以避免共享可变默认值。 对于日期/时间默认值,请使用 ISO 8601 格式(例如,2025-12-31T23:59:59Z)。 将 pandas DataFrame 或 Series 用作默认版本需要 fabric-user-data-functions 1.0.0 或更高版本。

显示详细信息

可以编程方式检查 UDF 项元数据和函数签名。

显示 UDF 项目详细信息

display(myFunctions.itemDetails)

显示函数详细信息

display(myFunctions.functionDetails)

小窍门

使用新的 UDF 项时,请始终检查 functionDetails 。 这有助于在调用之前验证可用函数及其预期参数类型。

错误处理

将 UDF 调用包装在相应语言的异常处理中,以优雅地管理缺少函数或意外的参数类型。 在调用该函数之前,请始终验证 UDF 项中是否存在函数。

import json

try:
    validators = notebookutils.udf.getFunctions('DataValidators')

    # Check if function exists before calling
    functions_info = json.loads(validators.functionDetails)
    function_names = [f['Name'] for f in functions_info]

    if 'validateSchema' in function_names:
        is_valid = validators.validateSchema(
            schema='sales_schema',
            data_path='Files/data/sales.csv'
        )
        print(f"Schema validation: {'passed' if is_valid else 'failed'}")
    else:
        print("validateSchema function not available in this UDF item")
        print(f"Available functions: {', '.join(function_names)}")

except AttributeError as e:
    print(f"Function not found: {e}")
except TypeError as e:
    print(f"Parameter type mismatch: {e}")
except Exception as e:
    print(f"Error invoking UDF: {e}")

在数据管道中使用 UDF 函数

可以编写 UDF 函数以生成可重用的 ETL 步骤:

etl_functions = notebookutils.udf.getFunctions('ETLUtilities')

df = spark.read.csv('Files/raw/sales.csv', header=True)
cleaned_df = etl_functions.removeOutliers(df, columns=['amount'])
enriched_df = etl_functions.addCalculatedColumns(cleaned_df)
validated_df = etl_functions.validateAndFilter(enriched_df)

validated_df.write.mode('overwrite').parquet('Files/processed/sales.parquet')
print("ETL pipeline completed using UDF functions")

重要

UDF 调用具有开销。 如果重复使用相同的参数调用同一函数,请考虑缓存结果。 尽可能避免在紧密循环中调用 UDF 函数。