本教程演示如何导入第 3 部分中生成的已注册 LightGBMClassifier 模型。 本教程使用 Microsoft Fabric MLflow 模型注册表来训练模型,然后对从 Lakehouse 加载的测试数据集执行批处理预测。
使用 Microsoft Fabric,可以将机器学习模型运用于可缩放的 PREDICT 函数。 该函数支持任何计算引擎中的批处理评分。 可以直接从 Microsoft Fabric 笔记本或给定模型的项页生成批处理预测。 有关 PREDICT 函数的详细信息,请访问 此 资源。
若要对测试数据集生成批量预测,请使用定型 LightGBM 模型的版本 1。 该版本显示了所有已训练的机器学习模型中的最佳性能。 将测试数据集加载到 spark DataFrame 中,并创建 MLFlowTransformer 对象以生成批处理预测。 然后,可以使用以下方法之一调用 PREDICT 函数:
- SynapseML 中的转换器 API
- Spark SQL API
- PySpark 用户定义的函数 (UDF)
先决条件
获取 Microsoft Fabric 订阅。 或者,注册免费的 Microsoft Fabric 试用版。
登录 Microsoft Fabric。
使用主页左下侧的体验切换器切换到 Fabric。
这是五个部分教程系列的第 4 部分。 若要完成本教程,请先完成:
- 第 1 部分:使用 Apache Spark将数据引入到 Microsoft Fabric Lakehouse 中。
- 第 2 部分:通过使用 Microsoft Fabric 笔记本浏览和可视化数据, 以便更好地了解数据。
- 第 3 部分:训练和注册机器学习模型。
在笔记本中继续操作
4-predict.ipynb 是本教程随附的笔记本。
若要打开本教程随附的笔记本,请按照 为数据科学教程准备系统 中的说明将笔记本导入工作区。
如果要复制并粘贴此页面中的代码,可以 创建新的笔记本。
在开始运行代码之前,请务必将湖屋连接到笔记本。
重要
随附在此系列的其他部分中使用的同一个湖屋。
加载测试数据
在以下代码片段中,加载在第 3 部分保存的测试数据:
df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)
使用 Transformer API 进行预测
若要从 SynapseML 使用转换器 API,必须先创建 MLFlowTransformer 对象。
实例化 MLFlowTransformer 对象
MLFlowTransformer 对象作为在第 3 部分中注册的 MLFlow 模型的封装器。 它允许针对给定数据帧生成批处理预测。 若要实例化 MLFlowTransformer 对象,必须提供以下参数:
- 模型需要作为输入的测试 DataFrame 列(在本例中,模型需要所有这些列)
- 新输出列的名称(在本例中 为预测)
- 生成预测的正确模型名称和模型版本(在本例中,模型名称为
lgbm_sm,版本为1)
以下代码片段处理以下步骤:
from synapse.ml.predict import MLFlowTransformer
model = MLFlowTransformer(
inputCols=list(df_test.columns),
outputCol='predictions',
modelName='lgbm_sm',
modelVersion=1
)
有了 MLFlowTransformer 对象后,可以使用它生成批处理预测,如以下代码片段所示:
import pandas
predictions = model.transform(df_test)
display(predictions)
使用 Spark SQL API 预测
以下代码片段使用 Spark SQL API 调用 PREDICT 函数:
from pyspark.ml.feature import SQLTransformer
# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns
sqlt = SQLTransformer().setStatement(
f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")
# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))
使用用户定义函数 (UDF) 的 PREDICT
以下代码片段使用 PySpark UDF 调用 PREDICT 函数:
from pyspark.sql.functions import col, pandas_udf, udf, lit
# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns
display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))
还可以从模型的项页生成 PREDICT 代码。 有关 PREDICT 函数的详细信息,请访问 此 资源。
将模型预测结果写入湖屋
生成批量预测后,将模型预测结果写回到数据湖屋,如以下代码片段所示:
# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
下一步
继续转到: