Freigeben über


Umwandeln von Daten mithilfe von dbt

Hinweis

Ein Apache Airflow-Auftrag wird von Apache Airflow unterstützt.

Derzeit werden private Netzwerke und virtuelle Netzwerke mit Fabric Apache Airflow-Aufträgen nicht unterstützt.

dbt (Data Build Tool) ist eine Open-Source-Befehlszeilenschnittstelle (CLI), mit der Sie Daten in Data Warehouses transformieren und modellieren können. Es verwaltet komplexen SQL-Code auf strukturierte, verwendbare Weise, sodass Datenteams zuverlässige, testbare Transformationen für ihre analytischen Pipelines erstellen können.

Wenn Sie dbt mit Apache Airflow kombinieren, erhalten Sie das Beste aus beiden Welten. dbt behandelt die Transformationen, während Airflow die Planung, Orchestrierung und Aufgabenverwaltung verwaltet. Dieser Ansatz schafft effiziente und robuste Pipelines, die zu schnelleren und aussagekräftigeren datengesteuerten Entscheidungen führen.

In diesem Lernprogramm erfahren Sie, wie Sie einen Apache Airflow DAG erstellen, der dbt zum Transformieren von Daten verwendet, die in Microsoft Fabric Data Warehouse gespeichert sind.

Voraussetzungen

Bevor Sie beginnen, erfüllen Sie diese Voraussetzungen:

Transformieren von Daten in Fabric Warehouse mithilfe von dbt

Führen Sie die folgenden Schritte aus, um die dbt-Transformation einzurichten:

  1. Angeben der Anforderungen
  2. Erstellen eines dbt-Projekts im vom Apache Airflow-Auftrag bereitgestellten fabric-verwalteten Speicher
  3. Erstellen eines Apache Airflow DAG für die Orchestrierung

Angeben der Anforderungen

  1. Erstellen Sie eine Datei namens requirements.txt im Ordner dags.

  2. Fügen Sie die folgenden Pakete als Apache Airflow-Anforderungen hinzu:

    astronomer-cosmos==1.10.1
    dbt-fabric==1.9.5   
    

Erstellen eines dbt-Projekts im vom Apache Airflow-Auftrag bereitgestellten fabric-verwalteten Speicher

  1. Erstellen Sie ein Dbt-Beispielprojekt im Apache Airflow Job für das Dataset nyc_taxi_green mit dieser Verzeichnisstruktur:

       dags
       |-- my_cosmos_dag.py
       |-- nyc_taxi_green
       |  |-- profiles.yml
       |  |-- dbt_project.yml
       |  |-- models
       |  |   |-- nyc_trip_count.sql
       |  |-- target
    
  2. Erstellen Sie einen Ordner namens nyc_taxi_green im dags Ordner mit einer profiles.yml Datei. Dieser Ordner enthält alle Dateien, die Sie für Ihr dbt-Projekt benötigen. Screenshot des Erstellens von Dateien für das dbt-Projekt.

  3. Kopieren Sie den folgenden Inhalt in die profiles.yml Datei. Diese Konfigurationsdatei enthält Datenbankverbindungsdetails und Profile, die dbt verwendet. Aktualisieren Sie die Platzhalterwerte und speichern Sie die Datei.

    config:
      partial_parse: true
    nyc_taxi_green:
      target: fabric-dev
      outputs:
        fabric-dev:
          type: fabric
          driver: "ODBC Driver 18 for SQL Server"
          server: <sql connection string of your data warehouse>
          port: 1433
          database: "<name of the database>"
          schema: dbo
          threads: 4
          authentication: ServicePrincipal
          tenant_id: <Tenant ID of your service principal>
          client_id: <Client ID of your service principal>
          client_secret: <Client Secret of your service principal>
    
  4. Erstellen Sie die Datei dbt_project.yml und kopieren Sie den folgenden Inhalt. Diese Datei gibt die Konfiguration auf Projektebene an.

    name: "nyc_taxi_green"
    
    config-version: 2
    version: "0.1"
    
    profile: "nyc_taxi_green"
    
    model-paths: ["models"]
    seed-paths: ["seeds"]
    test-paths: ["tests"]
    analysis-paths: ["analysis"]
    macro-paths: ["macros"]
    
    target-path: "target"
    clean-targets:
      - "target"
      - "dbt_modules"
      - "logs"
    
    require-dbt-version: [">=1.0.0", "<2.0.0"]
    
    models:
      nyc_taxi_green:
        materialized: table
    
  5. Erstellen Sie den Ordner models im Ordner nyc_taxi_green. Erstellen Sie für dieses Lernprogramm ein Beispielmodell in einer Datei mit dem Namen nyc_trip_count.sql , in der eine Tabelle mit der Anzahl der Reisen pro Tag pro Anbieter erstellt wird. Kopieren Sie den folgenden Inhalt in die Datei.

       with new_york_taxis as (
           select * from nyctlc
       ),
       final as (
         SELECT
           vendorID,
           CAST(lpepPickupDatetime AS DATE) AS trip_date,
           COUNT(*) AS trip_count
         FROM
             [contoso-data-warehouse].[dbo].[nyctlc]
         GROUP BY
             vendorID,
             CAST(lpepPickupDatetime AS DATE)
         ORDER BY
             vendorID,
             trip_date;
       )
       select * from final
    

    Screenshot der Modelle für das dbt-Projekt.

Erstellen eines Apache Airflow DAG für die Orchestrierung

  1. Erstellen Sie eine Datei mit dem Namen my_cosmos_dag.py im Ordner dags und fügen Sie den folgenden Inhalt in sie ein:

     import os
     from pathlib import Path
     from datetime import datetime
     from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
     from airflow import DAG
    
     DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green"
     DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
     profile_config = ProfileConfig(
         profile_name="nyc_taxi_green",
         target_name="fabric-dev",
         profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml",
     )
    
     dbt_fabric_dag = DbtDag(
         project_config=ProjectConfig(DBT_ROOT_PATH,),
         operator_args={"install_deps": True},
         profile_config=profile_config,
         schedule_interval="@daily",
         start_date=datetime(2023, 9, 10),
         catchup=False,
         dag_id="dbt_fabric_dag",
     )
    

Führen Sie einen gerichteten azyklischen Graphen (DAG) aus

  1. Führen Sie den DAG im Apache Airflow-Job aus. Screenshot der Ausführung des DAG.

  2. Wählen Sie "Monitor" in Apache Airflow aus, um Ihren geladenen DAG in der Apache Airflow-Benutzeroberfläche anzuzeigen. Screenshot der Überwachung eines dbt-DAG. Screenshot einer erfolgreichen DAG-Ausführung.

  3. Überprüfen Sie nach einer erfolgreichen Ausführung Ihre Daten, indem Sie die neue Tabelle mit dem Namen "nyc_trip_count.sql" in Ihrem Fabric-Data Warehouse überprüfen. Screenshot eines erfolgreichen dbt-DAGs.

Schnellstart: Erstellen eines Apache Airflow-Auftrags