Поделиться через


Руководство. Прием данных в пул данных SQL Server с заданиями Spark

Область применения: SQL Server 2019 (15.x)

Important

Кластеры больших данных Microsoft SQL Server 2019 прекращены. Поддержка кластеров больших данных SQL Server 2019 закончилась с 28 февраля 2025 г. Дополнительные сведения см. в записи блога объявлений и параметрах больших данных на платформе Microsoft SQL Server.

В этом руководстве показано, как использовать задания Spark для загрузки данных в пул данных кластера больших данных SQL Server 2019.

В этом руководстве вы узнаете, как:

  • Создайте внешнюю таблицу в пуле данных.
  • Создайте задание Spark для загрузки данных из HDFS.
  • Запрос результатов во внешней таблице.

Tip

Если вы предпочитаете, вы можете скачать и запустить скрипт для команд, приведенных в этом руководстве. Инструкции см. в примерах пулов данных на сайте GitHub.

Prerequisites

Создание внешней таблицы в пуле данных

Следующие действия создают внешнюю таблицу в пуле данных с именем web_clickstreams_spark_results. Затем эту таблицу можно использовать в качестве расположения для приема данных в кластер больших данных.

  1. В Azure Data Studio подключитесь к главному экземпляру SQL Server кластера больших данных. Дополнительные сведения см. в разделе "Подключение к главному экземпляру SQL Server".

  2. Дважды щелкните на подключении в окне «Серверы», чтобы отобразить панель мониторинга сервера для основного экземпляра SQL Server. Выберите Создать запрос.

    Запрос главного экземпляра SQL Server

  3. Создание разрешений для соединителя MSSQL-Spark.

    USE Sales
    CREATE LOGIN sample_user  WITH PASSWORD ='<password>' 
    CREATE USER sample_user FROM LOGIN sample_user
    
    -- To create external tables in data pools
    GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user;
    
    -- To create external tables
    GRANT CREATE TABLE TO sample_user;
    GRANT ALTER ANY SCHEMA TO sample_user;
    
    -- To view database state for Sales
    GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user;
    
    ALTER ROLE [db_datareader] ADD MEMBER sample_user
    ALTER ROLE [db_datawriter] ADD MEMBER sample_user
    
  4. Создайте внешний источник данных в пуле данных, если он еще не существует.

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool')
      CREATE EXTERNAL DATA SOURCE SqlDataPool
      WITH (LOCATION = 'sqldatapool://controller-svc/default');
    
  5. Создайте внешнюю таблицу с именем web_clickstreams_spark_results в пуле данных.

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results')
       CREATE EXTERNAL TABLE [web_clickstreams_spark_results]
       ("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT)
       WITH
       (
          DATA_SOURCE = SqlDataPool,
          DISTRIBUTION = ROUND_ROBIN
       );
    
  6. Создайте имя входа для пулов данных и предоставьте пользователю разрешения.

    EXECUTE( ' Use Sales; CREATE LOGIN sample_user  WITH PASSWORD = ''<password>;'') AT  DATA_SOURCE SqlDataPool;
    
    EXECUTE('Use Sales; CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user;  ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
    

Создание внешней таблицы пула данных — это операция блокировки. Управление возвращается после создания указанной таблицы на всех узлах серверного уровня пула данных. Если во время операции создания произошел сбой, вызывающему объекту возвращается сообщение об ошибке.

Запуск задания потоковой передачи Spark

Далее необходимо создать задание потоковой передачи Spark, которое загружает данные кликов из пула хранения (HDFS) в внешнюю таблицу, которую вы создали в пуле данных. Эти данные были добавлены в /clickstream_data в загрузке примеров данных в кластер больших данных.

  1. В Azure Data Studio подключитесь к главному экземпляру кластера больших данных. Дополнительные сведения см. в разделе "Подключение к кластеру больших данных".

  2. Создайте новую записную книжку и выберите Spark | Scala в качестве ядра.

  3. Запустите задание на поглощение данных в Spark

    1. Настройка параметров соединителя Spark-SQL

    Note

    Если кластер больших данных развернут с интеграцией Active Directory, замените значение имени узла ниже, чтобы включить полное доменное имя , добавленное к имени службы. Например , hostname=master-p-svc.<domainName>.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
    
    // Change per your installation
    val user= "username"
    val password= "<password>"
    val database =  "MyTestDatabase"
    val sourceDir = "/clickstream_data"
    val datapool_table = "web_clickstreams_spark_results"
    val datasource_name = "SqlDataPool"
    val schema = StructType(Seq(
    StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true), 
    StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true),
    StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true)
    ))
    
    val hostname = "master-p-svc"
    val port = 1433
    val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
    
    1. Определение и запуск задания Spark
      • Каждое задание состоит из двух частей: readStream и writeStream. Далее мы создадим датасет, используя схему, определенную выше, а затем запишем во внешнюю таблицу в хранилище данных.
      import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
      
      val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir)
      val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
                batchDF.write
                 .format("com.microsoft.sqlserver.jdbc.spark")
                 .mode("append")
                  .option("url", url)
                  .option("dbtable", datapool_table)
                  .option("user", user)
                  .option("password", password)
                  .option("dataPoolDataSource",datasource_name).save()
               }.start()
      
      query.awaitTermination(40000)
      query.stop()
      

Запрос данных

Ниже показано, что задание потоковой передачи Spark загружает данные из HDFS в пул данных.

  1. Прежде чем запрашивать загруженные данные, просмотрите состояние выполнения Spark, включая идентификатор приложения Yarn, пользовательский интерфейс Spark и журналы драйвера. Эти сведения будут отображаться в записной книжке при первом запуске приложения Spark.

    Сведения о выполнении Spark

  2. Вернитесь к окну запроса главного экземпляра SQL Server, которое было открыто в начале этого руководства.

  3. Выполните следующий запрос, чтобы инспектировать загруженные данные.

    USE Sales
    GO
    SELECT count(*) FROM [web_clickstreams_spark_results];
    SELECT TOP 10 * FROM [web_clickstreams_spark_results];
    
  4. Данные также можно запрашивать в Spark. Например, приведенный ниже код выводит количество записей в таблице:

    def df_read(dbtable: String,
                 url: String,
                 dataPoolDataSource: String=""): DataFrame = {
         spark.read
              .format("com.microsoft.sqlserver.jdbc.spark")
              .option("url", url)
              .option("dbtable", dbtable)
              .option("user", user)
              .option("password", password)
              .option("dataPoolDataSource", dataPoolDataSource)
              .load()
              }
    
    val new_df = df_read(datapool_table, url, dataPoolDataSource=datasource_name)
    println("Number of rows is " +  new_df.count)
    

Clean up

Используйте следующую команду, чтобы удалить объекты базы данных, созданные в этом руководстве.

DROP EXTERNAL TABLE [dbo].[web_clickstreams_spark_results];

Next steps

Узнайте, как запустить пример записной книжки в Azure Data Studio: