Compartilhar via


Fluxos de trabalho do Microsoft Agent Framework – Pontos de verificação

Esta página fornece uma visão geral de Checkpoints no sistema de fluxo de trabalho do Microsoft Agent Framework.

Visão geral

Os pontos de verificação permitem que você salve o estado de um fluxo de trabalho em pontos específicos durante a execução e retome desses pontos mais tarde. Esse recurso é particularmente útil para os seguintes cenários:

  • Fluxos de trabalho de execução longa em que você deseja evitar perder o progresso em caso de falhas.
  • Fluxos de trabalho de execução longa em que você deseja pausar e retomar a execução posteriormente.
  • Fluxos de trabalho que exigem salvamento de estado periódico para finalidade de auditoria ou conformidade.
  • Fluxos de trabalho que precisam ser migrados entre diferentes ambientes ou instâncias.

Quando os pontos de verificação são criados?

Lembre-se de que os fluxos de trabalho são executados em superpassos, conforme documentado nos conceitos centrais. Os pontos de verificação são criados no final de cada superstep, depois que todos os executores naquele superstep tiverem concluído a execução. Um ponto de verificação captura todo o estado do fluxo de trabalho, incluindo:

  • O estado atual de todos os executores
  • Todas as mensagens pendentes no fluxo de trabalho para o próximo superstep
  • Solicitações e respostas pendentes
  • Estados compartilhados

Capturando pontos de verificação

Para habilitar checkpointing, é necessário fornecer um CheckpointManager ao executar o fluxo de trabalho. Um ponto de verificação pode ser acessado por meio de um SuperStepCompletedEvent ou através da propriedade Checkpoints durante a execução.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Para habilitar o ponto de verificação, é necessário fornecer um CheckpointStorage ao criar um fluxo de trabalho. Um ponto de verificação pode ser acessado por meio do armazenamento. O Agent Framework fornece três implementações internas: escolha aquela que corresponda às suas necessidades de durabilidade e implantação:

Fornecedor Package Durability Mais adequado para
InMemoryCheckpointStorage agent-framework Apenas em andamento Testes, demonstrações, fluxos de trabalho de curta duração
FileCheckpointStorage agent-framework Disco local Fluxos de trabalho de máquina única, desenvolvimento local
CosmosCheckpointStorage agent-framework-azure-cosmos Azure Cosmos DB Fluxos de trabalho de produção, distribuídos e entre processos

Todos os três implementam o mesmo CheckpointStorage protocolo, para que você possa trocar provedores sem alterar o fluxo de trabalho ou o código do executor.

InMemoryCheckpointStorage mantém pontos de verificação na memória do processo. Melhor para testes, demonstrações e fluxos de trabalho de curta duração em que você não precisa de durabilidade entre reinicializações.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Retomada de Pontos de Verificação

Você pode retomar um fluxo de trabalho de um ponto de verificação específico diretamente na mesma execução.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Você pode retomar um fluxo de trabalho a partir de um ponto de verificação específico diretamente na mesma instância.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Reidração de pontos de verificação

Ou você pode reidratar um fluxo de trabalho de um ponto de verificação para uma nova instância de execução.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Ou você pode reidratar uma nova instância de fluxo de trabalho de um ponto de verificação.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Salvar estados do executor

Para garantir que o estado de um executor seja capturado em um ponto de verificação, o executor deve substituir o OnCheckpointingAsync método e salvar seu estado no contexto do fluxo de trabalho.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Além disso, para garantir que o estado seja restaurado corretamente ao retomar de um ponto de verificação, o executor deve substituir o método OnCheckpointRestoredAsync e carregar seu estado do contexto do fluxo de trabalho.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Para garantir que o estado de um executor seja capturado em um ponto de verificação, o executor deve sobrescrever o método on_checkpoint_save e retornar seu estado como um dicionário.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Além disso, para garantir que o estado seja restaurado corretamente ao retomar de um ponto de verificação, o executor deve sobrescrever o método on_checkpoint_restore e restaurar seu estado a partir do dicionário de estado fornecido.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Considerações de segurança

Importante

O armazenamento de ponto de verificação é um limite de confiança. Se você usar as implementações de armazenamento internas ou uma personalizada, o back-end de armazenamento deve ser tratado como uma infraestrutura privada confiável. Nunca carregue pontos de verificação de fonte não confiável ou potencialmente adulterada.

Verifique se o local de armazenamento usado para pontos de verificação é protegido adequadamente. Somente os serviços autorizados e os usuários devem ter acesso de leitura ou gravação aos dados de ponto de verificação.

Serialização Pickle

Os FileCheckpointStorage e CosmosCheckpointStorage usam o módulo pickle do Python para serializar o estado não nativo JSON, como classes de dados, datetimes e objetos personalizados. Para atenuar os riscos de execução arbitrária de código durante a desserialização, ambos os provedores usam um unpickler restrito por padrão. Apenas um conjunto embutido de tipos de Python seguros (primitivos, datetime, uuid, Decimal, coleções comuns, etc.) e todos os tipos internos agent_framework são permitidos durante a desserialização. Qualquer outro tipo encontrado em um ponto de verificação faz com que a desserialização falhe com um erro WorkflowCheckpointException.

Para permitir tipos adicionais específicos do aplicativo, passe-os usando o parâmetro allowed_checkpoint_types no formato "module:qualname".

from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(
    "/tmp/checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

CosmosCheckpointStorage aceita o mesmo parâmetro:

from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage

storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

Se o modelo de ameaça não permitir serialização baseada em pickle, use InMemoryCheckpointStorage ou implemente CheckpointStorage personalizado com uma estratégia de serialização alternativa.

Responsabilidade do local de armazenamento

FileCheckpointStorage requer um parâmetro explícito storage_path – não há nenhum diretório padrão. Embora a estrutura seja validada em relação a ataques de passagem de caminho, proteger o próprio diretório de armazenamento (permissões de arquivo, criptografia em repouso, controles de acesso) é responsabilidade do desenvolvedor. Somente os processos autorizados devem ter acesso de leitura ou gravação ao diretório de ponto de verificação.

CosmosCheckpointStorage depende de Azure Cosmos DB para armazenamento. Use a identidade gerenciada/RBAC sempre que possível, restrinja o banco de dados e o contêiner ao serviço de execução do fluxo de trabalho, e rotacione as chaves da conta se você usar autenticação baseada em chave. Assim como acontece com o armazenamento de arquivos, somente os principais autorizados devem ter acesso de leitura ou gravação ao contêiner do Cosmos DB que contém documentos de ponto de verificação.

Próximas etapas