Microsoft Agent Framework Workflows - Checkpoints

This page provides an overview of Checkpoints in the Microsoft Agent Framework Workflow system.

Overview

Checkpoints allow you to save the state of a workflow at specific points during its execution, and resume from those points later. This feature is particularly useful for the following scenarios:

  • Long-running workflows where you want to avoid losing progress in case of failures.
  • Long-running workflows where you want to pause and resume execution at a later time.
  • Workflows that require periodic state saving for auditing or compliance purposes.
  • Workflows that need to be migrated across different environments or instances.

When Are Checkpoints Created?

Remember that workflows are executed in supersteps, as documented in the core concepts. Checkpoints are created at the end of each superstep, after all executors in that superstep have completed their execution. A checkpoint captures the entire state of the workflow, including:

  • The current state of all executors
  • All pending messages in the workflow for the next superstep
  • Pending requests and responses
  • Shared states
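Conceptually, a checkpoint is a snapshot of exactly these four pieces of state. The dataclass below is not a real framework type; it only makes the structure of the list above concrete.

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class CheckpointSnapshot:
    """Illustrative only: the kinds of state a checkpoint captures."""

    executor_states: dict[str, dict[str, Any]] = field(default_factory=dict)  # per-executor state
    pending_messages: list[Any] = field(default_factory=list)  # messages queued for the next superstep
    pending_requests: list[Any] = field(default_factory=list)  # outstanding requests and responses
    shared_state: dict[str, Any] = field(default_factory=dict)  # workflow-wide shared values

snapshot = CheckpointSnapshot(
    executor_states={"CustomExecutor": {"messages": ["hello"]}},
    pending_messages=["next input"],
)
```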

Capturing Checkpoints

In .NET, checkpointing is enabled by providing a CheckpointManager when running the workflow. A checkpoint can then be accessed via a SuperStepCompletedEvent, or through the Checkpoints property on the run.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

In Python, checkpointing is enabled by providing a CheckpointStorage when building the workflow. A checkpoint can then be accessed via the storage. The Agent Framework ships three built-in implementations; pick the one that matches your durability and deployment needs:

| Provider | Package | Durability | Best for |
| --- | --- | --- | --- |
| InMemoryCheckpointStorage | agent-framework | In-process only | Tests, demos, short-lived workflows |
| FileCheckpointStorage | agent-framework | Local disk | Single-machine workflows, local development |
| CosmosCheckpointStorage | agent-framework-azure-cosmos | Azure Cosmos DB | Production, distributed, cross-process workflows |

All three implement the same CheckpointStorage protocol, so you can swap providers without changing workflow or executor code.
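Because callers depend only on the protocol, any conforming object can stand in for a real provider. The sketch below is framework-free: list_checkpoints matches the method used later on this page, while save_checkpoint and the state shape are illustrative assumptions, not the actual CheckpointStorage interface.

```python
import asyncio

class FakeCheckpointStorage:
    """Framework-free stand-in that mimics the CheckpointStorage shape.
    Method names other than list_checkpoints are illustrative assumptions."""

    def __init__(self) -> None:
        self._checkpoints: dict[str, list[dict]] = {}

    async def save_checkpoint(self, workflow_name: str, state: dict) -> None:
        # Append-only per-workflow history, like one checkpoint per superstep.
        self._checkpoints.setdefault(workflow_name, []).append(state)

    async def list_checkpoints(self, workflow_name: str) -> list[dict]:
        return self._checkpoints.get(workflow_name, [])

async def audit(storage) -> int:
    # Works with any CheckpointStorage-shaped object: only the protocol matters.
    return len(await storage.list_checkpoints(workflow_name="demo"))

storage = FakeCheckpointStorage()
asyncio.run(storage.save_checkpoint("demo", {"step": 1}))
count = asyncio.run(audit(storage))
```

Code written this way can move from InMemoryCheckpointStorage in tests to CosmosCheckpointStorage in production without changes.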

InMemoryCheckpointStorage keeps checkpoints in process memory. Best for tests, demos, and short-lived workflows where you do not need durability across restarts.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Resuming from Checkpoints

In .NET, you can resume a workflow from a specific checkpoint directly on the same run.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

In Python, you can resume a workflow from a specific checkpoint directly on the same workflow instance.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Rehydrating from Checkpoints

Alternatively, in .NET you can rehydrate a workflow from a checkpoint into a new run instance.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

In Python, you can likewise rehydrate a checkpoint into a new workflow instance.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Save Executor States

In .NET, to ensure that an executor's state is captured in a checkpoint, the executor must override the OnCheckpointingAsync method and save its state to the workflow context.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Also, to ensure the state is correctly restored when resuming from a checkpoint, the executor must override the OnCheckpointRestoredAsync method and load its state from the workflow context.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    // Fall back to an empty list if no state was saved under this key.
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false) ?? new();
}

In Python, to ensure that an executor's state is captured in a checkpoint, the executor must override the on_checkpoint_save method and return its state as a dictionary.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Also, to ensure the state is correctly restored when resuming from a checkpoint, the executor must override the on_checkpoint_restore method and restore its state from the provided state dictionary.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])
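The save/restore contract is symmetric: whatever on_checkpoint_save returns must be enough for on_checkpoint_restore to rebuild the executor's fields. The framework-free sketch below exercises that round trip directly; in a real workflow the runtime calls these methods for you.

```python
class StatefulThing:
    """Framework-free stand-in for an executor with checkpointable state."""

    def __init__(self) -> None:
        self._messages: list[str] = []

    def handle(self, message: str) -> None:
        self._messages.append(message)

    def on_checkpoint_save(self) -> dict:
        # Return only JSON-friendly data so any storage backend can persist it.
        return {"messages": list(self._messages)}

    def on_checkpoint_restore(self, state: dict) -> None:
        # Mirror of save: rebuild every field from the saved dictionary.
        self._messages = state.get("messages", [])

original = StatefulThing()
original.handle("hello")
original.handle("world")
saved = original.on_checkpoint_save()

# A fresh instance restored from the saved state picks up where we left off.
restored = StatefulThing()
restored.on_checkpoint_restore(saved)
```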

Security Considerations

Important

Checkpoint storage is a trust boundary. Whether you use the built-in storage implementations or a custom one, the storage backend must be treated as trusted, private infrastructure. Never load checkpoints from untrusted or potentially tampered sources.

Ensure that the storage location used for checkpoints is secured appropriately. Only authorized services and users should have read or write access to checkpoint data.

Pickle serialization

Both FileCheckpointStorage and CosmosCheckpointStorage use Python's pickle module to serialize non-JSON-native state such as dataclasses, datetimes, and custom objects. To mitigate the risks of arbitrary code execution during deserialization, both providers use a restricted unpickler by default. Only a built-in set of safe Python types (primitives, datetime, uuid, Decimal, common collections, etc.) and all agent_framework internal types are permitted during deserialization. Any other type encountered in a checkpoint causes deserialization to fail with a WorkflowCheckpointException.
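The allowlist idea can be illustrated with plain pickle. This is a minimal sketch of the technique, not the framework's actual unpickler or its built-in allowlist: a pickle.Unpickler subclass overrides find_class and refuses any type that is not explicitly permitted.

```python
import io
import pickle

# Sketch of an allowlist; the framework's real safe-type set is much larger.
SAFE_TYPES = {("builtins", "list"), ("builtins", "dict"), ("datetime", "datetime")}

class RestrictedUnpickler(pickle.Unpickler):
    """Refuse to resolve any type that is not on the allowlist."""

    def find_class(self, module: str, name: str):
        if (module, name) in SAFE_TYPES:
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"type {module}:{name} is not on the allowlist")

def restricted_loads(data: bytes):
    return RestrictedUnpickler(io.BytesIO(data)).load()

# Primitives and allowlisted types deserialize normally.
ok = restricted_loads(pickle.dumps([1, 2, 3]))

# A type off the allowlist is rejected instead of being instantiated.
class Unsafe:
    pass

try:
    restricted_loads(pickle.dumps(Unsafe()))
    blocked = False
except pickle.UnpicklingError:
    blocked = True
```

Rejecting unknown types at find_class time prevents attacker-controlled checkpoints from instantiating arbitrary classes during load.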

To allow additional application-specific types, pass them via the allowed_checkpoint_types parameter using "module:qualname" format:

from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(
    "/tmp/checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

CosmosCheckpointStorage accepts the same parameter:

from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage

storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

If your threat model does not permit pickle-based serialization at all, use InMemoryCheckpointStorage or implement a custom CheckpointStorage with an alternative serialization strategy.
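A JSON-only custom backend might look like the sketch below. The method names and state shape are illustrative, not the actual CheckpointStorage protocol; the point is that json.dumps rejects anything that is not JSON-serializable, so loading a checkpoint can never execute code.

```python
import json
import tempfile
from pathlib import Path

class JsonCheckpointStorage:
    """Sketch of a pickle-free backend: state must be JSON-serializable."""

    def __init__(self, root: str) -> None:
        self._root = Path(root)
        self._root.mkdir(parents=True, exist_ok=True)

    def save(self, checkpoint_id: str, state: dict) -> None:
        # json.dumps raises TypeError for non-JSON types, failing fast at save time.
        (self._root / f"{checkpoint_id}.json").write_text(json.dumps(state))

    def load(self, checkpoint_id: str) -> dict:
        # json.loads only produces primitives, lists, and dicts; no code runs.
        return json.loads((self._root / f"{checkpoint_id}.json").read_text())

with tempfile.TemporaryDirectory() as tmp:
    storage = JsonCheckpointStorage(tmp)
    storage.save("cp-1", {"messages": ["hello"]})
    loaded = storage.load("cp-1")
```

The trade-off is that executors must then keep their saved state JSON-friendly (no dataclasses, datetimes, or custom objects without explicit conversion).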

Storage location responsibility

FileCheckpointStorage requires an explicit storage_path parameter — there is no default directory. While the framework validates against path traversal attacks, securing the storage directory itself (file permissions, encryption at rest, access controls) is the developer's responsibility. Only authorized processes should have read or write access to the checkpoint directory.
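On POSIX systems, one way to meet that responsibility is to create the checkpoint directory with owner-only permissions before handing the path to FileCheckpointStorage. The hardening below is plain Python, not framework API, and the directory name is arbitrary.

```python
import os
import stat
import tempfile

def make_private_dir(path: str) -> str:
    # 0o700: owner may read, write, and traverse; group and others get nothing.
    os.makedirs(path, mode=0o700, exist_ok=True)
    # The makedirs mode is subject to the process umask (and ignored for
    # pre-existing directories), so enforce the permissions explicitly.
    os.chmod(path, 0o700)
    return path

checkpoint_dir = make_private_dir(os.path.join(tempfile.gettempdir(), "af-checkpoints"))
mode = stat.S_IMODE(os.stat(checkpoint_dir).st_mode)
# checkpoint_dir can now be passed as the storage_path for FileCheckpointStorage.
```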

CosmosCheckpointStorage relies on Azure Cosmos DB for storage. Use managed identity / RBAC where possible, scope the database and container to the workflow service, and rotate account keys if you use key-based auth. As with file storage, only authorized principals should have read or write access to the Cosmos DB container that holds checkpoint documents.

Next Steps