Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Gilt für:
Azure Data Factory
Azure Synapse Analytics
Tipp
Data Factory in Microsoft Fabric ist die nächste Generation von Azure Data Factory mit einer einfacheren Architektur, integrierter KI und neuen Features. Wenn Sie mit der Datenintegration noch nicht vertraut sind, beginnen Sie mit Fabric Data Factory. Vorhandene ADF-Workloads können auf Fabric aktualisiert werden, um auf neue Funktionen in der Datenwissenschaft, Echtzeitanalysen und Berichterstellung zuzugreifen.
In diesem Lernprogramm verwenden Sie Azure PowerShell, um eine Data Factory-Pipeline zu erstellen, die Daten mithilfe von Spark Activity und einem on-demand HDInsight-verknüpften Dienst transformiert. In diesem Tutorial führen Sie die folgenden Schritte aus:
- Erstellen einer Data Factory.
- Erstellen und Bereitstellen von verknüpften Diensten
- Erstellen und Bereitstellen einer Pipeline.
- Starten einer Pipelineausführung
- Überwachen der Pipelineausführung.
Wenn Sie nicht über ein Azure-Abonnement verfügen, erstellen Sie ein free Konto, bevor Sie beginnen.
Voraussetzungen
Hinweis
Es wird empfohlen, das Azure Az PowerShell-Modul für die Interaktion mit Azure zu verwenden. Informationen zu den ersten Schritten finden Sie unter Install Azure PowerShell. Informationen zum Migrieren zum Az PowerShell-Modul finden Sie unter Migrate Azure PowerShell von AzureRM zu Az.
- Azure Storage-Konto. Sie erstellen ein Python Skript und eine Eingabedatei, und laden sie in den Azure Speicher hoch. Die Ausgabe des Spark-Programms wird in diesem Speicherkonto gespeichert. Der bedarfsgesteuerte Spark-Cluster verwendet dasselbe Storage-Konto wie seinen primären Speicher.
-
Azure PowerShell. Befolgen Sie die Anweisungen in
Wie sie Azure PowerShell .
Hochladen Python Skripts in Ihr Blob Storage Konto
Erstellen Sie eine Python Datei mit dem Namen WordCount_Spark.py mit folgendem Inhalt:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()Ersetzen Sie <storageAccountName> durch den Namen Ihres Azure Storage Kontos. Speichern Sie dann die Datei.
Erstellen Sie in Ihrem Azure Blob Storage einen Container mit dem Namen adftutorial wenn er nicht vorhanden ist.
Erstellen Sie einen Ordner mit dem Namen spark.
Erstellen Sie unterhalb des Ordners spark einen Unterordner mit dem Namen script.
Laden Sie die Datei WordCount_Spark.py in den Unterordner script hoch.
Hochladen der Eingabedatei
- Erstellen Sie eine Datei mit dem Namen minecraftstory.txt und etwas Text darin. Das Spark-Programm zählt die Wörter in diesem Text.
- Erstellen Sie im Ordner
inputfileseinen Unterordner mit dem Namenspark. - Laden Sie
minecraftstory.txtin den Unterordnerinputfileshoch.
Verknüpfte Dienste autorisieren
In diesem Abschnitt erstellen Sie zwei verknüpfte Dienste:
- Ein verknüpfter Azure Storage-Dienst, der ein Azure Storage-Konto mit der Datenfabrik verbindet. Dieser Speicher wird vom bedarfsgesteuerten HDInsight-Cluster verwendet. Er enthält auch das Spark-Skript, das ausgeführt werden soll.
- Ein bedarfsorientierter verknüpfter HDInsight-Dienst. Azure Data Factory erstellt automatisch einen HDInsight-Cluster, führt das Spark-Programm aus und löscht den HDInsight-Cluster, wenn er eine vorkonfigurierte Zeit im Leerlauf ist.
Mit Azure Storage verknüpfter Dienst
Erstellen Sie eine JSON-Datei mit Ihrem bevorzugten Editor, kopieren Sie die folgende JSON-Definition eines Azure Storage verknüpften Diensts, und speichern Sie die Datei dann als MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Aktualisieren Sie die <storageAccountName> und <storageAccountKey> mit dem Namen und dem Schlüssel Ihres Azure Storage Kontos.
Bedarfsgesteuerter HDInsight-verknüpfter Dienst
Erstellen Sie eine JSON-Datei mit Ihrem bevorzugten Editor, kopieren Sie die folgende JSON-Definition eines Azure HDInsight verknüpften Diensts, und speichern Sie die Datei als MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Aktualisieren Sie die Werte der folgenden Eigenschaften in der Definition des verknüpften Diensts:
- hostSubscriptionId. Ersetzen Sie <subscriptionID> durch die ID Ihres Azure-Abonnements. Der bedarfsgesteuerte HDInsight-Cluster wird in diesem Abonnement erstellt.
- Mandant. Ersetzen Sie <tenantID> durch die ID Ihres Azure Mandanten.
- servicePrincipalId, servicePrincipalKey. Ersetzen Sie <servicePrincipalID> und <servicePrincipalKey> durch ID und Schlüssel Ihres Dienstprinzipals im Microsoft Entra ID. Dieser Dienstprinzipal muss Mitglied der Rolle „Mitwirkender“ in dem Abonnement oder der Ressourcengruppe sein, in dem bzw. der der Cluster erstellt wird. Weitere Informationen finden Sie unter Erstellen einer Microsoft Entra-Anwendung und eines Dienstprinzipals. Die Dienstprinzipal-ID entspricht der Anwendungs-ID und ein Dienstprinzipalschlüssel dem Wert für ein Clientgeheimnis.
- clusterResourceGroup. Ersetzen Sie <resourceGroupOfHDICluster> durch den Namen der Ressourcengruppe, in der der HDInsight-Cluster erstellt werden muss.
Hinweis
Azure HDInsight beschränkt sich auf die Gesamtanzahl der Kerne, die Sie in jeder unterstützten Azure Region verwenden können. Für den On-Demand-HDInsight-Linked-Service wird der HDInsight-Cluster am gleichen Speicherort erstellt, an dem das Azure Storage als primärer Speicher genutzt wird. Stellen Sie sicher, dass Sie über genügend Kernkontingente verfügen, sodass der Cluster erfolgreich erstellt werden kann. Weitere Informationen finden Sie unter Einrichten von Clustern in HDInsight mit Hadoop, Spark, Kafka usw.
Erstellen einer Pipeline
In diesem Schritt erstellen Sie eine neue Pipeline mit einer Spark-Aktivität. Die Aktivität verwendet das Beispiel word count. Laden Sie die Inhalte von diesem Speicherort herunter, falls Sie dies nicht bereits getan haben.
Erstellen Sie in Ihrem bevorzugten Editor eine JSON-Datei, kopieren Sie die folgende JSON-Definition einer Pipelinedefinition hinein, und speichern Sie die Datei als MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Beachten Sie folgende Punkte:
- „rootPath“ verweist auf den Spark-Ordner des Containers „adftutorial“.
- „entryFilePath“ zeigt auf die Datei „WordCount_Spark.py“ im Unterordner „script“ des Ordners „spark“.
Erstellen einer Data Factory
Sie haben in JSON-Dateien Definitionen für einen verknüpften Dienst und eine Pipeline erstellt. Jetzt erstellen Sie eine Data Factory und stellen die JSON-Dateien für den verknüpften Dienst und die Pipeline mithilfe von PowerShell-Cmdlets bereit. Führen Sie nacheinander die folgenden PowerShell-Befehle aus:
Legen Sie nacheinander die Variablen fest.
Ressourcengruppenname
$resourceGroupName = "ADFTutorialResourceGroup"Data Factory-Name. Muss global eindeutig sein
$dataFactoryName = "MyDataFactory09102017"Pipelinename
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipelineStarten Sie PowerShell. Lassen Sie Azure PowerShell bis zum Ende dieser Schnellstartanleitung geöffnet. Wenn Sie PowerShell schließen und erneut öffnen, müssen Sie die Befehle erneut ausführen. Wählen Sie für eine Liste Azure Regionen, in denen Data Factory zurzeit verfügbar ist, die Regionen aus, die Sie auf der folgenden Seite interessieren, und erweitern Sie dann Analytics, um Data Factory: Products available by region zu finden. Die Datenspeicher (Azure Storage, Azure SQL-Datenbank, etc.) und Berechnungen (HDInsight, usw.), die von der Data Factory verwendet werden, können sich in anderen Regionen befinden.
Führen Sie den folgenden Befehl aus, und geben Sie den Benutzernamen und das Kennwort ein, mit dem Sie sich beim Azure Portal anmelden:
Connect-AzAccountFühren Sie den folgenden Befehl aus, um alle Abonnements für dieses Konto anzuzeigen:
Get-AzSubscriptionFühren Sie den folgenden Befehl aus, um das gewünschte Abonnement auszuwählen: Ersetzen Sie SubscriptionId durch die ID Ihres Azure-Abonnements:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"Erstellen Sie die Ressourcengruppe: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"Erstellen Sie die Data Factory.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupNameFühren Sie den folgenden Befehl aus, um die Ausgabe anzuzeigen:
$dfWechseln Sie zu dem Ordner, in dem Sie JSON-Dateien erstellt haben, und führen Sie den folgenden Befehl aus, um einen Azure Storage verknüpften Dienst bereitzustellen:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"Führen Sie den folgenden Befehl aus, um einen bedarfsgesteuerten verknüpften Spark-Dienst bereitzustellen:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"Führen Sie den folgenden Befehl aus, um eine Pipeline bereitzustellen:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Starten und Überwachen einer Pipeline-Ausführung
Starten einer Pipelineausführung Die ID der Pipelineausführung wird für die zukünftige Überwachung ebenfalls erfasst.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineNameFühren Sie das folgende Skript aus, um den Status der Pipelineausführung kontinuierlich zu überwachen, bis sie beendet ist.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"Hier ist die Ausgabe der Beispielausführung:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"Vergewissern Sie sich, dass im Ordner
outputfilesdes Containers „adftutorial“ ein Ordner mit dem Namensparkund der Ausgabe des Spark-Programms erstellt wurde.
Zugehöriger Inhalt
Die Pipeline in diesem Beispiel kopiert Daten von einem Speicherort an einen anderen Speicherort in einem Azure BLOB-Speicher. Sie haben Folgendes gelernt:
- Erstellen einer Data Factory.
- Erstellen und Bereitstellen von verknüpften Diensten
- Erstellen und Bereitstellen einer Pipeline.
- Starten einer Pipelineausführung
- Überwachen der Pipelineausführung.
Wechseln Sie zum nächsten Lernprogramm, um zu erfahren, wie Sie Daten transformieren, indem Sie das Hive-Skript auf einem Azure HDInsight Cluster ausführen, der sich in einem virtuellen Netzwerk befindet.