Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Gilt für:
Azure Data Factory
Azure Synapse Analytics
Tipp
Data Factory in Microsoft Fabric ist die nächste Generation von Azure Data Factory mit einer einfacheren Architektur, integrierter KI und neuen Features. Wenn Sie mit der Datenintegration noch nicht vertraut sind, beginnen Sie mit Fabric Data Factory. Vorhandene ADF-Workloads können auf Fabric aktualisiert werden, um auf neue Funktionen in der Datenwissenschaft, Echtzeitanalysen und Berichterstellung zuzugreifen.
In dieser Schnellstartanleitung erstellen Sie eine Datenfactory mithilfe von Python. Die Pipeline in dieser Datenfabrik kopiert Daten aus einem Ordner in einen anderen Ordner im Azure Blob Storage.
Azure Data Factory ist ein cloudbasierter Datenintegrationsdienst, mit dem Sie datengesteuerte Workflows zum Koordinieren und Automatisieren von Datenbewegungen und Datentransformationen erstellen können. Mit Azure Data Factory können Sie datengesteuerte Workflows erstellen und planen, die als Pipelines bezeichnet werden.
Pipelines können Daten aus unterschiedlichen Datenspeichern erfassen. Pipelines verarbeiten oder transformieren Daten mithilfe von Computediensten wie Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics und Azure Machine Learning. Pipelines veröffentlichen Ausgabedaten in Datenspeichern wie Azure Synapse Analytics für Business Intelligence(BI)-Anwendungen.
Voraussetzungen
Ein Azure Konto mit einem aktiven Abonnement. Erstellen Sie ein kostenloses Konto.
Azure Storage-Explorer (optional).
Eine Anwendung in Microsoft Entra ID. Erstellen Sie die Anwendung, indem Sie die Schritte unter diesem Link ausführen. Verwenden Sie dabei Authentifizierungsoption 2 (Anwendungsgeheimnis), und weisen Sie die Anwendung der Rolle Mitwirkender zu, indem Sie die Anweisungen im gleichen Artikel befolgen. Notieren Sie sich die folgenden Werte aus dem Artikel zur späteren Verwendung: Anwendungs-ID (Client), Wert des geheimen Clientschlüssels und Mandanten-ID.
Erstellen und Hochladen einer Eingabedatei
Starten Sie den Editor. Kopieren Sie den folgenden Text, und speichern Sie ihn als input.txt-Datei auf Ihrem Datenträger.
John|Doe Jane|DoeVerwenden Sie Tools wie Azure Storage-Explorer, um den Ordner adfv2tutorial container und input im Container zu erstellen. Laden Sie anschließend die Datei input.txt in den Ordner input hoch.
Installieren des Python-Pakets
Öffnen Sie ein Terminal oder eine Eingabeaufforderung mit Administratorberechtigungen.
Installieren Sie zunächst das Python-Paket für Azure-Verwaltungsressourcen:
pip install azure-mgmt-resourceFühren Sie den folgenden Befehl aus, um das Python-Paket für Data Factory zu installieren:
pip install azure-mgmt-datafactoryDas Python SDK für Data Factory unterstützt Python 2.7 und 3.6+.
Führen Sie den folgenden Befehl aus, um das Python Paket für Azure Identitätsauthentifizierung zu installieren:
pip install azure-identityHinweis
Das Paket „azure-identity“ steht bei einigen gemeinsamen Abhängigkeiten unter Umständen in Konflikt mit „azure-cli“. Wenn ein Authentifizierungsproblem auftritt, entfernen Sie „azure-cli“ und die zugehörigen Abhängigkeiten, oder verwenden Sie einen neu installierten Computer ohne Installation des Pakets „azure-cli“, damit der Vorgang erfolgreich ist. Für Sovereign Clouds müssen Sie die entsprechenden cloudspezifischen Konstanten verwenden. Weitere Informationen finden Sie unter Herstellen einer Verbindung mit allen Regionen unter Verwendung der Azure-Bibliotheken für Python (mehrere Clouds) | Microsoft-Dokumentation mit Anweisungen zum Herstellen einer Verbindung mit Python in Sovereign Clouds.
Erstellen eines Data Factory-Clients
Erstellen Sie eine Datei mit dem Namen datafactory.py. Fügen Sie die folgenden Anweisungen ein, um Verweise auf Namespaces hinzuzufügen.
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import timeFügen Sie die folgenden Funktionen hinzu, die Informationen ausgeben.
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))Fügen Sie der Main-Methode den folgenden Code hinzu, der eine Instanz der DataFactoryManagementClient-Klasse erstellt. Sie verwenden dieses Objekt, um die Data Factory, einen verknüpften Dienst, Datasets und eine Pipeline zu erstellen. Sie verwenden dieses Objekt ebenfalls zum Überwachen der Ausführungsdetails der Pipeline. Legen Sie subscription_id Variable auf die ID Ihres Azure-Abonnements fest. Wählen Sie für eine Liste Azure Regionen, in denen Data Factory zurzeit verfügbar ist, die Regionen aus, die Sie auf der folgenden Seite interessieren, und erweitern Sie dann Analytics, um Data Factory: Products available by region zu finden. Die Datenspeicher (Azure Storage, Azure SQL-Datenbank, etc.) und Berechnungen (HDInsight, usw.), die von der Data Factory verwendet werden, können sich in anderen Regionen befinden.
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
Erstellen einer Data Factory
Fügen Sie der Main-Methode den folgenden Code hinzu, der eine Data Factory erstellt. Wenn die Ressourcengruppe bereits vorhanden ist, kommentieren Sie die erste create_or_update-Anweisung aus.
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
Erstellen eines verknüpften Diensts
Fügen Sie der methode Main den folgenden Code hinzu, der einen Azure Storage verknüpften Dienst erstellt.
Um Ihre Datenspeicher und Compute Services mit der Data Factory zu verknüpfen, können Sie verknüpfte Dienste in einer Data Factory erstellen. In diesem Schnellstart müssen Sie nur einen verknüpften Azure Storage-Dienst als Kopierquelle und Senkenspeicher erstellen (in diesem Beispiel AzureStorageLinkedService). Ersetzen Sie <storageaccountname> und <storageaccountkey> durch Namen und Schlüssel Ihres Azure Storage Kontos.
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
Erstellen von Datasets
In diesem Abschnitt erstellen Sie zwei Datasets: eines für die Quelle und das andere für die Senke.
Erstellen eines Datasets für das Azure-Quellblob
Fügen Sie der Main-Methode den folgenden Code hinzu, der ein Azure Blob-Dataset erstellt. Informationen zu eigenschaften des Azure Blob-Datasets finden Sie im Artikel Azure blob connector.
Sie definieren ein Dataset, das die Quelldaten in Azure Blob darstellt. Dieses Blob-Dataset bezieht sich auf den Azure Storage verknüpften Dienst, den Sie im vorherigen Schritt erstellen.
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
Erstellen eines Datasets für eine Azure Blobsenke
Fügen Sie der Main-Methode den folgenden Code hinzu, der ein Azure Blob-Dataset erstellt. Informationen zu eigenschaften des Azure Blob-Datasets finden Sie im Artikel Azure blob connector.
Sie definieren ein Dataset, das die Quelldaten in Azure Blob darstellt. Dieses Blob-Dataset bezieht sich auf den Azure Storage verknüpften Dienst, den Sie im vorherigen Schritt erstellen.
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
Erstellen einer Pipeline
Fügen Sie der Main-Methode den folgenden Code hinzu, der eine Pipeline mit einer Kopieraktivität erstellt.
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
Erstellen einer Pipelineausführung
Fügen Sie der Main-Methode den folgenden Code hinzu, der eine Pipelineausführung auslöst.
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
Überwachen eines Pipeline-Laufs
Um die Pipelineausführung zu überwachen, fügen Sie der Main-Methode den folgenden Code hinzu:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
Fügen Sie nun die folgende Anweisung hinzu, um die main-Methode bei Ausführung des Programms aufzurufen:
# Start the main method
main()
Vollständiges Skript
Hier ist der vollständige Python Code:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
Ausführen des Codes
Erstellen und starten Sie die Anwendung, und überprüfen Sie dann die Pipeline-Ausführung.
Die Konsole druckt den Status der Erstellung der Data Factory, des verknüpften Diensts, der Datasets, der Pipeline und der Pipelineausführung aus. Warten Sie, bis Sie die Ausführungsdetails der Kopieraktivität mit der Größe der gelesenen/geschriebenen Daten sehen. Verwenden Sie dann Tools wie Azure Storage Explorer um zu überprüfen, ob die Blobs in "outputBlobPath" aus "inputBlobPath" kopiert werden, wie Sie in Variablen angegeben haben.
Hier ist die Beispielausgabe:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
Bereinigen von Ressourcen
Um die Data Factory zu löschen, fügen Sie den folgenden Code zum Programm hinzu:
adf_client.factories.delete(rg_name, df_name)
Zugehöriger Inhalt
Die Pipeline in diesem Beispiel kopiert Daten von einem Speicherort an einen anderen Speicherort in einem Azure BLOB-Speicher. Arbeiten Sie die Tutorials durch, um zu erfahren, wie Sie Data Factory in anderen Szenarien verwenden können.