
Workflow

The Workflow class orchestrates the execution of multiple agents, managing the flow of data and state between them.

Purpose

Use Workflow to:

- Chain multiple agents together (e.g., Writer -> Editor).
- Manage complex process flows (branching, looping).
- Maintain a shared context across different steps.

Usage

Linear Chain

Connect agents in a sequence where the output of one becomes the input of the next.

from yosrai import Agent, Workflow, Context

writer = Agent(name="Writer", instructions="Write a story about {{ topic }}.")
editor = Agent(name="Editor", instructions="Fix grammar in the story.")

flow = (
    Workflow(name="StoryFlow")
    .start(writer)
    .then(editor)
)

ctx = Context(topic="Space Exploration")
result = flow.run(ctx)
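
The `{{ topic }}` placeholder in the Writer's instructions is filled from the Context (here, topic="Space Exploration"). The source does not say which template engine YosrAI uses, so the following is only a minimal stand-in that shows the substitution idea with a regular expression. `render_instructions` is a hypothetical helper, not part of the library.

```python
import re

# Hypothetical helper: NOT part of yosrai. It only illustrates how a
# "{{ name }}" placeholder could be filled from context values.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_instructions(template: str, values: dict) -> str:
    """Replace each {{ key }} with str(values[key]); leave unknown keys untouched."""

    def substitute(match):
        key = match.group(1)
        # Keep the original placeholder text when the key is missing.
        return str(values[key]) if key in values else match.group(0)

    return _PLACEHOLDER.sub(substitute, template)


prompt = render_instructions(
    "Write a story about {{ topic }}.", {"topic": "Space Exploration"}
)
print(prompt)  # Write a story about Space Exploration.
```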

Branching

Use choice to execute different steps based on a condition. You can nest Workflows for complex branches.

# Define sub-workflows
positive_flow = Workflow("Positive").start(agent_happy)
negative_flow = Workflow("Negative").start(agent_sad)

main_flow = Workflow("Main").choice(
    condition=lambda ctx: ctx.get("sentiment") == "positive",
    if_true=positive_flow,
    if_false=negative_flow
)
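
The condition passed to choice is an ordinary callable that receives the context and returns a truthy or falsy value, so it can also be written as a named function and checked in isolation. The sketch below assumes only that the context exposes a dict-style get(), as the lambda above does. A plain dict stands in for Context, and `is_positive` is an illustrative name.

```python
def is_positive(ctx) -> bool:
    """Return True when the context's sentiment is 'positive' (case-insensitive)."""
    # ctx only needs a dict-style .get(); a missing key is treated as not positive.
    sentiment = ctx.get("sentiment") or ""
    return str(sentiment).strip().lower() == "positive"


# A plain dict stands in for Context (assumption: Context.get behaves like dict.get).
print(is_positive({"sentiment": "Positive"}))  # True
print(is_positive({}))                         # False
```

A function like this could then be passed as condition=is_positive in place of the lambda.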

Looping

Use loop to repeat a step (or a sub-workflow) for as long as a condition evaluates to true.

# Loop the researcher agent until it finds the answer
flow = Workflow("Research").loop(
    step=researcher_agent,
    condition=lambda ctx: not ctx.get("answer_found")
)
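
According to the API reference, loop repeats a step while its condition is true, so a condition that never becomes false would repeat indefinitely. The source does not mention a built-in iteration cap, so the sketch below wraps a condition in a hypothetical `bounded` helper that also stops after a fixed number of checks. A plain dict stands in for Context, and the hand-written while loop only simulates repeated evaluation; whether YosrAI checks the condition before or after the first run is not stated in the source.

```python
from typing import Any, Callable, Mapping


def bounded(condition: Callable[[Mapping[str, Any]], bool], max_checks: int):
    """Wrap a loop condition so it returns False after max_checks evaluations."""
    checks = 0

    def wrapped(ctx: Mapping[str, Any]) -> bool:
        nonlocal checks
        checks += 1
        if checks > max_checks:
            return False  # give up: the cap has been reached
        return bool(condition(ctx))

    return wrapped


# Simulate the loop by hand; a dict stands in for Context.
ctx = {"answer_found": False}
keep_going = bounded(lambda c: not c.get("answer_found"), max_checks=3)

iterations = 0
while keep_going(ctx):
    iterations += 1  # the real step (e.g. researcher_agent) would run here

print(iterations)  # 3
```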

Parallel Execution

Execute multiple agents in parallel.

flow = Workflow("Parallel").parallel([agent_a, agent_b])

Async Support

YosrAI Workflows support native async execution. Use arun instead of run.

import asyncio

async def main():
    flow = Workflow("AsyncPipeline").start(agent1).then(agent2)
    result = await flow.arun(Context(input="Start"))
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
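
Since arun is a coroutine, several runs can be awaited together with asyncio.gather. The sketch below uses a stand-in class, `EchoFlow` (hypothetical, not part of yosrai), with an async arun method so that it runs without the library; with a real Workflow the call would be flow.arun(Context(...)). Whether a single Workflow instance is safe to run concurrently is not stated in the source.

```python
import asyncio


class EchoFlow:
    """Stand-in for a Workflow: exposes an async arun() like the one above."""

    async def arun(self, text: str) -> str:
        await asyncio.sleep(0.01)  # simulate waiting on a model call
        return f"processed: {text}"


async def run_many(flow, inputs):
    """Start one arun() per input and wait for all of them."""
    # gather() returns results in the same order as the awaitables passed in.
    return await asyncio.gather(*(flow.arun(item) for item in inputs))


if __name__ == "__main__":
    print(asyncio.run(run_many(EchoFlow(), ["a", "b"])))
```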

Verbose Mode & Tracing

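Pass verbose=True to the Workflow constructor to print output during execution. Pass trace=True to run() to get back an ExecutionResult whose replay() method reviews the run with timing: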

# Verbose output during execution
flow = Workflow("Pipeline", verbose=True).start(agent1).then(agent2)

# Capture execution for replay
result = flow.run(Context(input="Start"), trace=True)
result.replay()  # Review with timing

Blueprint System

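Serialize a workflow's structure to a JSON-compatible blueprint and rebuild it later. Supply the original agents (and any conditions) by name when reconstructing: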

# Export workflow structure
blueprint = flow.to_blueprint()

# Reconstruct with agents
agents = {"agent1": agent1, "agent2": agent2}
reconstructed = Workflow.from_blueprint(blueprint, agents=agents)
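
Because to_blueprint() returns a JSON-compatible dict, a blueprint can be written to disk with the standard json module and loaded again before calling from_blueprint. The sketch below uses a made-up dict in place of a real blueprint (its keys are placeholders, not the actual schema), and `save_blueprint` and `load_blueprint` are hypothetical helpers.

```python
import json
import tempfile
from pathlib import Path


def save_blueprint(blueprint: dict, path: Path) -> None:
    """Write a JSON-compatible blueprint dict to a file."""
    path.write_text(json.dumps(blueprint, indent=2), encoding="utf-8")


def load_blueprint(path: Path) -> dict:
    """Read a blueprint dict back from a file."""
    return json.loads(path.read_text(encoding="utf-8"))


# Placeholder data: the real structure comes from flow.to_blueprint().
example_blueprint = {"name": "Pipeline", "steps": ["agent1", "agent2"]}

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "pipeline.blueprint.json"
    save_blueprint(example_blueprint, target)
    restored = load_blueprint(target)

print(restored == example_blueprint)  # True
```

The loaded dict would then be passed to Workflow.from_blueprint(restored, agents=agents).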

Export Formats

Export a workflow as Python source code or as a Mermaid flowchart diagram:

# Generate Python code
code = flow.to_code()

# Generate Mermaid diagram
diagram = flow.to_mermaid()
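
to_mermaid() returns a string, so embedding the diagram in documentation is a matter of wrapping it in a mermaid code fence. The sketch below uses a literal string in place of real to_mermaid() output, and `mermaid_to_markdown` is a hypothetical helper, not part of yosrai.

```python
FENCE = "`" * 3  # three backticks, built this way to avoid nesting fences


def mermaid_to_markdown(diagram: str, title: str = "Workflow") -> str:
    """Wrap a Mermaid diagram string in a Markdown section."""
    return f"## {title}\n\n{FENCE}mermaid\n{diagram.strip()}\n{FENCE}\n"


# Literal stand-in for the string flow.to_mermaid() would return.
diagram = 'graph TD\n    A["Researcher"] --> B["Writer"]'
markdown = mermaid_to_markdown(diagram, title="StoryFlow")
print(markdown)
```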

Optimized Linear Pipelines

For strictly linear workflows, where you only need to chain agents or functions into a single reusable unit, consider using the Pipeline class.

Unlike Workflow, a Pipeline is an Agent subclass, meaning it can be used as a tool/skill within other Agents or Conductors.

from yosrai.engine.pipeline import Pipeline

pipeline = Pipeline(
    name="BlogGen",
    steps=[researcher, outline_agent, writer],
    model="openai/gpt-4o"
)

# Use it like an agent
result = pipeline.run("Topic: AI Trends")

API Reference

yosrai.engine.workflows.workflow.Workflow

Bases: WorkflowStep

The Orchestrator. Manages the flow of data between steps (Agents, Choices, Parallel, Loops). Fluent API: flow.start(agent).choice(...).then(...)

__init__(name, event_manager=None, callbacks=[], verbose=False)

start(agent, response_model=None)

then(agent, response_model=None)

choice(condition, if_true, if_false)

loop(step, condition)

Repeatedly execute a step while the condition is true.

parallel(agents)

run(context=None, start_step=0, trace=False, resume_path=None, **kwargs)

Run the workflow with the given initial inputs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Optional[Union[Context, str]] | Optional Context object, or a string to use as 'input'. | None |
| start_step | int | Step index to start execution from (default 0). | 0 |
| trace | bool | If True, return ExecutionResult with replay() capability. | False |
| resume_path | Optional[List[int]] | Optional list of step indices for nested resumption. | None |
| **kwargs | Any | Initial inputs if context is not provided (or to update it). | {} |

Note

After calling run(), access usage via workflow.last_usage:

result = workflow.run(input="Hello")
print(f"Total cost: ${workflow.last_usage.cost:.4f}")

arun(context=None, start_step=0, trace=False, resume_path=None, **kwargs) async

Run the workflow asynchronously with the given initial inputs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | Optional[Union[Context, str]] | Optional Context object, or a string to use as 'input'. | None |
| start_step | int | Step index to start execution from (default 0). | 0 |
| trace | bool | If True, return ExecutionResult with replay() capability. | False |
| resume_path | Optional[List[int]] | Optional list of step indices for nested resumption. | None |
| **kwargs | Any | Initial inputs if context is not provided (or to update it). | {} |

to_blueprint()

Serialize the Workflow to a JSON-compatible blueprint.

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Dict containing the workflow's structure. |

from_blueprint(blueprint, agents={}, conditions={}, validate=True, event_manager=None, **kwargs) classmethod

Reconstruct a Workflow from a blueprint.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| blueprint | Dict[str, Any] | The blueprint dict (from to_blueprint()). | required |
| agents | Dict[str, Agent] | Map of agent names to Agent instances. | {} |
| conditions | Dict[str, Callable] | Map of condition names to callable functions. | {} |
| validate | bool | Whether to validate the blueprint before reconstruction (default: True). | True |
| event_manager | Optional[EventManager] | Optional EventManager to share across steps. | None |
| **kwargs | Any | Additional workflow configuration. | {} |

Returns:

| Type | Description |
| --- | --- |
| Workflow | A new Workflow instance. |

Raises:

| Type | Description |
| --- | --- |
| BlueprintValidationError | If validation is enabled and the blueprint is invalid. |

Note

For full reconstruction, you must provide the agents and conditions that were used in the original workflow.

to_mermaid()

Export the workflow as a Mermaid flowchart diagram. Includes tools used by each agent as annotations.

Returns:

| Type | Description |
| --- | --- |
| str | Mermaid diagram string ready for rendering. |

Example output
graph TD
    A["Researcher<br/>(web_search, file_read)"] --> B["Analyst<br/>(data_processor)"]
    B --> C["Writer<br/>(formatter)"]

to_code()

Generate clean Python code that recreates this Workflow.

Returns:

| Type | Description |
| --- | --- |
| str | Python code string defining the complete workflow. |

Note
  • Tools used by agents cannot be automatically serialized. You must manually define tool functions in the generated code.
  • Callable conditions (lambda/functions) are exported by name only. Define them manually or use string-based conditions.
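
The second point matters because a lambda carries no meaningful name: in Python every lambda reports its `__name__` as `<lambda>`, whereas a def-style function reports its own name. How YosrAI derives the exported name is not stated in the source, so the sketch below only demonstrates the Python behavior, together with a hypothetical name-to-callable registry of the shape that the conditions parameter of from_blueprint is documented to accept.

```python
def answer_missing(ctx) -> bool:
    """Loop condition: keep going while no answer has been found."""
    return not ctx.get("answer_found")


anonymous = lambda ctx: not ctx.get("answer_found")

print(answer_missing.__name__)  # answer_missing
print(anonymous.__name__)       # <lambda>

# Hypothetical registry keyed by function name, built from named functions only.
conditions = {fn.__name__: fn for fn in (answer_missing,)}
print(sorted(conditions))  # ['answer_missing']
```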