Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Na tej stronie opisano sposób tworzenia profilu danych w usłudze Databricks przy użyciu zestawu SDK usługi Databricks i opisano parametry używane w wywołaniach interfejsu API. Możesz również utworzyć profil danych i zarządzać nim przy użyciu interfejsu API REST.
Aby uzyskać informacje referencyjne, zobacz dokumentację zestawu SDK profilowania danych i dokumentację interfejsu API REST.
Można utworzyć profil na dowolnej zarządzanej lub zewnętrznej tabeli Delta zarejestrowanej w Unity Catalog. W metamagazynie Katalogu Unity można utworzyć tylko jeden profil dla każdej tabeli.
Uwaga / Notatka
Aby uzyskać informacje o przestarzałym quality_monitors interfejsie API, zobacz Tworzenie profilu danych przy użyciu interfejsu quality_monitors API (przestarzałe).
Wymagania
Aby użyć najnowszej wersji interfejsu API, użyj następującego polecenia na początku notesu, aby zainstalować klienta języka Python:
%pip install "databricks-sdk>=0.68.0"
Aby uwierzytelnić się w celu korzystania z zestawu SDK usługi Databricks w danym środowisku, zobacz Uwierzytelnianie.
Typy profilów
Podczas tworzenia profilu wybierasz jeden z następujących typów profilów: TimeSeries, InferenceLoglub Snapshot. W tej sekcji krótko opisano każdą opcję. Aby uzyskać szczegółowe informacje, zobacz dokumentację zestawu SDK profilowania danych lub dokumentację interfejsu API REST.
Uwaga / Notatka
- Po pierwszym utworzeniu szeregu czasowego lub profilu wnioskowania usługa Databricks analizuje tylko dane z 30 dni przed jego utworzeniem. Po utworzeniu profilu wszystkie nowe dane są przetwarzane.
- Profile zdefiniowane na zmaterializowanych widokach nie obsługują przetwarzania przyrostowego.
Wskazówka
W przypadku profilów TimeSeries i Inference najlepszym rozwiązaniem jest włączenie strumienia danych zmian (CDF) w tabeli. Po włączeniu usługi CDF przetwarzane są tylko nowo dołączone dane, a nie ponowne przetwarzanie całej tabeli przy każdym odświeżeniu. Dzięki temu wykonywanie jest bardziej wydajne i zmniejsza koszty w miarę skalowania w wielu tabelach.
TimeSeries profil
Profil TimeSeries porównuje dystrybucje danych w oknach czasowych.
TimeSeries W przypadku profilu należy podać następujące informacje:
- Kolumna znacznika czasu (
timestamp_column). Typ danych kolumny sygnatury czasowej musi być typuTIMESTAMPlub typu, który można przekonwertować na znaczniki czasu przy użyciuto_timestampfunkcji PySpark. - Zestaw, dla którego mają
granularitiesbyć obliczane metryki. Dostępne są następujące szczegółowości:- AGGREGATION_GRANULARITY_5_MINUTES (agregacja_granulacja_5_minut)
- AGREGACJA_GRANULARNOŚĆ_30_MINUT
- AGREGACJA_GRANULARNOŚĆ_1_GODZINA
- AGGREGATION_GRANULARITY_1_DAY (agregacja_ziarnistość_1_dzień)
- GRANULARNOŚĆ_AGREGACJI_1_TYDZIEŃ
- GRANULACJA_AGREGACJI_2_TYGODNIE
- AGREGACJA_GRANULARNOŚĆ_3_TYGODNIE
- AGREGACJA_GRANULARNOŚĆ_4_TYGODNIE
- Ziarno Agregacji: 1 Miesiąc
- AGGREGATION_GRANULARITY_1_YEAR
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
time_series=TimeSeriesConfig(
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
slicing_exprs=["type='Red'"]
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table", # object_type is always "table" for data profiling
object_id=table.table_id,
data_profiling_config=config,
),
)
InferenceLog profil
Profil InferenceLog jest podobny do TimeSeries profilu, ale zawiera również metryki jakości modelu.
InferenceLog profile używają następujących parametrów:
| Parameter | Opis |
|---|---|
problem_type |
MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION lub MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION |
prediction_column |
Kolumna zawierająca przewidywane wartości modelu. |
timestamp_column |
Kolumna zawierająca znacznik czasu żądania wnioskowania. |
model_id_column |
Kolumna zawierająca identyfikator modelu używanego do przewidywania. |
granularities |
Określa sposób podziału danych na partycje w oknach czasowych. Zobacz TimeSeries profil , aby uzyskać dostępne wartości. |
label_column |
(Opcjonalnie) Kolumna zawierająca podstawowe informacje dotyczące prognoz modelu. |
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
inference_log=InferenceLogConfig(
problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
prediction_column="preds",
model_id_column="model_ver",
label_column="label", # optional
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
W przypadku InferenceLog profilów wycinki są tworzone automatycznie na podstawie unikatowych wartości .model_id_col
Snapshot profil
W przeciwieństwie do TimeSeries, Snapshot przedstawia, jak pełna zawartość tabeli zmienia się z czasem. Metryki są obliczane względem wszystkich danych w tabeli i odzwierciedlają stan tabeli przy każdym odświeżeniu profilu.
Uwaga / Notatka
Maksymalny rozmiar tabeli dla profilu migawki to 4 TB. W przypadku większych tabel należy zamiast tego użyć profilów szeregów czasowych.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
snapshot=SnapshotConfig(),
slicing_exprs=["type='Red'"]
)
Odświeżanie i wyświetlanie wyników
Aby wyświetlić historię odświeżania, należy użyć obszaru roboczego usługi Databricks, z którego włączono profilowanie danych.
Aby odświeżyć tabele metryk, użyj polecenia create_refresh. Przykład:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
object_type=table_object_type, object_id=table_id, refresh=Refresh(
object_type=table_object_type,
object_id=table_id,
)
)
Podczas wywoływania create_refresh z notesu tabele metryk są tworzone lub aktualizowane. To obliczenie jest wykonywane na bezserwerowej infrastrukturze obliczeniowej, a nie na klastrze, do którego dołączony jest notes. Możesz nadal uruchamiać polecenia w notesie podczas aktualizowania statystyk.
Aby uzyskać informacje o statystykach przechowywanych w tabelach metryk, zobacz Monitorowanie tabel metryk. Tabele metryk to tabele katalogu Unity. Zapytania można wykonywać w notesach lub w Eksploratorze zapytań SQL i wyświetlać je w Eksploratorze wykazu.
Aby wyświetlić historię wszystkich odświeżeń skojarzonych z profilem, użyj polecenia list_refreshes.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
Aby uzyskać stan określonej operacji, która jest w kolejce, działa lub została zakończona, użyj polecenia get_refresh.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
time.sleep(30)
Wyświetlanie ustawień profilu
Ustawienia profilu można przejrzeć przy użyciu interfejsu API get_monitor.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)
Harmonogram
Aby skonfigurować profil do uruchamiania zgodnie z harmonogramem, użyj parametru schedule :create_monitor
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
schedule=CronSchedule(
quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
timezone_id="PST",
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Aby uzyskać więcej informacji, zobacz wyrażenia cron .
Powiadomienia
Aby skonfigurować powiadomienia dla profilu, użyj parametru notificationscreate_monitor:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination
w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
notification_settings=NotificationSettings(
# Notify the given email when a monitoring refresh fails or times out.
on_failure=NotificationDestination(
email_addresses=["your_email@domain.com"]
)
)
)
info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)
Na typ zdarzenia jest obsługiwanych maksymalnie 5 adresów e-mail (na przykład "on_failure").
Kontrolowanie dostępu do tabel metryk
Tabele metryk i pulpit nawigacyjny utworzony przez profil są własnością użytkownika, który utworzył profil. Uprawnienia Unity Catalog umożliwiają kontrolę dostępu do tabel z metrykami. Aby udostępnić pulpity nawigacyjne w obszarze roboczym, użyj przycisku Udostępnij w prawym górnym rogu pulpitu nawigacyjnego.
Usuwanie profilu
Aby usunąć profil:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)
To polecenie nie usuwa tabel profilów i pulpitu nawigacyjnego utworzonego przez profil. Musisz usunąć te zasoby w osobnym kroku lub zapisać je w innej lokalizacji.
Przykładowe notatniki
W poniższych przykładowych notesach pokazano, jak utworzyć profil, odświeżyć profil i zbadać utworzone tabele metryk.
Przykład notatnika: profil szeregów czasowych
W tym notesie pokazano, jak utworzyć TimeSeries profil typu.
Przykładowy notatnik dotyczący profilu TimeSeries
Przykład notatnika: profil predykcji (regresja)
W tym notesie pokazano, jak utworzyć profil typu InferenceLog dla problemu regresji.
Przykładowy notatnik regresji wnioskowania profilu
Przykład notesu: profil wnioskowania (klasyfikacja)
W tym notesie pokazano, jak utworzyć InferenceLog profil typu dla problemu klasyfikacji.
Przykładowy notes klasyfikacji profilu wnioskowania
Przykład notatnika: profil migawki
W tym notesie pokazano, jak utworzyć Snapshot profil typu.