Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Область применения: SQL Server 2019 (15.x)
Important
Кластеры больших данных Microsoft SQL Server 2019 прекращены. Поддержка кластеров больших данных SQL Server 2019 закончилась с 28 февраля 2025 г. Дополнительные сведения см. в записи блога объявлений и параметрах больших данных на платформе Microsoft SQL Server.
В этом руководстве показано, как использовать задания Spark для загрузки данных в пул данных кластера больших данных SQL Server 2019.
В этом руководстве вы узнаете, как:
- Создайте внешнюю таблицу в пуле данных.
- Создайте задание Spark для загрузки данных из HDFS.
- Запрос результатов во внешней таблице.
Tip
Если вы предпочитаете, вы можете скачать и запустить скрипт для команд, приведенных в этом руководстве. Инструкции см. в примерах пулов данных на сайте GitHub.
Prerequisites
-
Средства работы с большими данными
- kubectl
- Azure Data Studio
- Расширение SQL Server 2019
- Загрузка примеров данных в кластер больших данных
Создание внешней таблицы в пуле данных
Следующие действия создают внешнюю таблицу в пуле данных с именем web_clickstreams_spark_results. Затем эту таблицу можно использовать в качестве расположения для приема данных в кластер больших данных.
В Azure Data Studio подключитесь к главному экземпляру SQL Server кластера больших данных. Дополнительные сведения см. в разделе "Подключение к главному экземпляру SQL Server".
Дважды щелкните на подключении в окне «Серверы», чтобы отобразить панель мониторинга сервера для основного экземпляра SQL Server. Выберите Создать запрос.
Создание разрешений для соединителя MSSQL-Spark.
USE Sales CREATE LOGIN sample_user WITH PASSWORD ='<password>' CREATE USER sample_user FROM LOGIN sample_user -- To create external tables in data pools GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user; -- To create external tables GRANT CREATE TABLE TO sample_user; GRANT ALTER ANY SCHEMA TO sample_user; -- To view database state for Sales GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user ALTER ROLE [db_datawriter] ADD MEMBER sample_userСоздайте внешний источник данных в пуле данных, если он еще не существует.
USE Sales GO IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool') CREATE EXTERNAL DATA SOURCE SqlDataPool WITH (LOCATION = 'sqldatapool://controller-svc/default');Создайте внешнюю таблицу с именем web_clickstreams_spark_results в пуле данных.
USE Sales GO IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results') CREATE EXTERNAL TABLE [web_clickstreams_spark_results] ("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT) WITH ( DATA_SOURCE = SqlDataPool, DISTRIBUTION = ROUND_ROBIN );Создайте имя входа для пулов данных и предоставьте пользователю разрешения.
EXECUTE( ' Use Sales; CREATE LOGIN sample_user WITH PASSWORD = ''<password>;'') AT DATA_SOURCE SqlDataPool; EXECUTE('Use Sales; CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user; ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
Создание внешней таблицы пула данных — это операция блокировки. Управление возвращается после создания указанной таблицы на всех узлах серверного уровня пула данных. Если во время операции создания произошел сбой, вызывающему объекту возвращается сообщение об ошибке.
Запуск задания потоковой передачи Spark
Далее необходимо создать задание потоковой передачи Spark, которое загружает данные кликов из пула хранения (HDFS) в внешнюю таблицу, которую вы создали в пуле данных. Эти данные были добавлены в /clickstream_data в загрузке примеров данных в кластер больших данных.
В Azure Data Studio подключитесь к главному экземпляру кластера больших данных. Дополнительные сведения см. в разделе "Подключение к кластеру больших данных".
Создайте новую записную книжку и выберите Spark | Scala в качестве ядра.
Запустите задание на поглощение данных в Spark
- Настройка параметров соединителя Spark-SQL
Note
Если кластер больших данных развернут с интеграцией Active Directory, замените значение имени узла ниже, чтобы включить полное доменное имя , добавленное к имени службы. Например , hostname=master-p-svc.<domainName>.
import org.apache.spark.sql.types._ import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame} // Change per your installation val user= "username" val password= "<password>" val database = "MyTestDatabase" val sourceDir = "/clickstream_data" val datapool_table = "web_clickstreams_spark_results" val datasource_name = "SqlDataPool" val schema = StructType(Seq( StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true), StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true), StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true) )) val hostname = "master-p-svc" val port = 1433 val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"- Определение и запуск задания Spark
- Каждое задание состоит из двух частей: readStream и writeStream. Далее мы создадим датасет, используя схему, определенную выше, а затем запишем во внешнюю таблицу в хранилище данных.
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame} val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir) val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => batchDF.write .format("com.microsoft.sqlserver.jdbc.spark") .mode("append") .option("url", url) .option("dbtable", datapool_table) .option("user", user) .option("password", password) .option("dataPoolDataSource",datasource_name).save() }.start() query.awaitTermination(40000) query.stop()
Запрос данных
Ниже показано, что задание потоковой передачи Spark загружает данные из HDFS в пул данных.
Прежде чем запрашивать загруженные данные, просмотрите состояние выполнения Spark, включая идентификатор приложения Yarn, пользовательский интерфейс Spark и журналы драйвера. Эти сведения будут отображаться в записной книжке при первом запуске приложения Spark.
Вернитесь к окну запроса главного экземпляра SQL Server, которое было открыто в начале этого руководства.
Выполните следующий запрос, чтобы инспектировать загруженные данные.
USE Sales GO SELECT count(*) FROM [web_clickstreams_spark_results]; SELECT TOP 10 * FROM [web_clickstreams_spark_results];Данные также можно запрашивать в Spark. Например, приведенный ниже код выводит количество записей в таблице:
def df_read(dbtable: String, url: String, dataPoolDataSource: String=""): DataFrame = { spark.read .format("com.microsoft.sqlserver.jdbc.spark") .option("url", url) .option("dbtable", dbtable) .option("user", user) .option("password", password) .option("dataPoolDataSource", dataPoolDataSource) .load() } val new_df = df_read(datapool_table, url, dataPoolDataSource=datasource_name) println("Number of rows is " + new_df.count)
Clean up
Используйте следующую команду, чтобы удалить объекты базы данных, созданные в этом руководстве.
DROP EXTERNAL TABLE [dbo].[web_clickstreams_spark_results];
Next steps
Узнайте, как запустить пример записной книжки в Azure Data Studio: