Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Przepływy pracy umożliwiają łączenie wielu kroków — każdy krok przetwarza dane i przekazuje je do następnego.
Definiowanie kroków przepływu pracy (funkcji wykonawczych):
using Microsoft.Agents.AI.Workflows;
// Step 1: Convert text to uppercase
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
// Step 2: Reverse the string and yield output
class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor")
{
public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
return ValueTask.FromResult(string.Concat(message.Reverse()));
}
}
ReverseTextExecutor reverse = new();
Skompiluj i uruchom przepływ pracy:
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
if (evt is ExecutorCompletedEvent executorComplete)
{
Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
}
}
Tip
Zobacz tutaj , aby zapoznać się z pełną aplikacją przykładową z możliwością uruchamiania.
Zdefiniuj kroki przepływu pracy (funkcje wykonawcze) i połącz je z krawędziami:
# Step 1: A class-based executor that converts text to uppercase
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert input to uppercase and forward to the next node."""
await ctx.send_message(text.upper())
# Step 2: A function-based executor that reverses the string and yields output
@executor(id="reverse_text")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""Reverse the string and yield the final workflow output."""
await ctx.yield_output(text[::-1])
def create_workflow():
"""Build the workflow: UpperCase → reverse_text."""
upper = UpperCase(id="upper_case")
return WorkflowBuilder(start_executor=upper).add_edge(upper, reverse_text).build()
Skompiluj i uruchom przepływ pracy:
workflow = create_workflow()
events = await workflow.run("hello world")
print(f"Output: {events.get_outputs()}")
print(f"Final state: {events.get_final_state()}")
Tip
Zapoznaj się z pełnym przykładem kompletnego pliku możliwego do uruchomienia.
Dalsze kroki
Głębiej:
- Omówienie przepływów pracy — omówienie architektury przepływu pracy
- Sekwencyjne przepływy pracy — liniowe wzorce krok po kroku
- Agenci w przepływach pracy — używanie agentów jako kroków przepływu pracy