Checkpoints
Save workflow state. Resume from failures without losing progress.

Basic Usage

from splinter.coordination import AgentStatus, CheckpointManager

manager = CheckpointManager()

# Create checkpoint
manager.create_checkpoint(
    workflow_id="wf-123",
    step=5,
    agent_id="researcher",
    status=AgentStatus.COMPLETED,
    state=current_state,
    metrics=current_metrics,
)

# Get latest checkpoint
checkpoint = manager.get_latest_checkpoint("wf-123")

File Storage

Persist checkpoints across restarts:
from splinter.coordination import FileCheckpointStorage

manager = CheckpointManager(
    storage=FileCheckpointStorage("./checkpoints"),
    max_checkpoints=10,  # Keep last 10
)

Resume from Failure

# Get resume point
resume_point = manager.get_resume_point("wf-123")

if resume_point:
    resume_step, state_snapshot, metrics = resume_point
    print(f"Resuming from step {resume_step}")

    # Restore state
    state = SharedState(initial_data=state_snapshot.data)

Resumable Workflow

from splinter.coordination import ResumableWorkflow

resumable = ResumableWorkflow(
    workflow_id="wf-123",
    checkpoint_manager=manager,
)

# Try to resume
resume_step = resumable.try_resume()

if resumable.is_resumed:
    print(f"Resumed from step {resume_step}")
else:
    print("Starting fresh")

# Create checkpoints during execution
resumable.checkpoint(
    step=current_step,
    agent_id=current_agent,
    status=AgentStatus.COMPLETED,
)

List Checkpoints

checkpoints = manager.list_checkpoints("wf-123")

for cp in checkpoints:
    print(f"Step {cp.step}: {cp.agent_id} - {cp.status}")

Delete Checkpoints

# Delete specific step
manager.delete_checkpoints("wf-123", step=5)

# Delete all for workflow
manager.delete_checkpoints("wf-123")

Checkpoint Contents

Each checkpoint contains:
  • workflow_id: Workflow identifier
  • step: Step number
  • agent_id: Identifier of the agent that completed the step
  • status: Completion status
  • state_snapshot: Full state snapshot
  • metrics: Execution metrics
  • timestamp: When the checkpoint was created