Freigeben über


Verwenden von „foreachBatch” zum Schreiben in beliebige Datensenken

In diesem Artikel wird die Verwendung von foreachBatch mit strukturiertem Streaming erläutert, um die Ausgabe einer Streamingabfrage in Datenquellen zu schreiben, die über keine vorhandene Streamingsenke verfügen.

Mit dem Codemuster streamingDF.writeStream.foreachBatch(...) können Sie Batchfunktionen auf die Ausgabedaten der einzelnen Mikrobatches der Streamingabfrage anwenden. Funktionen, die mit foreachBatch verwendet werden, akzeptieren zwei Parameter:

  • Ein DataFrame, der die Ausgabedaten eines Mikrobatches enthält.
  • Die eindeutige ID des Mikrobatches.

Sie müssen für Delta Lake-Zusammenführungsvorgänge bei strukturiertem Streaming foreachBatch verwenden. Weitere Informationen finden Sie unter Ausführen eines Upserts aus Streamingabfragen mithilfe von foreachBatch.

Anwenden zusätzlicher DataFrame-Vorgänge

Viele DataFrame- und Dataset-Vorgänge werden in Streaming-DataFrames nicht unterstützt, da Spark die Erstellung inkrementeller Pläne in diesen Fällen nicht unterstützt. Mit foreachBatch() können Sie einige dieser Vorgänge auf jede Mikrobatchausgabe anwenden. Sie können zum Beispiel foreachBatch() und den SQL-Vorgang MERGE INTO verwenden, um die Ausgabe von Streamingaggregationen in eine Deltatabelle im Aktualisierungsmodus zu schreiben. Weitere Details finden Sie in MERGE INTO.

Wichtig

  • foreachBatch() bietet nur At-Least-Once-Schreibgarantien. Sie können jedoch das der Funktion zur Verfügung gestellte batchId verwenden, um die Ausgabe zu deduplizieren und eine Exactly-Once-Garantie zu erhalten. In beiden Fällen müssen Sie selbst über die End-to-End-Semantik entscheiden.
  • foreachBatch() funktioniert nicht mit dem kontinuierlichen Verarbeitungsmodus, da er im Wesentlichen auf der Mikrobatchausführung einer Streamingabfrage beruht. Wenn Sie Daten im kontinuierlichen Modus schreiben, verwenden Sie stattdessen foreach().
  • Bei der Verwendung von foreachBatch mit einem zustandsbehafteten Operator ist es wichtig, den Batch vollständig zu nutzen, bevor die Verarbeitung abgeschlossen ist. Weitere Informationen finden Sie unter Vollständige Nutzung aller Datenrahmen-Batches.

Ein leerer Datenrahmen kann mit foreachBatch() aufgerufen werden, und der Benutzercode muss resilient sein, um einen ordnungsgemäßen Betrieb zu ermöglichen. Das folgende Beispiel soll dies erläutern:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Verhaltensänderungen bei foreachBatch in Databricks Runtime 14.0

In Databricks Runtime 14.0 und höher bei der Berechnung, die mit dem Standardzugriffsmodus konfiguriert ist, gelten die folgenden Verhaltensänderungen:

  • print() Befehle schreiben Ausgaben in die Treiberprotokolle.
  • Sie können nicht auf das Untermodul dbutils.widgets innerhalb der Funktion zugreifen.
  • Alle Dateien, Module oder Objekte, auf die in der Funktion verwiesen wird, müssen serialisierbar und in Spark verfügbar sein.

Wiederverwenden vorhandener Batchdatenquellen

Mit foreachBatch() können Sie vorhandene Batchdaten-Writer für Datensenken verwenden, die möglicherweise keine Unterstützung für strukturiertes Streaming bieten. Hier sind einige Beispiele:

Viele andere Batchdatenquellen können von foreachBatch() aus verwendet werden. Siehe Herstellen einer Verbindung mit Datenquellen und externen Diensten.

Schreiben an mehrere Speicherorte

Wenn Sie die Ausgabe einer Streamingabfrage an mehrere Speicherorte schreiben müssen, empfiehlt Databricks die Verwendung mehrerer Writer für strukturiertes Streaming, um eine optimale Parallelisierung und einen optimalen Durchsatz zu erzielen.

Die Verwendung foreachBatch zum Schreiben in mehrere Senken serialisiert die Ausführung von Streamingschreibvorgängen, was die Wartezeit für jeden Mikrobatch erhöhen kann.

Wenn Sie zum Schreiben in mehrere Delta-Tabellen foreachBatch verwenden, lesen Sie die Informationen unter Idempotente Schreibvorgänge in Tabellen in foreachBatch.

Verwenden Sie jeden DataFrame-Batch vollständig.

Wenn Sie zustandsbehaftete Operatoren (z. B. die Verwendung dropDuplicatesWithinWatermark) verwenden, muss jede Batch-Iteration den gesamten DataFrame verwenden oder die Abfrage neu starten. Wenn Sie den gesamten DataFrame nicht nutzen, schlägt die Streamingabfrage mit dem nächsten Batch fehl.

Dies kann in mehreren Fällen geschehen. Die folgenden Beispiele zeigen, wie Abfragen behoben werden, die ein DataFrame nicht ordnungsgemäß nutzen.

Eine Teilmenge des Batches gezielt verwenden

Wenn Sie sich nur um eine Teilmenge des Batches kümmern, könnten Sie Code wie den folgenden haben.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

In diesem Fall verarbeitet batch_df.show(2) nur die ersten beiden Elemente im Batch, was auch erwartet wird. Wenn jedoch mehr Elemente vorhanden sind, müssen sie genutzt werden. Der folgende Code verwendet den vollständigen DataFrame.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row):
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Hier ignoriert die do_nothing Funktion im Hintergrund den Rest des DataFrames.

Behandeln eines Fehlers in einem Batch

Bei der Fehlerbehandlung in foreachBatch, empfiehlt Databricks, die Streamingabfrage schnell scheitern zu lassen und stattdessen die Orchestrierungsebene wie Lakeflow Jobs oder Apache Airflow zu verwenden, um die Wiederholungslogik zu verwalten. Dies ist viel sicherer als das Erstellen komplexer Wiederholungsschleifen in Ihrem Code, bei denen Datenverlust auftreten kann.

Hier sind Richtlinien, die auf Ihrem Schreibziel basieren:

Ziel Beispiele Leitlinien
DataFrame-Vorgänge Delta Lake-Tabellen Sie müssen die txnAppId- und txnVersion-Schreiboptionen verwenden und txnVersion an batchId binden, um die Idempotenz zu gewährleisten und die Datenkorrektheit bei Wiederholungen zu schützen. Ausnahmen nicht lokal abfangen und wiederholen. Stattdessen empfiehlt Databricks, Fehler propagieren zu lassen, damit die Spark-Metriken korrekt bleiben, die Daten nicht dupliziert werden und der Orchestrator den vollständigen Batch sauber wiederholen kann.
Benutzerdefinierter Code und externe Ziele .collect(), OLTP-Datenbanken, Nachrichtenwarteschlangen, APIs Implementieren Sie Ihre eigene Idempotenz. Sie müssen davon ausgehen, dass jeder Vorgang batchübergreifend wiederholt werden kann und erneut ausgeführt wird. Wenn batchId gleich bleibt, muss das Ergebnis gleich bleiben. Sie können rein vorübergehende Fehler wie kurze Verbindungstimeouts wiederholen, aber äußerst vorsichtig sein, um teil- oder duplizierte Schreibvorgänge zu vermeiden, wenn der Wiederholungsvorgang letztendlich fehlschlägt. Der sicherste Ansatz besteht darin, Fehler weiterzugeben und dem Orchestrator zu ermöglichen, den gesamten Stapel erneut auszuführen.

Hier sind einige Beispiele für Ausnahmetypen und Empfehlungen für die Behandlung in foreachBatch:

Ausnahmetyp Beispiele Empfohlene Maßnahme
Vorübergehende Sinkfehler SQLTransientConnectionException, HTTP 429, Timeouts Catch: Wiederholen oder an eine Dead-Letter-Warteschlange senden
Duplikat- oder Schlüsseleinschränkungsverletzungen, wenn die Spüle idempotent ist SQLIntegrityConstraintViolationException Catch: Protokollieren und Unterdrücken
Benutzerdefinierte wiederholbare Fehler Eingebettete Socket-Ausnahmen, wiederholbare Datenbankfehler Catch: Metriken inkrementieren und eine kontrollierte Fortsetzung ermöglichen
Logik- oder Schemafehler NullPointerException, AttributeError, Schema-Nichtübereinstimmung Weitergabe: Spark die Abfrage fehlschlagen lassen
Nicht-wiederholbare Sink-Fehler oder unentdeckte Logikfehler ValueError, PermissionError Weitergabe: Spark schlägt die Abfrage fehl
Kritische Fehler OutOfMemoryError, beschädigter Zustand, Datenintegritätsverletzungen Weitergabe: Spark lässt die Abfrage fehlschlagen

Codebeispiele: Ausnahmebehandlung

In den folgenden Beispielen wird absichtlich ein Fehler foreach ausgelöst, um verschiedene Ansätze zur Fehlerbehandlung zu zeigen.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Der obige Code behandelt und unterdrückt den Fehler im Hintergrund und verwendet möglicherweise nicht den Rest des Batches. Es gibt zwei Optionen für die Behandlung dieser Situation.

Zuerst können Sie den Fehler erneut auslösen, der ihn an die Orchestrierungsebene übergibt, um den Batch erneut zu versuchen. Dies kann den Fehler lösen, wenn es sich um ein vorübergehendes Problem handelt, oder es für Ihr Betriebsteam auslösen, um manuell zu beheben. Ändern Sie dazu den Code partial_func, damit er wie folgt aussieht:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

Wenn Sie die Ausnahme abfangen und den Rest des Batches ignorieren möchten, können Sie den Code so ändern, dass die do_nothing Funktion verwendet wird, um den Rest des Batches im Hintergrund zu ignorieren.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row):
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()