Workflow

The Workflow class provides advanced multi-agent orchestration with dependencies, checkpointing, and coordination.

Constructor

Workflow(
    workflow_id: str,
    limits: ExecutionLimits | None = None,
    loop_detection: LoopDetectionConfig | None = None,
    checkpoint_enabled: bool = False,
    checkpoint_storage: CheckpointStorage | None = None,
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `workflow_id` | `str` | Required | Unique identifier for this workflow |
| `limits` | `ExecutionLimits` | `None` | Budget, step, and time limits |
| `loop_detection` | `LoopDetectionConfig` | `None` | Loop detection configuration |
| `checkpoint_enabled` | `bool` | `False` | Enable automatic checkpointing |
| `checkpoint_storage` | `CheckpointStorage` | `None` | Where to store checkpoints |

Example

from splinter.workflow import Workflow
from splinter.types import ExecutionLimits, LoopDetectionConfig

workflow = Workflow(
    workflow_id="research-pipeline",
    limits=ExecutionLimits(max_budget=20.0, max_steps=200),
    loop_detection=LoopDetectionConfig(max_repeated_outputs=3),
    checkpoint_enabled=True,
)

Methods

add_agent()

Add an agent to the workflow.
def add_agent(config: AgentConfig) -> None

Example

from splinter.types import AgentConfig, LLMProvider

workflow.add_agent(AgentConfig(
    agent_id="researcher",
    provider=LLMProvider.OPENAI,
    model="gpt-4o",
    system_prompt="Research topics. Output JSON.",
    tools=["web_search"],
    state_ownership=["research.*"],
))

add_step()

Add a workflow step.
def add_step(
    agent_id: str,
    depends_on: list[str] | None = None,
) -> None

Example

workflow.add_step("researcher")
workflow.add_step("writer", depends_on=["researcher"])
workflow.add_step("reviewer", depends_on=["writer"])

run()

Execute the workflow.
async def run(
    initial_state: dict[str, Any] | None = None,
) -> WorkflowResult

Example

result = await workflow.run(initial_state={"topic": "AI trends"})

print(f"Success: {result.success}")
print(f"Outputs: {result.outputs}")
print(f"Cost: ${result.metrics['total_cost']:.4f}")

resume_from_checkpoint()

Resume workflow from a checkpoint.
@classmethod
async def resume_from_checkpoint(
    checkpoint: Checkpoint,
) -> Workflow

Example

checkpoint = mgr.get_latest_checkpoint("wf-1")
workflow = await Workflow.resume_from_checkpoint(checkpoint)
result = await workflow.run()

WorkflowResult

@dataclass
class WorkflowResult:
    success: bool           # Did workflow complete successfully?
    outputs: dict[str, Any] # Output from each agent
    metrics: dict[str, Any] # Execution metrics
    final_state: dict       # Final shared state
    errors: list[str]       # Any errors that occurred

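Full Example

Note: the snippet below uses `await` at the top level, which is only valid inside an `async def` function (or an async-aware REPL/notebook). In a regular script, place the code in a coroutine such as `async def main()` and start it with `asyncio.run(main())`.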

from splinter.workflow import Workflow
from splinter.types import AgentConfig, ExecutionLimits, LLMProvider

# Create workflow
workflow = Workflow(
    workflow_id="research-pipeline",
    limits=ExecutionLimits(max_budget=20.0, max_steps=200),
    checkpoint_enabled=True,
)

# Add agents
workflow.add_agent(AgentConfig(
    agent_id="researcher",
    provider=LLMProvider.OPENAI,
    model="gpt-4o",
    system_prompt="Research topics. Output JSON.",
    state_ownership=["research.*"],
))

workflow.add_agent(AgentConfig(
    agent_id="writer",
    provider=LLMProvider.ANTHROPIC,
    model="claude-sonnet-4-20250514",
    system_prompt="Write articles. Output JSON.",
    state_ownership=["content.*"],
))

# Define execution order
workflow.add_step("researcher")
workflow.add_step("writer", depends_on=["researcher"])

# Run
result = await workflow.run(initial_state={"topic": "AI trends"})

print(f"Success: {result.success}")
print(f"Cost: ${result.metrics['total_cost']:.4f}")
print(f"Research: {result.outputs['researcher']}")
print(f"Article: {result.outputs['writer']}")