Tworzenie profilu danych przy użyciu interfejsu API

Na tej stronie opisano sposób tworzenia profilu danych w usłudze Databricks przy użyciu zestawu SDK usługi Databricks i opisano parametry używane w wywołaniach interfejsu API. Możesz również utworzyć profil danych i zarządzać nim przy użyciu interfejsu API REST.

Aby uzyskać informacje referencyjne, zobacz dokumentację zestawu SDK profilowania danych i dokumentację interfejsu API REST.

Można utworzyć profil na dowolnej zarządzanej lub zewnętrznej tabeli Delta zarejestrowanej w Unity Catalog. W metamagazynie Katalogu Unity można utworzyć tylko jeden profil dla każdej tabeli.

Uwaga / Notatka

Aby uzyskać informacje o przestarzałym quality_monitors interfejsie API, zobacz Tworzenie profilu danych przy użyciu interfejsu quality_monitors API (przestarzałe).

Wymagania

Aby użyć najnowszej wersji interfejsu API, użyj następującego polecenia na początku notesu, aby zainstalować klienta języka Python:

%pip install "databricks-sdk>=0.68.0"

Aby uwierzytelnić się w celu korzystania z zestawu SDK usługi Databricks w danym środowisku, zobacz Uwierzytelnianie.

Typy profilów

Podczas tworzenia profilu wybierasz jeden z następujących typów profilów: TimeSeries, InferenceLoglub Snapshot. W tej sekcji krótko opisano każdą opcję. Aby uzyskać szczegółowe informacje, zobacz dokumentację zestawu SDK profilowania danych lub dokumentację interfejsu API REST.

Uwaga / Notatka

  • Po pierwszym utworzeniu szeregu czasowego lub profilu wnioskowania usługa Databricks analizuje tylko dane z 30 dni przed jego utworzeniem. Po utworzeniu profilu wszystkie nowe dane są przetwarzane.
  • Profile zdefiniowane na zmaterializowanych widokach nie obsługują przetwarzania przyrostowego.

Wskazówka

W przypadku profilów TimeSeries i Inference najlepszym rozwiązaniem jest włączenie strumienia danych zmian (CDF) w tabeli. Po włączeniu usługi CDF przetwarzane są tylko nowo dołączone dane, a nie ponowne przetwarzanie całej tabeli przy każdym odświeżeniu. Dzięki temu wykonywanie jest bardziej wydajne i zmniejsza koszty w miarę skalowania w wielu tabelach.

TimeSeries profil

Profil TimeSeries porównuje dystrybucje danych w oknach czasowych. TimeSeries W przypadku profilu należy podać następujące informacje:

  • Kolumna znacznika czasu (timestamp_column). Typ danych kolumny sygnatury czasowej musi być typu TIMESTAMP lub typu, który można przekonwertować na znaczniki czasu przy użyciu to_timestampfunkcji PySpark.
  • Zestaw, dla którego mają granularities być obliczane metryki. Dostępne są następujące szczegółowości:
    • AGGREGATION_GRANULARITY_5_MINUTES (agregacja_granulacja_5_minut)
    • AGREGACJA_GRANULARNOŚĆ_30_MINUT
    • AGREGACJA_GRANULARNOŚĆ_1_GODZINA
    • AGGREGATION_GRANULARITY_1_DAY (agregacja_ziarnistość_1_dzień)
    • GRANULARNOŚĆ_AGREGACJI_1_TYDZIEŃ
    • GRANULACJA_AGREGACJI_2_TYGODNIE
    • AGREGACJA_GRANULARNOŚĆ_3_TYGODNIE
    • AGREGACJA_GRANULARNOŚĆ_4_TYGODNIE
    • Ziarno Agregacji: 1 Miesiąc
    • AGGREGATION_GRANULARITY_1_YEAR
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 time_series=TimeSeriesConfig(
    timestamp_column="ts",
    granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
 slicing_exprs=["type='Red'"]
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",     # object_type is always "table" for data profiling
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

InferenceLog profil

Profil InferenceLog jest podobny do TimeSeries profilu, ale zawiera również metryki jakości modelu. InferenceLog profile używają następujących parametrów:

Parameter Opis
problem_type MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION lub MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION
prediction_column Kolumna zawierająca przewidywane wartości modelu.
timestamp_column Kolumna zawierająca znacznik czasu żądania wnioskowania.
model_id_column Kolumna zawierająca identyfikator modelu używanego do przewidywania.
granularities Określa sposób podziału danych na partycje w oknach czasowych. Zobacz TimeSeries profil , aby uzyskać dostępne wartości.
label_column (Opcjonalnie) Kolumna zawierająca podstawowe informacje dotyczące prognoz modelu.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 inference_log=InferenceLogConfig(
    problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
    prediction_column="preds",
    model_id_column="model_ver",
    label_column="label", # optional
    timestamp_column="ts",
    granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

W przypadku InferenceLog profilów wycinki są tworzone automatycznie na podstawie unikatowych wartości .model_id_col

Snapshot profil

W przeciwieństwie do TimeSeries, Snapshot przedstawia, jak pełna zawartość tabeli zmienia się z czasem. Metryki są obliczane względem wszystkich danych w tabeli i odzwierciedlają stan tabeli przy każdym odświeżeniu profilu.

Uwaga / Notatka

Maksymalny rozmiar tabeli dla profilu migawki to 4 TB. W przypadku większych tabel należy zamiast tego użyć profilów szeregów czasowych.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
 snapshot=SnapshotConfig(),
 slicing_exprs=["type='Red'"]
)

Odświeżanie i wyświetlanie wyników

Aby wyświetlić historię odświeżania, należy użyć obszaru roboczego usługi Databricks, z którego włączono profilowanie danych.

Aby odświeżyć tabele metryk, użyj polecenia create_refresh. Przykład:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
  object_type=table_object_type, object_id=table_id, refresh=Refresh(
   object_type=table_object_type,
   object_id=table_id,
 )
)

Podczas wywoływania create_refresh z notesu tabele metryk są tworzone lub aktualizowane. To obliczenie jest wykonywane na bezserwerowej infrastrukturze obliczeniowej, a nie na klastrze, do którego dołączony jest notes. Możesz nadal uruchamiać polecenia w notesie podczas aktualizowania statystyk.

Aby uzyskać informacje o statystykach przechowywanych w tabelach metryk, zobacz Monitorowanie tabel metryk. Tabele metryk to tabele katalogu Unity. Zapytania można wykonywać w notesach lub w Eksploratorze zapytań SQL i wyświetlać je w Eksploratorze wykazu.

Aby wyświetlić historię wszystkich odświeżeń skojarzonych z profilem, użyj polecenia list_refreshes.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)

Aby uzyskać stan określonej operacji, która jest w kolejce, działa lub została zakończona, użyj polecenia get_refresh.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)

run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
  run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
  time.sleep(30)

Wyświetlanie ustawień profilu

Ustawienia profilu można przejrzeć przy użyciu interfejsu API get_monitor.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)

Harmonogram

Aby skonfigurować profil do uruchamiania zgodnie z harmonogramem, użyj parametru schedule :create_monitor

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 snapshot=SnapshotConfig(),
 schedule=CronSchedule(
        quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
        timezone_id="PST",
 )
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

Aby uzyskać więcej informacji, zobacz wyrażenia cron .

Powiadomienia

Aby skonfigurować powiadomienia dla profilu, użyj parametru notificationscreate_monitor:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
 output_schema_id=schema.schema_id,
 snapshot=SnapshotConfig(),
 notification_settings=NotificationSettings(
        # Notify the given email when a monitoring refresh fails or times out.
        on_failure=NotificationDestination(
            email_addresses=["your_email@domain.com"]
        )
 )
)

info = w.data_quality.create_monitor(
   monitor=Monitor(
     object_type="table",
     object_id=table.table_id,
     data_profiling_config=config,
   ),
)

Na typ zdarzenia jest obsługiwanych maksymalnie 5 adresów e-mail (na przykład "on_failure").

Kontrolowanie dostępu do tabel metryk

Tabele metryk i pulpit nawigacyjny utworzony przez profil są własnością użytkownika, który utworzył profil. Uprawnienia Unity Catalog umożliwiają kontrolę dostępu do tabel z metrykami. Aby udostępnić pulpity nawigacyjne w obszarze roboczym, użyj przycisku Udostępnij w prawym górnym rogu pulpitu nawigacyjnego.

Usuwanie profilu

Aby usunąć profil:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)

To polecenie nie usuwa tabel profilów i pulpitu nawigacyjnego utworzonego przez profil. Musisz usunąć te zasoby w osobnym kroku lub zapisać je w innej lokalizacji.

Przykładowe notatniki

W poniższych przykładowych notesach pokazano, jak utworzyć profil, odświeżyć profil i zbadać utworzone tabele metryk.

Przykład notatnika: profil szeregów czasowych

W tym notesie pokazano, jak utworzyć TimeSeries profil typu.

Przykładowy notatnik dotyczący profilu TimeSeries

Pobierz laptopa

Przykład notatnika: profil predykcji (regresja)

W tym notesie pokazano, jak utworzyć profil typu InferenceLog dla problemu regresji.

Przykładowy notatnik regresji wnioskowania profilu

Pobierz laptopa

Przykład notesu: profil wnioskowania (klasyfikacja)

W tym notesie pokazano, jak utworzyć InferenceLog profil typu dla problemu klasyfikacji.

Przykładowy notes klasyfikacji profilu wnioskowania

Pobierz laptopa

Przykład notatnika: profil migawki

W tym notesie pokazano, jak utworzyć Snapshot profil typu.

Przykładowy notatnik profilu migawki

Pobierz laptopa