你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用工作流编排管理器运行现有管道

适用于:Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用

数据工厂管道提供 100 多个数据源连接器,这些连接器提供可缩放且可靠的数据集成/数据流。 在某些情况下,你想要从 Apache Airflow DAG 运行现有的数据工厂管道。 本教程将向您展示如何操作。

重要

工作流业务流程管理器(由 Apache Airflow 提供支持)将于 2025 年 12 月 31 日在 Azure 数据工厂中永久停用。 此功能现已在 Microsoft Fabric 中提供。 了解详细信息

建议将所有工作流业务流程管理器(Azure 数据工厂中的 Apache Airflow)工作负载迁移到数据工作流(Microsoft Fabric 中的 Apache Airflow),以便从 2025 年 12 月 31 日之前的扩展功能中受益。

有关迁移到 Microsoft Fabric 中的 Apache Airflow 期间的详细信息或支持,请联系Microsoft支持部门。

先决条件

  • Azure 订阅。 如果还没有 Azure 订阅,可以在开始前创建一个免费 Azure 帐户
  • Azure 存储帐户。 如果没有存储帐户,请参阅创建 Azure 存储帐户以获取创建步骤。 确保存储帐户仅允许来自选定的网络的访问。
  • Azure 数据工厂管道。 可以按照任何这些教程新建一个数据工厂管道(如果还没有),或者在开始试用第一个数据工厂管道中一键选择创建一个管道。
  • 设置服务主体。 需要创建一个新的服务主体或使用现有服务主体,并授予其运行管道的权限(例如,现有管道所在的数据工厂中的参与者角色),即使工作流编排管理器环境和管道存在于同一数据工厂中也是如此。 需要获取服务主体的客户端 ID 和客户端密码(API 密钥)。

步骤

  1. 使用以下内容新建一个 Python 文件 adf.py:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    必须使用工作流编排管理器 UI“管理员”->“连接”->“+”-> 选择“连接类型”作为“Azure 数据工厂”创建连接,然后填写 client_id、client_secret、tenant_id、subscription_id、resource_group_name、data_factory_name 和 pipeline_name

  2. 将 adf.py 文件上传到名为 DAGS 的文件夹中的 blob 存储。

  3. 将 DAGS 文件夹导入工作流编排管理器环境。 如果没有,请新建一个

    显示“数据工厂管理”选项卡的屏幕截图,其中选择了“Airflow”部分。