Udostępnij za pośrednictwem


Przekształcanie danych w chmurze przy użyciu działania platformy Spark w Azure Data Factory

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Data Factory w usłudze Microsoft Fabric jest następną generacją Azure Data Factory z prostszą architekturą, wbudowaną sztuczną inteligencją i nowymi funkcjami. Jeśli dopiero zaczynasz integrować dane, zacznij od Fabric Data Factory. Istniejące obciążenia ADF można zaktualizować do Fabric, aby uzyskać dostęp do nowych możliwości w zakresie nauki o danych, analiz w czasie rzeczywistym oraz raportowania.

W tym samouczku użyjesz Azure PowerShell do utworzenia potoku usługi Data Factory, który przekształca dane przy użyciu działania platformy Spark i połączonej usługi HDInsight na żądanie. W tym samouczku wykonasz następujące kroki:

  • Tworzenie fabryki danych.
  • Tworzenie i wdrażanie połączonych usług
  • Tworzenie i wdrażanie potoku.
  • Uruchom potok.
  • Monitoruj przebieg potoku.

Jeśli nie masz subskrypcji Azure, przed rozpoczęciem utwórz konto free.

Wymagania wstępne

Uwaga

Zalecamy użycie modułu Azure Az programu PowerShell do interakcji z Azure. Aby rozpocząć, zobacz Install Azure PowerShell. Aby dowiedzieć się, jak przeprowadzić migrację do modułu Az programu PowerShell, zobacz Migrate Azure PowerShell z modułu AzureRM do modułu Az.

  • konto Azure Storage. Utworzysz skrypt Python i plik wejściowy, a następnie przekażesz go do magazynu Azure. Dane wyjściowe programu Spark są przechowywane w tym koncie magazynowym. Klaster platformy Spark na żądanie używa tego samego konta magazynowego do przechowywania podstawowych danych.
  • Azure PowerShell. Postępuj zgodnie z instrukcjami w Jak zainstalować i skonfigurować Azure PowerShell.

Przekazywanie skryptu Python do konta Blob Storage

  1. Utwórz plik Python o nazwie WordCount_Spark.py z następującą zawartością:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Zastąp <storageAccountName> nazwą konta Azure Storage. Następnie zapisz plik.

  3. W Azure Blob Storage utwórz kontener o nazwie adftutorial jeśli nie istnieje.

  4. Utwórz folder o nazwie spark.

  5. Utwórz podfolder o nazwie script w folderze spark.

  6. Przekaż plik WordCount_Spark.py do podfolderu script.

Przekazywanie pliku wejściowego

  1. Utwórz plik o nazwie minecraftstory.txt zawierający tekst. Program platformy Spark zlicza liczbę słów w tym tekście.
  2. Utwórz podfolder o nazwie inputfiles w folderze spark.
  3. Przekaż minecraftstory.txt do podfolderu inputfiles.

Usługi powiązane przez autora

W tej sekcji utworzysz dwie Usługi Połączone.

  • Połączona usługa Azure Storage, która łączy konto Azure Storage z fabryką danych. Ta pamięć masowa jest wykorzystywana przez klaster HDInsight na żądanie. Zawiera on także skrypt platformy Spark do wykonania.
  • Usługa powiązana HDInsight dostępna na żądanie. Azure Data Factory automatycznie tworzy klaster usługi HDInsight, uruchamia program Spark, a następnie usuwa go po bezczynności przez wstępnie skonfigurowany czas.

połączona usługa Azure Storage

Utwórz plik JSON przy użyciu preferowanego edytora, skopiuj następującą definicję JSON połączonej usługi Azure Storage, a następnie zapisz plik jako MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Zaktualizuj <storageAccountName> i <storageAccountKey> nazwą i kluczem konta Azure Storage.

Połączona usługa HDInsight na żądanie

Utwórz plik JSON przy użyciu preferowanego edytora, skopiuj następującą definicję JSON połączonej usługi Azure HDInsight i zapisz plik jako MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Zaktualizuj wartości następujących właściwości w definicji połączonej usługi:

  • hostSubscriptionId. Zastąp <subscriptionID> identyfikatorem subskrypcji Azure. Klaster usługi HDInsight na żądanie jest tworzony w tej subskrypcji.
  • tenant. Zastąp <tenantID> ID twojego najemcy w Azure.
  • servicePrincipalId, servicePrincipalKey. Zastąp <servicePrincipalID> i <servicePrincipalKey> identyfikatorem i kluczem głównego użytkownika usługi w Microsoft Entra ID. Jednostka usługi musi być członkiem roli współautora subskrypcji lub grupy zasobów, gdzie został utworzony klaster. Aby uzyskać szczegółowe informacje, zobacz tworzenie aplikacji Microsoft Entra i jednostki usługi. Identyfikator jednostki usługi jest odpowiednikiem identyfikatora aplikacji, a klucz jednostki usługi jest odpowiednikiem wartości klucza tajnego klienta.
  • clusterResourceGroup. Zastąp właściwość <resourceGroupOfHDICluster> nazwą grupy zasobów, w której ma zostać utworzony klaster usługi HDInsight.

Uwaga

Azure HDInsight ma ograniczenie całkowitej liczby rdzeni, których można używać w każdym Azure regionie, który obsługuje. W przypadku połączonej usługi HDInsight na żądanie, klaster HDInsight zostanie utworzony w tej samej lokalizacji co Azure Storage używany jako jego podstawowy magazyn. Upewnij się, że masz wystarczająco duże limity przydziału dla klastra, aby można go było pomyślnie utworzyć. Aby uzyskać więcej informacji, zobacz Set up clusters in HDInsight with Hadoop, Spark, Kafka and more (Konfigurowanie klastrów w usłudze HDInsight za pomocą platform Hadoop, Spark, Kafka i innych).

Tworzenie potoku

W tym kroku utworzysz nowy potok przy użyciu aktywności Spark. Działanie wykorzystuje próbkę liczby słów. Jeśli jeszcze tego nie zrobiono, pobierz zawartość z tej lokalizacji.

Utwórz plik w formacie JSON za pomocą preferowanego edytora, skopiuj poniższą definicję formatu JSON dotyczącą definicji potoku, a następnie zapisz go jako MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Należy uwzględnić następujące informacje:

  • Właściwość rootPath wskazuje na folder spark kontenera adftutorial.
  • Właściwość entryFilePath wskazuje na plik WordCount_Spark.py w podfolderze script folderu spark.

Tworzenie fabryki danych

Napisałeś definicje połączonej usługi i potoków w plikach JSON. Teraz utwórzmy Data Factory i wdrożmy powiązane pliki JSON usługi oraz potoku za pomocą poleceń cmdlet programu PowerShell. Uruchom następujące polecenia programu PowerShell jedno po drugim:

  1. Ustaw zmienne jedną po drugiej.

    Nazwa grupy zasobów

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Nazwa fabryki danych. Musi ona być unikatowa w skali globalnej

    $dataFactoryName = "MyDataFactory09102017"
    

    Nazwa potoku

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Uruchom program PowerShell. Pozostaw Azure PowerShell otwarte do końca tego szybkiego startu. Jeśli go zamkniesz i otworzysz ponownie, musisz uruchomić te polecenia jeszcze raz. Aby uzyskać listę regionów Azure, w których usługa Data Factory jest obecnie dostępna, wybierz regiony, które Cię interesują na następującej stronie, a następnie rozwiń węzeł Analytics aby zlokalizować Data Factory: Products available by region. Magazyny danych (Azure Storage, Azure SQL Database itp.) i obliczenia (HDInsight itp.) używane przez fabrykę danych mogą znajdować się w innych regionach.

    Uruchom następujące polecenie, a następnie wprowadź nazwę użytkownika i hasło używane do logowania się do portalu Azure:

    Connect-AzAccount
    

    Uruchom poniższe polecenie, aby wyświetlić wszystkie subskrypcje dla tego konta:

    Get-AzSubscription
    

    Uruchom poniższe polecenie, aby wybrać subskrypcję, z którą chcesz pracować. Zastąp SubscriptionId identyfikatorem subskrypcji Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Utwórz grupę zasobów: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Utwórz fabrykę danych.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Wykonaj następujące polecenie, aby wyświetlić dane wyjściowe:

    $df
    
  5. Przejdź do folderu, w którym utworzono pliki JSON, i uruchom następujące polecenie, aby wdrożyć połączoną usługę Azure Storage:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Uruchom następujące polecenie, aby wdrożyć połączoną usługę Spark na żądanie:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Uruchom następujące polecenie, aby wdrożyć potok przetwarzania:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Uruchamianie i monitorowanie działania potoku

  1. Uruchom potok. Pozwala również na przechwycenie identyfikatora uruchomienia potoku do przyszłego monitorowania.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Uruchom następujący skrypt, aby stale sprawdzać stan uruchomienia potoku do momentu zakończenia jego działania.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Oto wynik przykładowego uruchomienia:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Potwierdź, że folder o nazwie outputfiles jest tworzony w folderze spark kontenera adftutorial zawierającym dane wyjściowe z programu platformy Spark.

Przepływ pracy w tym przykładzie kopiuje dane z jednej lokalizacji do drugiej w magazynie obiektów blob Azure. Nauczyłeś się jak:

  • Tworzenie fabryki danych.
  • Tworzenie i wdrażanie połączonych usług
  • Tworzenie i wdrażanie potoku.
  • Uruchom potok.
  • Monitoruj przebieg potoku.

Przejdź do następnego samouczka, aby dowiedzieć się, jak przekształcać dane, uruchamiając skrypt Programu Hive w klastrze Azure HDInsight, który znajduje się w sieci wirtualnej.