Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
DOTYCZY:
Azure Data Factory
Azure Synapse Analytics
Napiwek
Data Factory w usłudze Microsoft Fabric jest następną generacją Azure Data Factory z prostszą architekturą, wbudowaną sztuczną inteligencją i nowymi funkcjami. Jeśli dopiero zaczynasz integrować dane, zacznij od Fabric Data Factory. Istniejące obciążenia ADF można zaktualizować do Fabric, aby uzyskać dostęp do nowych możliwości w zakresie nauki o danych, analiz w czasie rzeczywistym oraz raportowania.
W tym samouczku użyjesz Azure PowerShell do utworzenia potoku usługi Data Factory, który przekształca dane przy użyciu działania platformy Spark i połączonej usługi HDInsight na żądanie. W tym samouczku wykonasz następujące kroki:
- Tworzenie fabryki danych.
- Tworzenie i wdrażanie połączonych usług
- Tworzenie i wdrażanie potoku.
- Uruchom potok.
- Monitoruj przebieg potoku.
Jeśli nie masz subskrypcji Azure, przed rozpoczęciem utwórz konto free.
Wymagania wstępne
Uwaga
Zalecamy użycie modułu Azure Az programu PowerShell do interakcji z Azure. Aby rozpocząć, zobacz Install Azure PowerShell. Aby dowiedzieć się, jak przeprowadzić migrację do modułu Az programu PowerShell, zobacz Migrate Azure PowerShell z modułu AzureRM do modułu Az.
- konto Azure Storage. Utworzysz skrypt Python i plik wejściowy, a następnie przekażesz go do magazynu Azure. Dane wyjściowe programu Spark są przechowywane w tym koncie magazynowym. Klaster platformy Spark na żądanie używa tego samego konta magazynowego do przechowywania podstawowych danych.
- Azure PowerShell. Postępuj zgodnie z instrukcjami w Jak zainstalować i skonfigurować Azure PowerShell.
Przekazywanie skryptu Python do konta Blob Storage
Utwórz plik Python o nazwie WordCount_Spark.py z następującą zawartością:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()Zastąp <storageAccountName> nazwą konta Azure Storage. Następnie zapisz plik.
W Azure Blob Storage utwórz kontener o nazwie adftutorial jeśli nie istnieje.
Utwórz folder o nazwie spark.
Utwórz podfolder o nazwie script w folderze spark.
Przekaż plik WordCount_Spark.py do podfolderu script.
Przekazywanie pliku wejściowego
- Utwórz plik o nazwie minecraftstory.txt zawierający tekst. Program platformy Spark zlicza liczbę słów w tym tekście.
- Utwórz podfolder o nazwie
inputfilesw folderzespark. - Przekaż
minecraftstory.txtdo podfolderuinputfiles.
Usługi powiązane przez autora
W tej sekcji utworzysz dwie Usługi Połączone.
- Połączona usługa Azure Storage, która łączy konto Azure Storage z fabryką danych. Ta pamięć masowa jest wykorzystywana przez klaster HDInsight na żądanie. Zawiera on także skrypt platformy Spark do wykonania.
- Usługa powiązana HDInsight dostępna na żądanie. Azure Data Factory automatycznie tworzy klaster usługi HDInsight, uruchamia program Spark, a następnie usuwa go po bezczynności przez wstępnie skonfigurowany czas.
połączona usługa Azure Storage
Utwórz plik JSON przy użyciu preferowanego edytora, skopiuj następującą definicję JSON połączonej usługi Azure Storage, a następnie zapisz plik jako MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Zaktualizuj <storageAccountName> i <storageAccountKey> nazwą i kluczem konta Azure Storage.
Połączona usługa HDInsight na żądanie
Utwórz plik JSON przy użyciu preferowanego edytora, skopiuj następującą definicję JSON połączonej usługi Azure HDInsight i zapisz plik jako MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Zaktualizuj wartości następujących właściwości w definicji połączonej usługi:
- hostSubscriptionId. Zastąp <subscriptionID> identyfikatorem subskrypcji Azure. Klaster usługi HDInsight na żądanie jest tworzony w tej subskrypcji.
- tenant. Zastąp <tenantID> ID twojego najemcy w Azure.
- servicePrincipalId, servicePrincipalKey. Zastąp <servicePrincipalID> i <servicePrincipalKey> identyfikatorem i kluczem głównego użytkownika usługi w Microsoft Entra ID. Jednostka usługi musi być członkiem roli współautora subskrypcji lub grupy zasobów, gdzie został utworzony klaster. Aby uzyskać szczegółowe informacje, zobacz tworzenie aplikacji Microsoft Entra i jednostki usługi. Identyfikator jednostki usługi jest odpowiednikiem identyfikatora aplikacji, a klucz jednostki usługi jest odpowiednikiem wartości klucza tajnego klienta.
- clusterResourceGroup. Zastąp właściwość <resourceGroupOfHDICluster> nazwą grupy zasobów, w której ma zostać utworzony klaster usługi HDInsight.
Uwaga
Azure HDInsight ma ograniczenie całkowitej liczby rdzeni, których można używać w każdym Azure regionie, który obsługuje. W przypadku połączonej usługi HDInsight na żądanie, klaster HDInsight zostanie utworzony w tej samej lokalizacji co Azure Storage używany jako jego podstawowy magazyn. Upewnij się, że masz wystarczająco duże limity przydziału dla klastra, aby można go było pomyślnie utworzyć. Aby uzyskać więcej informacji, zobacz Set up clusters in HDInsight with Hadoop, Spark, Kafka and more (Konfigurowanie klastrów w usłudze HDInsight za pomocą platform Hadoop, Spark, Kafka i innych).
Tworzenie potoku
W tym kroku utworzysz nowy potok przy użyciu aktywności Spark. Działanie wykorzystuje próbkę liczby słów. Jeśli jeszcze tego nie zrobiono, pobierz zawartość z tej lokalizacji.
Utwórz plik w formacie JSON za pomocą preferowanego edytora, skopiuj poniższą definicję formatu JSON dotyczącą definicji potoku, a następnie zapisz go jako MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Należy uwzględnić następujące informacje:
- Właściwość rootPath wskazuje na folder spark kontenera adftutorial.
- Właściwość entryFilePath wskazuje na plik WordCount_Spark.py w podfolderze script folderu spark.
Tworzenie fabryki danych
Napisałeś definicje połączonej usługi i potoków w plikach JSON. Teraz utwórzmy Data Factory i wdrożmy powiązane pliki JSON usługi oraz potoku za pomocą poleceń cmdlet programu PowerShell. Uruchom następujące polecenia programu PowerShell jedno po drugim:
Ustaw zmienne jedną po drugiej.
Nazwa grupy zasobów
$resourceGroupName = "ADFTutorialResourceGroup"Nazwa fabryki danych. Musi ona być unikatowa w skali globalnej
$dataFactoryName = "MyDataFactory09102017"Nazwa potoku
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipelineUruchom program PowerShell. Pozostaw Azure PowerShell otwarte do końca tego szybkiego startu. Jeśli go zamkniesz i otworzysz ponownie, musisz uruchomić te polecenia jeszcze raz. Aby uzyskać listę regionów Azure, w których usługa Data Factory jest obecnie dostępna, wybierz regiony, które Cię interesują na następującej stronie, a następnie rozwiń węzeł Analytics aby zlokalizować Data Factory: Products available by region. Magazyny danych (Azure Storage, Azure SQL Database itp.) i obliczenia (HDInsight itp.) używane przez fabrykę danych mogą znajdować się w innych regionach.
Uruchom następujące polecenie, a następnie wprowadź nazwę użytkownika i hasło używane do logowania się do portalu Azure:
Connect-AzAccountUruchom poniższe polecenie, aby wyświetlić wszystkie subskrypcje dla tego konta:
Get-AzSubscriptionUruchom poniższe polecenie, aby wybrać subskrypcję, z którą chcesz pracować. Zastąp SubscriptionId identyfikatorem subskrypcji Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"Utwórz grupę zasobów: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"Utwórz fabrykę danych.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupNameWykonaj następujące polecenie, aby wyświetlić dane wyjściowe:
$dfPrzejdź do folderu, w którym utworzono pliki JSON, i uruchom następujące polecenie, aby wdrożyć połączoną usługę Azure Storage:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"Uruchom następujące polecenie, aby wdrożyć połączoną usługę Spark na żądanie:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"Uruchom następujące polecenie, aby wdrożyć potok przetwarzania:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Uruchamianie i monitorowanie działania potoku
Uruchom potok. Pozwala również na przechwycenie identyfikatora uruchomienia potoku do przyszłego monitorowania.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineNameUruchom następujący skrypt, aby stale sprawdzać stan uruchomienia potoku do momentu zakończenia jego działania.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"Oto wynik przykładowego uruchomienia:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"Potwierdź, że folder o nazwie
outputfilesjest tworzony w folderzesparkkontenera adftutorial zawierającym dane wyjściowe z programu platformy Spark.
Powiązana zawartość
Przepływ pracy w tym przykładzie kopiuje dane z jednej lokalizacji do drugiej w magazynie obiektów blob Azure. Nauczyłeś się jak:
- Tworzenie fabryki danych.
- Tworzenie i wdrażanie połączonych usług
- Tworzenie i wdrażanie potoku.
- Uruchom potok.
- Monitoruj przebieg potoku.
Przejdź do następnego samouczka, aby dowiedzieć się, jak przekształcać dane, uruchamiając skrypt Programu Hive w klastrze Azure HDInsight, który znajduje się w sieci wirtualnej.