State Snapshots
from splinter.coordination import SharedState
# Create the shared state, seeded with initial data.
state = SharedState(initial_data={"topic": "AI"})
# Make changes that the rollback below should bring back.
state.set("research.findings", ["finding1", "finding2"])
# Take a snapshot of the state at this point.
snapshot = state.snapshot()
# Make more changes; these are the ones the rollback discards.
state.set("research.findings", ["bad data"])
# Roll back to the snapshot taken above.
state.restore(snapshot)
# Per this example, the findings stored before the snapshot are visible again.
print(state.get("research.findings"))  # ["finding1", "finding2"]
With Checkpoints
from splinter.coordination import CheckpointManager, FileCheckpointStorage
# Storage backend for checkpoints, pointed at ./checkpoints.
checkpoint_storage = FileCheckpointStorage("./checkpoints")
manager = CheckpointManager(storage=checkpoint_storage)

# Record a checkpoint for the "researcher" agent at step 5 of workflow "wf-123".
# NOTE(review): AgentStatus, current_state and current_metrics are not defined
# or imported in this snippet; they must come from the surrounding workflow
# code. Confirm the import path for AgentStatus.
manager.create_checkpoint(
    workflow_id="wf-123",
    step=5,
    agent_id="researcher",
    status=AgentStatus.COMPLETED,
    state=current_state,
    metrics=current_metrics,
)

# On failure, roll back to the latest checkpoint recorded for this workflow.
checkpoint = manager.get_latest_checkpoint("wf-123")
state.restore(checkpoint.state_snapshot)
Rollback Strategies
Last Good Checkpoint
# Fetch the latest checkpoint for the workflow and restore it only when its
# status is COMPLETED; otherwise the current state is left untouched.
# NOTE(review): if get_latest_checkpoint() can return None when no checkpoint
# exists, guard for that before reading .status. Confirm against the API.
checkpoint = manager.get_latest_checkpoint("wf-123")
if checkpoint.status == AgentStatus.COMPLETED:
    state.restore(checkpoint.state_snapshot)
Specific Step
# Roll back to a specific step (here, step 3 of workflow "wf-123").
checkpoint = manager.get_checkpoint("wf-123", step=3)
state.restore(checkpoint.state_snapshot)
State History
# Use the built-in state history.
state = SharedState()
# Make changes: write three successive values to the same key.
state.set("key", "value1")
state.set("key", "value2")
state.set("key", "value3")
# View the history recorded for "key".
history = state.get_history("key")
# Restore from history: each entry exposes a snapshot that can be restored.
state.restore(history[0].snapshot)  # Back to value1
Automatic Rollback
from splinter.coordination import ResumableWorkflow
# Bind a resumable workflow to the checkpoint manager for workflow "wf-123".
resumable = ResumableWorkflow(
    workflow_id="wf-123",
    checkpoint_manager=manager,
)
# Try to resume from the last checkpoint. In this example the return value of
# try_resume() is reported as the step rolled back to, and is_resumed tells
# whether a resume actually took place.
resume_step = resumable.try_resume()
if resumable.is_resumed:
    print(f"Rolled back to step {resume_step}")