教程第 4 部分:执行批量评分并将预测保存到湖屋

本教程演示如何导入第 3 部分中生成的已注册 LightGBMClassifier 模型。 本教程使用 Microsoft Fabric MLflow 模型注册表来训练模型,然后对从 Lakehouse 加载的测试数据集执行批处理预测。

使用 Microsoft Fabric,可以将机器学习模型运用于可缩放的 PREDICT 函数。 该函数支持任何计算引擎中的批处理评分。 可以直接从 Microsoft Fabric 笔记本或给定模型的项页生成批处理预测。 有关 PREDICT 函数的详细信息,请访问 资源。

若要对测试数据集生成批量预测,请使用定型 LightGBM 模型的版本 1。 该版本显示了所有已训练的机器学习模型中的最佳性能。 将测试数据集加载到 spark DataFrame 中,并创建 MLFlowTransformer 对象以生成批处理预测。 然后,可以使用以下方法之一调用 PREDICT 函数:

  • SynapseML 中的转换器 API
  • Spark SQL API
  • PySpark 用户定义的函数 (UDF)

先决条件

这是五个部分教程系列的第 4 部分。 若要完成本教程,请先完成:

在笔记本中继续操作

4-predict.ipynb 是本教程随附的笔记本。

重要

随附在此系列的其他部分中使用的同一个湖屋。

加载测试数据

在以下代码片段中,加载在第 3 部分保存的测试数据:

df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)

使用 Transformer API 进行预测

若要从 SynapseML 使用转换器 API,必须先创建 MLFlowTransformer 对象。

实例化 MLFlowTransformer 对象

MLFlowTransformer 对象作为在第 3 部分中注册的 MLFlow 模型的封装器。 它允许针对给定数据帧生成批处理预测。 若要实例化 MLFlowTransformer 对象,必须提供以下参数:

  • 模型需要作为输入的测试 DataFrame 列(在本例中,模型需要所有这些列)
  • 新输出列的名称(在本例中 为预测
  • 生成预测的正确模型名称和模型版本(在本例中,模型名称为lgbm_sm,版本为1)

以下代码片段处理以下步骤:

from synapse.ml.predict import MLFlowTransformer

model = MLFlowTransformer(
    inputCols=list(df_test.columns),
    outputCol='predictions',
    modelName='lgbm_sm',
    modelVersion=1
)

有了 MLFlowTransformer 对象后,可以使用它生成批处理预测,如以下代码片段所示:

import pandas

predictions = model.transform(df_test)
display(predictions)

使用 Spark SQL API 预测

以下代码片段使用 Spark SQL API 调用 PREDICT 函数:

from pyspark.ml.feature import SQLTransformer 

# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns

sqlt = SQLTransformer().setStatement( 
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))

使用用户定义函数 (UDF) 的 PREDICT

以下代码片段使用 PySpark UDF 调用 PREDICT 函数:

from pyspark.sql.functions import col, pandas_udf, udf, lit

# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns

display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))

还可以从模型的项页生成 PREDICT 代码。 有关 PREDICT 函数的详细信息,请访问 资源。

将模型预测结果写入湖屋

生成批量预测后,将模型预测结果写回到数据湖屋,如以下代码片段所示:

# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")

下一步

继续转到: