Udostępnij za pośrednictwem


przepływy pracy platformy agentów Microsoft — punkty kontrolne

Ta strona zawiera omówienie Checkpoints w systemie przepływu pracy Microsoft Agent Framework.

Przegląd

Punkty kontrolne umożliwiają zapisywanie stanu procesu w określonych punktach podczas jego wykonywania i kontynuowanie od tych punktów później. Ta funkcja jest szczególnie przydatna w następujących scenariuszach:

  • Długotrwałe przepływy pracy, w których chcesz uniknąć utraty postępu w przypadku awarii.
  • Długotrwałe przepływy pracy, w których chcesz wstrzymać i wznowić wykonywanie w późniejszym czasie.
  • Przepływy pracy, które wymagają okresowego zapisywania stanu na potrzeby inspekcji lub zgodności.
  • Przepływy pracy, które należy migrować w różnych środowiskach lub instancjach.

Kiedy są tworzone punkty kontrolne?

Pamiętaj, że przepływy pracy są wykonywane w superkrokach, jak opisano w podstawowych pojęciach. Punkty kontrolne są tworzone na końcu każdego superkroku, po zakończeniu wykonywania wszystkich funkcji wykonawczych w tym superkroku. Punkt kontrolny przechwytuje cały stan przepływu pracy, w tym:

  • Bieżący stan wszystkich funkcji wykonawczych
  • Wszystkie oczekujące komunikaty w przepływie pracy dla następnego superkroku
  • Oczekujące żądania i odpowiedzi
  • Stany udostępnione

Przechwytywanie punktów kontrolnych

Aby włączyć tworzenie punktów kontrolnych, należy podać element CheckpointManager podczas uruchamiania przepływu pracy. Następnie można uzyskać dostęp do punktu kontrolnego za pośrednictwem SuperStepCompletedEvent, lub poprzez właściwość Checkpoints w ramach uruchomienia.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Aby włączyć tworzenie punktów kontrolnych, należy podać element CheckpointStorage podczas tworzenia przepływu pracy. Następnie można uzyskać dostęp do punktu kontrolnego za pośrednictwem magazynu. Struktura agenta dostarcza trzy wbudowane implementacje — wybierz jedną zgodną z potrzebami dotyczącymi trwałości i wdrażania:

Dostawca Pakiet Durability Najlepsze dla
InMemoryCheckpointStorage agent-framework Tylko w trakcie procesu Testy, pokazy, krótkotrwałe przepływy pracy
FileCheckpointStorage agent-framework Dysk lokalny Przepływy pracy z jedną maszyną, programowanie lokalne
CosmosCheckpointStorage agent-framework-azure-cosmos Azure Cosmos DB Przepływy pracy w środowisku produkcyjnym, rozproszonym i międzyprocesowym

Wszystkie trzy implementują ten sam CheckpointStorage protokół, dzięki czemu można zamienić dostawców bez zmiany przepływu pracy lub kodu wykonawczego.

InMemoryCheckpointStorage program przechowuje punkty kontrolne w pamięci procesu. Idealne do testów, demonstracji i krótkich procesów roboczych, w których nie potrzebujesz trwałości w przypadku ponownych uruchomień.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Wznawianie z punktów kontrolnych

Przepływ pracy można wznowić bezpośrednio z konkretnego punktu kontrolnego w tym samym przebiegu.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Możesz wznowić przepływ pracy z określonego punktu kontrolnego bezpośrednio w tym samym wystąpieniu.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Przywracanie z punktów kontrolnych

Możesz też przywrócić przepływ pracy z punktu kontrolnego do nowego wystąpienia uruchomienia.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Możesz też przywrócić nowe wystąpienie przepływu pracy z punktu kontrolnego.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Zapisz stany wykonawcze

Aby upewnić się, że stan funkcji wykonawczej jest przechwytywany w punkcie OnCheckpointingAsync kontrolnym, funkcja wykonawcza musi zastąpić metodę i zapisać jej stan w kontekście przepływu pracy.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Ponadto, aby zapewnić prawidłowe przywrócenie stanu podczas wznawiania z punktu kontrolnego, egzekutor musi zastąpić OnCheckpointRestoredAsync metodę i załadować jego stan z kontekstu przepływu pracy.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Aby upewnić się, że stan funkcji wykonawczej jest przechwytywany w punkcie kontrolnym, funkcja wykonawcza musi zastąpić on_checkpoint_save metodę i zwrócić jej stan jako słownik.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Ponadto, aby upewnić się, że stan jest poprawnie przywracany podczas wznawiania z punktu kontrolnego, funkcja wykonawcza musi zastąpić metodę on_checkpoint_restore i przywrócić jej stan z dostarczonego słownika stanu.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Zagadnienia związane z zabezpieczeniami

Ważna

Przechowywanie punktów kontrolnych jest granicą zaufania. Niezależnie od tego, czy używasz wbudowanych implementacji magazynu, czy niestandardowej, zaplecze magazynu musi być traktowane jako zaufana, prywatna infrastruktura. Nigdy nie ładuj punktów kontrolnych z niezaufanych lub potencjalnie naruszonych źródeł.

Upewnij się, że lokalizacja magazynu używana dla punktów kontrolnych jest odpowiednio zabezpieczona. Tylko autoryzowane usługi i użytkownicy powinni mieć dostęp do odczytu lub zapisu do danych punktu kontrolnego.

Serializacja Pickle

Zarówno FileCheckpointStorage, jak i CosmosCheckpointStorage używają modułu pickle Python do serializacji stanu natywnego innego niż JSON, takich jak klasy danych, daty/godziny i obiekty niestandardowe. Aby ograniczyć ryzyko dowolnego wykonania kodu podczas deserializacji, obydwaj dostawcy domyślnie używają ograniczonego unpicklera. Tylko wbudowany zestaw bezpiecznych typów Python (prymitywnych, datetime, uuid, Decimal, wspólnych kolekcji itp.) i wszystkie wewnętrzne typy agent_framework są dozwolone podczas deserializacji. Każdy inny typ napotkany w punkcie kontrolnym powoduje niepowodzenie próby deserializacji z kodem błędu WorkflowCheckpointException.

Aby zezwolić na dodatkowe typy specyficzne dla aplikacji, przekaż je za pośrednictwem parametru allowed_checkpoint_types przy użyciu "module:qualname" formatu:

from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(
    "/tmp/checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

CosmosCheckpointStorage akceptuje ten sam parametr:

from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage

storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

Jeśli model zagrożeń w ogóle nie zezwala na serializację opartą na module pickle, użyj InMemoryCheckpointStorage lub zaimplementuj niestandardowy CheckpointStorage z alternatywną strategią serializacji.

Odpowiedzialność za lokalizację magazynu

FileCheckpointStorage wymaga jawnego storage_path parametru — nie ma katalogu domyślnego. Chociaż struktura chroni przed atakami typu path traversal, zabezpieczenie samego katalogu magazynu (uprawnienia do plików, szyfrowanie w spoczynku, kontrolę dostępu) jest odpowiedzialnością dewelopera. Tylko autoryzowane procesy powinny mieć dostęp do odczytu lub zapisu do katalogu punktu kontrolnego.

CosmosCheckpointStorage opiera się na Azure Cosmos DB do przechowywania. Użyj tożsamości zarządzanej/kontroli dostępu opartej na rolach, jeśli to możliwe, określ zakres bazy danych i kontenera dla usługi przepływu pracy oraz rotuj klucze konta, jeśli używasz uwierzytelniania opartego na kluczach. Podobnie jak w przypadku magazynu plików, tylko autoryzowane podmioty powinny mieć dostęp do odczytu lub zapisu do kontenera Cosmos DB, który przechowuje dokumenty punktów kontrolnych.

Dalsze kroki