Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
GÄLLER FÖR:
Azure Data Factory
Azure Synapse Analytics
Tips
Data Factory i Microsoft Fabric är nästa generations Azure Data Factory, med en enklare arkitektur, inbyggd AI och nya funktioner. Om dataintegrering är nytt för dig börjar du med Fabric Data Factory. Befintliga ADF-arbetsbelastningar kan uppgraderas till Fabric för att få åtkomst till nya funktioner inom datavetenskap, realtidsanalys och rapportering.
I den här snabbstarten skapar du en datafabrik med hjälp av Python. Pipelinen i den här datafabriken kopierar data från en mapp till en annan mapp i Azure Blob Storage.
Azure Data Factory är en molnbaserad dataintegreringstjänst som gör att du kan skapa datadrivna arbetsflöden för orkestrering och automatisering av dataflytt och datatransformering. Med hjälp av Azure Data Factory kan du skapa och schemalägga datadrivna arbetsflöden, så kallade pipelines.
Pipelines kan mata in data från olika datalager. Pipelines bearbetar eller transformerar data med hjälp av beräkningstjänster som Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics och Azure Machine Learning. Pipelines publicerar utdata till datalager, till exempel Azure Synapse Analytics för BI-program (Business Intelligence).
Förutsättningar
Ett Azure konto med en aktiv prenumeration. Skapa en kostnadsfritt.
Ett Azure Storage-konto.
Azure Storage Explorer (valfritt).
Ett program i Microsoft Entra ID. Skapa programmet genom att följa stegen i den här länken med autentiseringsalternativ 2 (programhemlighet) och tilldela programmet till rollen Deltagare genom att följa anvisningarna i samma artikel. Anteckna följande värden enligt beskrivningen i artikeln som ska användas i senare steg: Program-ID (klient)-ID, klienthemlighetsvärde och klient-ID.
Skapa och ladda upp en indatafil
Starta Anteckningsblock. Kopiera följande text och spara den som input.txt på disken.
John|Doe Jane|DoeAnvänd verktyg som Azure Storage Explorer för att skapa containern adfv2tutorial och input i containern. Ladda sedan upp filen input.txt till mappen input.
Installera Python-paketet
Öppna en terminal eller kommandotolk med administratörsbehörighet.
Installera först Python-paketet för Azure hanteringsresurser:
pip install azure-mgmt-resourceKör följande kommando för att installera Python-paketet för Data Factory:
pip install azure-mgmt-datafactoryPython SDK för Data Factory stöder Python 2.7 och 3.6+.
Kör följande kommando för att installera Python-paketet för Azure identitetsautentisering:
pip install azure-identityAnteckning
Paketet "azure-identity" kan ha konflikter med "azure-cli" på några gemensamma beroenden. Om du stöter på något autentiseringsproblem tar du bort "azure-cli" och dess beroenden eller använder en ren dator utan att installera paketet "azure-cli" för att få det att fungera. För nationella moln måste du använda lämpliga molnspecifika konstanter. Se Anslut till alla regioner med hjälp av Azure bibliotek för Python Flera moln | Microsoft Docs instruktioner för att ansluta till Python i nationella moln.
Skapa en datafabriksklient
Skapa en fil med namnet datafactory.py. Lägg till följande instruktioner för att lägga till referenser till namnområden.
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import timeLägg till följande funktioner som skriver ut information.
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))Lägg till följande kod till Main-metoden som skapar en instans av klassen DataFactoryManagementClient. Du använder det här objektet till att skapa datafabrik, länktjänst, datauppsättningar och pipeline. Du använder också det här objektet för att övervaka detaljer om pipelinekörning. Ange variabeln subscription_id till ID för din Azure-prenumeration. Om du vill ha en lista över Azure regioner där Data Factory är tillgängligt för närvarande väljer du de regioner som intresserar dig på följande sida och expanderar sedan Analytics för att hitta Data Factory: Produkter tillgängliga per region. Datalager (Azure Storage, Azure SQL Database osv.) och beräkningar (HDInsight osv.) som används av datafabriken kan finnas i andra regioner.
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
Skapa en datafabrik
Lägg till följande kod som skapar en datafabrik till Main-metoden. Om din resursgrupp redan finns, kommentera ut den första create_or_update-instruktionen.
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
Skapa en länkad tjänst
Lägg till följande kod i metoden Main som skapar en Azure Storage länkad tjänst.
Du skapar länkade tjänster i en datafabrik för att länka dina databutiker och datorresurser till datafabriken. I den här snabbstarten behöver du bara skapa en Azure Storage länkad tjänst som både kopieringskälla och mottagararkiv med namnet "AzureStorageLinkedService" i exemplet. Ersätt <storageaccountname> och <storageaccountkey> med namnet och nyckeln för ditt Azure Storage konto.
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
Skapa datauppsättningar
I det här avsnittet skapar du två datauppsättningar: en för källan och en för mottagaren.
Skapa en datauppsättning för käll-Azure Blob
Lägg till följande kod i main-metoden som skapar en Azure blobdatauppsättning. Information om egenskaper för Azure Blob-datauppsättning finns i artikeln Azure blob connector.
Du definierar en datauppsättning som representerar källdata i Azure Blob. Den här blobdatauppsättningen refererar till den Azure Storage länkade tjänst som du skapade i föregående steg.
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
Skapa en datauppsättning för mottagare Azure Blob
Lägg till följande kod i main-metoden som skapar en Azure blobdatauppsättning. Information om egenskaper för Azure Blob-datauppsättning finns i artikeln Azure blob connector.
Du definierar en datauppsättning som representerar källdata i Azure Blob. Den här blobdatauppsättningen refererar till den Azure Storage länkade tjänst som du skapade i föregående steg.
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
Skapa en pipeline
Lägg till följande kod till Main-metoden som skapar en pipeline med en kopieringsaktivitet.
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
Skapa en pipelinekörning
Lägg till följande kod i Main-metoden som utlöser en pipelinekörning.
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
Övervaka en pipelinekörning
Om du vill övervaka pipelinekörningen lägger du till följande kod för Main-metoden:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
Lägg nu till följande instruktion för att anropa Main-metoden när programmet körs:
# Start the main method
main()
Fullständigt skript
Här är den fullständiga Python koden:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
Kör koden
Skapa och starta programmet, och kontrollera därefter pipelinekörningen.
Konsolen skriver ut förloppet för skapandet av en datafabrik, en länkad tjänst, datauppsättningar, pipeline och pipelinekörning. Vänta tills du ser detaljer om körningen av kopieringsaktiviteten med storleken på de lästa och skrivna data. Använd sedan verktyg som Azure Storage explorer för att kontrollera att blobarna kopieras till "outputBlobPath" från "inputBlobPath" som du angav i variabler.
Här är exempel på utdata:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
Rensa resurser
För att ta bort datafabriken lägger du till följande kod till programmet:
adf_client.factories.delete(rg_name, df_name)
Relaterat innehåll
Pipeline i det här exemplet kopierar data från en plats till en annan i en Azure Blob-lagring. Gå igenom självstudiekurserna om du vill lära dig hur du använder Data Factory i fler scenarier.