Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Пользовательские источники данных PySpark создаются с помощью API Источника данных Python (PySpark), который позволяет читать из пользовательских источников данных и записывать их в пользовательские приемники данных в Apache Spark с помощью Python. Пользовательские источники данных PySpark можно использовать для определения пользовательских подключений к системам данных и реализации дополнительных функций для создания повторно используемых источников данных.
Примечание.
Для пользовательских источников данных PySpark требуется Среда выполнения Databricks 15.4 LTS и более поздней или бессерверная среда версии 2.
Класс DataSource
PySpark DataSource — это базовый класс, который предоставляет методы для создания средств чтения и записи данных.
Реализация подкласса источника данных
В зависимости от варианта использования любой подкласс должен реализовать следующие методы, чтобы источник данных стал доступен для чтения, записи или обоих:
| Свойство или метод | Описание |
|---|---|
name |
Обязательное. Имя источника данных |
schema |
Обязательное. Схема источника данных для чтения или записи |
reader() |
Должен быть возвращен DataSourceReader, чтобы источник данных был доступен для чтения (в режиме пакетной обработки) |
writer() |
Необходимо вернуть DataSourceWriter, чтобы приемник данных стал доступен для записи (пакетная обработка) |
streamReader() или simpleStreamReader() |
Необходимо вернуть DataSourceStreamReader для обеспечения возможности чтения потока данных. |
streamWriter() |
Необходимо вернуть DataSourceStreamWriter, чтобы поток данных можно было сделать доступным для записи (потоковая передача) |
Примечание.
Определённые пользователем DataSource, DataSourceReader, DataSourceWriter, DataSourceStreamReader, DataSourceStreamWriter и их методы должны быть сериализуемыми. Другими словами, они должны представлять собой словарь или вложенный словарь, который содержит примитивный тип.
Регистрация источника данных
После реализации интерфейса необходимо зарегистрировать его, затем можно загрузить или использовать его, как показано в следующем примере:
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
Пример 1. Создание источника данных PySpark для пакетного запроса
Чтобы продемонстрировать возможности чтения PySpark DataSource, создайте источник данных, который создает примеры данных с помощью faker пакета Python. Для получения дополнительной информации о faker см. документацию Faker.
Установите пакет с помощью следующей faker команды:
%pip install faker
Шаг 1. Реализация средства чтения для пакетного запроса
Сначала реализуйте логику чтения для создания примеров данных. Используйте установленную библиотеку faker для заполнения каждого поля в схеме.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
Шаг 2. Определение примера DataSource
Затем определите новый Источник данных PySpark в качестве подкласса DataSource с именем, схемой и читателем. Метод reader() должен быть определен для чтения из источника данных в пакетном запросе.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
Шаг 3. Регистрация и использование примера источника данных
Чтобы использовать источник данных, зарегистрируйте его. По умолчанию FakeDataSource имеет три строки, а схема включает следующие поля string: name, date, zipcode, state. В следующем примере регистрируются, загружаются и выводятся данные из примера источника данных с настройками по умолчанию.
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
Поддерживаются только string поля, но можно указать схему с любыми полями, соответствующими faker полям поставщиков пакетов для создания случайных данных для тестирования и разработки. В следующем примере загружается источник данных с полями name и company.
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
Чтобы загрузить источник данных с пользовательским числом строк, укажите этот параметр numRows. В следующем примере указано 5 строк:
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
Пример 2. Создание GitHub DataSource PySpark с помощью вариантов
Чтобы продемонстрировать использование вариантов в PySpark DataSource, в этом примере создается источник данных, который считывает запросы на извлечение из GitHub.
Примечание.
Варианты поддерживаются пользовательскими источниками данных PySpark в Databricks Runtime 17.1 и выше.
Сведения о вариантах см. в разделе "Запрос вариантов данных".
Шаг 1. Реализуйте ридер для извлечения запросов на вытягивание
Сначала реализуйте логику чтения для извлечения запросов из указанного репозитория GitHub.
class GithubVariantPullRequestReader(DataSourceReader):
def __init__(self, options):
self.token = options.get("token")
self.repo = options.get("path")
if self.repo is None:
raise Exception(f"Must specify a repo in `.load()` method.")
# Every value in this `self.options` dictionary is a string.
self.num_rows = int(options.get("numRows", 10))
def read(self, partition):
header = {
"Accept": "application/vnd.github+json",
}
if self.token is not None:
header["Authorization"] = f"Bearer {self.token}"
url = f"https://api.github.com/repos/{self.repo}/pulls"
response = requests.get(url, headers=header)
response.raise_for_status()
prs = response.json()
for pr in prs[:self.num_rows]:
yield Row(
id = pr.get("number"),
title = pr.get("title"),
user = VariantVal.parseJson(json.dumps(pr.get("user"))),
created_at = pr.get("created_at"),
updated_at = pr.get("updated_at")
)
Шаг 2. Определение GitHub DataSource
Затем определите новый pySpark GitHub DataSource в качестве подкласса DataSource с именем, схемой и методом reader(). Схема включает следующие поля: id, title, user, created_at, updated_at. Поле user определяется как вариант.
import json
import requests
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal
class GithubVariantDataSource(DataSource):
@classmethod
def name(self):
return "githubVariant"
def schema(self):
return "id int, title string, user variant, created_at string, updated_at string"
def reader(self, schema):
return GithubVariantPullRequestReader(self.options)
Шаг 3. Регистрация и использование источника данных
Чтобы использовать источник данных, зарегистрируйте его. В следующем примере регистрируется, затем загружается источник данных и выводится три строки данных pr репозитория GitHub:
spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id | title | user | created_at | updated_at |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
| 51293 |[SPARK-52586][SQL] Introduce AnyTimeType | {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
| 51292 |[WIP][PYTHON] Arrow UDF for aggregation | {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
| 51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback | {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
Пример 3. Создание PySpark DataSource для потоковой передачи чтения и записи
Чтобы продемонстрировать возможности средства чтения потоков и записи PySpark DataSource, создайте пример источника данных, который создает две строки в каждом микробатче faker с помощью пакета Python. Для получения дополнительной информации о faker см. документацию Faker.
Установите пакет с помощью следующей faker команды:
%pip install faker
Шаг 1. Реализация средства чтения потоков
Во-первых, реализуйте пример средства чтения потоковых данных, который создает две строки в каждом микробатче. Можно реализовать DataSourceStreamReaderили, если источник данных имеет низкую пропускную способность и не требует секционирования, можно реализовать SimpleDataSourceStreamReader вместо этого. Либо simpleStreamReader(), либо streamReader() должны быть реализованы, и simpleStreamReader() вызывается только в том случае, если streamReader() не реализован.
Реализация DataSourceStreamReader
Экземпляр streamReader имеет целочисленное смещение, которое увеличивается на 2 в каждом микробатче, реализованном с помощью интерфейса DataSourceStreamReader.
from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
Реализация SimpleDataSourceStreamReader
Экземпляр SimpleStreamReader совпадает с экземпляром FakeStreamReader , который создает две строки в каждом пакете, но реализуется с интерфейсом SimpleDataSourceStreamReader без секционирования.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
Шаг 2. Реализация модуля записи потоков
Затем реализуйте модуль записи потоковой передачи. Этот модуль для записи потоковых данных записывает метаданные каждого микробатча в локальную директорию.
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage
class SimpleCommitMessage(WriterCommitMessage):
def __init__(self, partition_id: int, count: int):
self.partition_id = partition_id
self.count = count
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data and then returns the commit message for that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
Шаг 3. Определение примера DataSource
Теперь определите новый PySpark DataSource в качестве подкласса DataSource с именем, схемой и методами streamReader() и streamWriter().
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Шаг 4. Регистрация и использование примера источника данных
Чтобы использовать источник данных, зарегистрируйте его. После регистрации его можно использовать в потоковых запросах в качестве источника или приемника, указав короткое имя или полное имя для format(). В следующем примере регистрируется источник данных, затем запускается запрос, который считывает данные из примера источника и выводит их в консоль.
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
Кроме того, следующий код использует пример потока в качестве приемника и указывает выходной путь:
spark.dataSource.register(FakeStreamDataSource)
# Make sure the output directory exists and is writable
output_path = "/output_path"
dbutils.fs.mkdirs(output_path)
checkpoint_path = "/output_path/checkpoint"
query = (
spark.readStream
.format("fakestream")
.load()
.writeStream
.format("fakestream")
.option("path", output_path)
.option("checkpointLocation", checkpoint_path)
.start()
)
Пример 4: Создание стримингового коннектора для Google BigQuery
В следующем примере показано, как создать настраиваемый соединитель потоковой передачи для Google BigQuery (BQ) с помощью PySpark DataSource. Databricks предоставляет соединитель Spark для пакетной загрузки BigQuery, а Федерация Lakehouse также может удаленно подключаться к любому набору данных BigQuery и извлекать данные через создание внешнего каталога, но ни тот, ни другой полностью не поддерживает добавочные или непрерывные стриминг-процессы. Этот соединитель позволяет поэтапно выполнять добавочную миграцию данных и практически в режиме реального времени миграцию из таблиц BigQuery, предоставляемых источниками потоковой передачи с постоянными контрольными точками.
Этот настраиваемый соединитель имеет следующие функции:
- Совместим с Structured Streaming и Lakeflow Spark Declarative Pipelines.
- Поддерживает добавочное отслеживание записей и непрерывное прием потоковой передачи и следует семантике структурированной потоковой передачи.
- Использует API хранилища BigQuery с протоколом на основе RPC для более быстрой, более дешевой передачи данных.
- Записывает перенесенные таблицы непосредственно в каталог Unity.
- Управляет контрольными точками автоматически с помощью добавочного поля на основе даты или метки времени.
- Поддерживает пакетную загрузку с помощью
Trigger.AvailableNow(). - Не требуется промежуточное облачное хранилище.
- Сериализует передачу данных BigQuery в формате Arrow или Avro.
- Обрабатывает автопараллелизм и распределяет работу между исполнителями Spark на основе объёма данных.
- Подходит для миграции слоев Raw и Bronze из BigQuery, с поддержкой миграции для уровней Silver и Gold с использованием шаблонов SCD Type 1 или Type 2.
Необходимые условия
Перед реализацией пользовательского соединителя установите необходимые пакеты:
%pip install faker google.cloud google.cloud.bigquery google.cloud.bigquery_storage
Шаг 1. Реализация средства чтения потоков
Сначала реализуйте средство чтения потоковых данных. Подкласс DataSourceStreamReader должен реализовать следующие методы:
initialOffset(self) -> dictlatestOffset(self) -> dictpartitions(self, start: dict, end: dict) -> Sequence[InputPartition]read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]commit(self, end: dict) -> Nonestop(self) -> None
Дополнительные сведения о каждом методе см. в разделе "Методы".
import os
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.datasource import DataSourceStreamWriter
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource
from pathlib import Path
from pyarrow.lib import TimestampScalar
from datetime import datetime
from typing import Iterator, Tuple, Any, Dict, List, Sequence
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession
from google.cloud import bigquery_storage
import pandas
import datetime
import uuid
import time, logging
start_time = time.time()
class RangePartition(InputPartition):
def __init__(self, session: ReadSession, stream_idx: int):
self.session = session
self.stream_idx = stream_idx
class BQStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.project_id = options.get("project_id")
self.dataset = options.get("dataset")
self.table = options.get("table")
self.json_auth_file = "/home/"+options.get("service_auth_json_file_name")
self.max_parallel_conn = options.get("max_parallel_conn", 1000)
self.incremental_checkpoint_field = options.get("incremental_checkpoint_field", "")
self.last_offset = None
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
from datetime import datetime
logging.info("Inside initialOffset!!!!!")
# self.increment_latest_vals.append(datetime.strptime('1900-01-01 23:57:12', "%Y-%m-%d %H:%M:%S"))
self.last_offset = '1900-01-01 23:57:12'
return {"offset": str(self.last_offset)}
def latestOffset(self):
"""
Returns the current latest offset that the next microbatch will read to.
"""
from datetime import datetime
from google.cloud import bigquery
if (self.last_offset is None):
self.last_offset = '1900-01-01 23:57:12'
client = bigquery.Client.from_service_account_json(self.json_auth_file)
# max_offset=start["offset"]
logging.info(f"************************last_offset: {self.last_offset}***********************")
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"{x_str}>'{self.last_offset}' or "
f_sql_str = f_sql_str[:-3]
job_query = client.query(
f"select max({self.incremental_checkpoint_field}) from {self.project_id}.{self.dataset}.{self.table} where {f_sql_str}")
for query in job_query.result():
max_res = query[0]
if (str(max_res).lower() != 'none'):
return {"offset": str(max_res)}
return {"offset": str(self.last_offset)}
def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
if (self.last_offset is None):
self.last_offset = end['offset']
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
# project_id = self.auth_project_id
client = BigQueryReadClient()
# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
self.project_id, self.dataset, self.table
)
requested_session = bigquery_storage.ReadSession()
requested_session.table = table
if (self.incremental_checkpoint_field != ''):
start_offset = start["offset"]
end_offset = end["offset"]
f_sql_str = ''
for x_str in self.incremental_checkpoint_field.strip().split(","):
f_sql_str += f"({x_str}>'{start_offset}' and {x_str}<='{end_offset}') or "
f_sql_str = f_sql_str[:-3]
requested_session.read_options.row_restriction = f"{f_sql_str}"
# This example leverages Apache Avro.
requested_session.data_format = bigquery_storage.DataFormat.AVRO
parent = "projects/{}".format(self.project_id)
session = client.create_read_session(
request={
"parent": parent,
"read_session": requested_session,
"max_stream_count": int(self.max_parallel_conn),
},
)
self.last_offset = end['offset']
return [RangePartition(session, i) for i in range(len(session.streams))]
def read(self, partition) -> Iterator[List]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
from datetime import datetime
session = partition.session
stream_idx = partition.stream_idx
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
client_1 = BigQueryReadClient()
# requested_session.read_options.selected_fields = ["census_tract", "clearance_date", "clearance_status"]
reader = client_1.read_rows(session.streams[stream_idx].name)
reader_iter = []
for message in reader.rows():
reader_iter_in = []
for k, v in message.items():
reader_iter_in.append(v)
# yield(reader_iter)
reader_iter.append(reader_iter_in)
# yield (message['hash'], message['size'], message['virtual_size'], message['version'])
# self.increment_latest_vals.append(max_incr_val)
return iter(reader_iter)
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
Шаг 2. Определение источника данных
Затем определите пользовательский источник данных. Подкласс DataSource должен реализовать следующие методы:
name(cls) -> strschema(self) -> Union[StructType, str]
Дополнительные сведения о каждом методе см. в разделе "Методы".
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType
from google.cloud import bigquery
class BQStreamDataSource(DataSource):
"""
An example data source for streaming data from a public API containing users' comments.
"""
@classmethod
def name(cls):
return "bigquery-streaming"
def schema(self):
type_map = {'integer': 'long', 'float': 'double', 'record': 'string'}
json_auth_file = "/home/" + self.options.get("service_auth_json_file_name")
client = bigquery.Client.from_service_account_json(json_auth_file)
table_ref = self.options.get("project_id") + '.' + self.options.get("dataset") + '.' + self.options.get("table")
table = client.get_table(table_ref)
original_schema = table.schema
result = []
for schema in original_schema:
col_attr_name = schema.name
if (schema.mode != 'REPEATED'):
col_attr_type = type_map.get(schema.field_type.lower(), schema.field_type.lower())
else:
col_attr_type = f"array<{type_map.get(schema.field_type.lower(), schema.field_type.lower())}>"
result.append(col_attr_name + " " + col_attr_type)
return ",".join(result)
# return "census_tract double,clearance_date string,clearance_status string"
def streamReader(self, schema: StructType):
return BQStreamReader(schema, self.options)
Шаг 3. Настройка и запуск потокового запроса
Наконец, зарегистрируйте соединитель, а затем настройте и запустите запрос потоковой передачи:
spark.dataSource.register(BQStreamDataSource)
# Ingests table data incrementally using the provided timestamp-based field.
# The latest value is checkpointed using offset semantics.
# Without the incremental input field, full table ingestion is performed.
# Service account JSON files must be available to every Spark executor worker
# in the /home folder using --files /home/<file_name>.json or an init script.
query = (
spark.readStream.format("bigquery-streaming")
.option("project_id", <bq_project_id>)
.option("incremental_checkpoint_field", <table_incremental_ts_based_col>)
.option("dataset", <bq_dataset_name>)
.option("table", <bq_table_name>)
.option("service_auth_json_file_name", <service_account_json_file_name>)
.option("max_parallel_conn", <max_parallel_threads_to_pull_data>) # defaults to max 1000
.load()
)
(
query.writeStream.trigger(processingTime="30 seconds")
.option("checkpointLocation", "checkpoint_path")
.foreachBatch(writeToTable) # your target table write function
.start()
)
Порядок выполнения
Ниже описан порядок выполнения функции в пользовательском потоке.
Для загрузки потокового кадра данных Spark:
name(cls)
schema()
Для микробатча (n) при запуске нового запроса или перезапуске существующего запроса (при наличии новой или существующей контрольной точки):
partitions(end_offset, end_offset) # loads the last saved offset from the checkpoint at query restart
latestOffset()
partitions(start_offset, end_offset) # plans partitions and distributes to Python workers
read() # user’s source read definition, runs on each Python worker
commit()
Для следующей микробатчи (n+1) выполняющегося запроса на существующей контрольной точке:
latestOffset()
partitions(start_offset, end_offset)
read()
commit()
Примечание.
Функция latestOffset управляет контрольными точками. Совместное использование переменной контрольной точки примитивного типа между функциями и возврат ее в виде словаря. Например: return {"offset": str(self.last_offset)}
Устранение неполадок
Если выходные данные являются следующей ошибкой, вычислительные ресурсы не поддерживают пользовательские источники данных PySpark. Необходимо использовать Databricks Runtime 15.2 или более поздней версии.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000