Documentation Index
Fetch the complete documentation index at: https://splinter.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
The Workflow class provides advanced multi-agent orchestration with dependencies, checkpointing, and coordination.
Constructor
Workflow(
workflow_id: str,
limits: ExecutionLimits | None = None,
loop_detection: LoopDetectionConfig | None = None,
checkpoint_enabled: bool = False,
checkpoint_storage: CheckpointStorage | None = None,
)
Parameters
| Parameter | Type | Default | Description |
|---|
workflow_id | str | Required | Unique identifier for this workflow |
limits | ExecutionLimits | None | Budget, step, and time limits |
loop_detection | LoopDetectionConfig | None | Loop detection configuration |
checkpoint_enabled | bool | False | Enable automatic checkpointing |
checkpoint_storage | CheckpointStorage | None | Where to store checkpoints |
Example
from splinter.workflow import Workflow
from splinter.types import ExecutionLimits, LoopDetectionConfig
workflow = Workflow(
workflow_id="research-pipeline",
limits=ExecutionLimits(max_budget=20.0, max_steps=200),
loop_detection=LoopDetectionConfig(max_repeated_outputs=3),
checkpoint_enabled=True,
)
Methods
add_agent()
Add an agent to the workflow.
def add_agent(config: AgentConfig) -> None
from splinter.types import AgentConfig, LLMProvider
workflow.add_agent(AgentConfig(
agent_id="researcher",
provider=LLMProvider.OPENAI,
model="gpt-4o",
system_prompt="Research topics. Output JSON.",
tools=["web_search"],
state_ownership=["research.*"],
))
add_step()
Add a workflow step.
def add_step(
agent_id: str,
depends_on: list[str] | None = None,
) -> None
workflow.add_step("researcher")
workflow.add_step("writer", depends_on=["researcher"])
workflow.add_step("reviewer", depends_on=["writer"])
run()
Execute the workflow.
async def run(
initial_state: dict[str, Any] | None = None,
) -> WorkflowResult
result = await workflow.run(initial_state={"topic": "AI trends"})
print(f"Success: {result.success}")
print(f"Outputs: {result.outputs}")
print(f"Cost: ${result.metrics['total_cost']:.4f}")
resume_from_checkpoint()
Resume workflow from a checkpoint.
@classmethod
async def resume_from_checkpoint(
checkpoint: Checkpoint,
) -> Workflow
checkpoint = mgr.get_latest_checkpoint("wf-1")
workflow = await Workflow.resume_from_checkpoint(checkpoint)
result = await workflow.run()
WorkflowResult
@dataclass
class WorkflowResult:
success: bool # Did workflow complete successfully?
outputs: dict[str, Any] # Output from each agent
metrics: dict[str, Any] # Execution metrics
final_state: dict # Final shared state
errors: list[str] # Any errors that occurred
Full Example
from splinter.workflow import Workflow
from splinter.types import AgentConfig, ExecutionLimits, LLMProvider
# Create workflow
workflow = Workflow(
workflow_id="research-pipeline",
limits=ExecutionLimits(max_budget=20.0, max_steps=200),
checkpoint_enabled=True,
)
# Add agents
workflow.add_agent(AgentConfig(
agent_id="researcher",
provider=LLMProvider.OPENAI,
model="gpt-4o",
system_prompt="Research topics. Output JSON.",
state_ownership=["research.*"],
))
workflow.add_agent(AgentConfig(
agent_id="writer",
provider=LLMProvider.ANTHROPIC,
model="claude-sonnet-4-20250514",
system_prompt="Write articles. Output JSON.",
state_ownership=["content.*"],
))
# Define execution order
workflow.add_step("researcher")
workflow.add_step("writer", depends_on=["researcher"])
# Run
result = await workflow.run(initial_state={"topic": "AI trends"})
print(f"Success: {result.success}")
print(f"Cost: ${result.metrics['total_cost']:.4f}")
print(f"Research: {result.outputs['researcher']}")
print(f"Article: {result.outputs['writer']}")