Chapter 56: Long-Task Fault Tolerance and Checkpoint Recovery
Introduction
A batch task processing 1,000 files. A deep research task spanning several hours. A monitoring Agent running continuously for days. All these scenarios face the same challenge: longer runtime means higher failure probability. Network interruptions, context overflow, tool timeouts, server restarts—any one of these can destroy hours of work. This chapter systematically explains how to design fault-tolerance mechanisms for Hermes Agent: checkpoints, idempotent tool calls, and session recovery, enabling Agents to recover gracefully from failures rather than starting from scratch.
56.1 Common Long-Task Failure Modes
Before designing fault tolerance, it helps to catalog the failure modes a long-running Agent task can encounter:
```mermaid
flowchart TD
    Root[Long Task Failure] --> T[Timeout]
    Root --> C[Context]
    Root --> Tool[Tool]
    Root --> Infra[Infrastructure]
    T --> T1[LLM API response timeout]
    T --> T2[Tool execution timeout]
    T --> T3[Overall task timeout]
    C --> C1[Context window overflow]
    C --> C2[Critical info truncated]
    C --> C3[Attention dilution]
    Tool --> Tool1[Tool call failure]
    Tool --> Tool2[Tool returns wrong result]
    Tool --> Tool3[External API unavailable]
    Infra --> I1[Server restart/crash]
    Infra --> I2[Network interruption]
    Infra --> I3[OOM kill]
    Infra --> I4[Redis connection failure]
```
Failure Classification and Responses
| Failure Type | Frequency | Impact | Response Strategy |
|---|---|---|---|
| LLM API timeout | Medium | Current step fails | Retry with exponential backoff |
| Context overflow | High (inevitable for long tasks) | Reasoning quality degrades | Context compression / sliding window |
| Tool execution failure | Medium | Single step fails | Retry / fallback tool / skip |
| Server crash | Low | Entire task lost | Checkpoint mechanism |
| Network interruption | Medium | Any step fails | Retry + idempotent design |
| OOM kill | Low | Process termination | Streaming + batch processing |
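The table recommends retrying transient failures with exponential backoff. A minimal sketch, assuming the call to retry is wrapped in a zero-argument async function; `retry_with_backoff` and its parameters are illustrative names, not part of Hermes Agent or any library. `retry_on` defaults to the built-in `TimeoutError` and `ConnectionError`; a real client library raises its own exception types, which you would pass in instead.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Await fn(); on a retryable error, sleep and try again.

    The delay cap doubles each attempt (base_delay, 2x, 4x, ... up to
    max_delay) and the actual sleep is drawn uniformly from [0, cap]
    ("full jitter") so many workers do not retry in lockstep.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return await fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            cap = min(max_delay, base_delay * (2 ** attempt))
            await sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")
```

Only retry errors that are genuinely transient, and only around calls that are safe to repeat: retrying a non-idempotent request can duplicate its side effect, which is the problem 56.3 addresses. The injectable `sleep` parameter exists so tests can run without real delays.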
56.2 Checkpoint Mechanism Design
The core idea of checkpoints: periodically save task execution state so the task can continue from the last save point rather than restarting from scratch.
```mermaid
sequenceDiagram
    participant Agent
    participant CP as Checkpoint Store
    participant Tools
    Agent->>CP: Save initial state (step 0)
    Agent->>Tools: Execute step 1
    Tools-->>Agent: Result 1
    Agent->>CP: Save state (step 1 done)
    Agent->>Tools: Execute step 2
    Note over Agent: Crash!
    Agent->>CP: Load last checkpoint (step 1)
    Agent->>Tools: Continue from step 2
    Tools-->>Agent: Result 2
    Agent->>CP: Save state (step 2 done)
    Agent->>CP: Mark complete
```
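Saving after every step, as in the diagram, loses the least work but pays one write per step. A common compromise, sketched here under the assumption that steps are roughly uniform in cost, is to save on whichever trigger fires first: a step count or elapsed wall time. `CheckpointPolicy` is a hypothetical helper, not a Hermes Agent API.

```python
import time
from typing import Callable


class CheckpointPolicy:
    """Decide when to persist: after `every_n_steps` completed steps or once
    `max_interval_s` seconds have passed since the last save, whichever
    comes first."""

    def __init__(self, every_n_steps: int = 10, max_interval_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.every_n_steps = every_n_steps
        self.max_interval_s = max_interval_s
        self.clock = clock  # injectable so tests can control time
        self.steps_since_save = 0
        self.last_save = clock()

    def step_done(self) -> bool:
        """Record one completed step; return True if a save is due now."""
        self.steps_since_save += 1
        due = (self.steps_since_save >= self.every_n_steps
               or self.clock() - self.last_save >= self.max_interval_s)
        if due:
            # Caller is expected to save immediately when this returns True
            self.steps_since_save = 0
            self.last_save = self.clock()
        return due
```

An agent loop would call `policy.step_done()` after each step and save only when it returns True, while still saving unconditionally on pause, failure, and completion so those status transitions are never lost.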
Checkpoint Data Structures
```python
# checkpoint.py
import json
import os
import time
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class StepResult:
    step_id: int
    step_type: str
    input_hash: str
    output: Any  # must be JSON-serializable, since checkpoints are stored as JSON
    tool_name: Optional[str] = None
    tool_args: Optional[dict] = None
    duration_seconds: float = 0
    completed_at: float = field(default_factory=time.time)

    def to_dict(self) -> dict:
        return asdict(self)


@dataclass
class AgentCheckpoint:
    task_id: str
    task_description: str
    status: TaskStatus
    current_step: int
    total_steps_estimated: int
    messages: list
    messages_compressed: bool = False
    completed_steps: list = field(default_factory=list)  # list[StepResult]
    tool_cache: dict = field(default_factory=dict)       # input_hash -> tool result
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    checkpoint_version: int = 0
    last_error: Optional[str] = None
    retry_count: int = 0

    def update(self, **kwargs) -> None:
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.updated_at = time.time()
        self.checkpoint_version += 1

    def add_step_result(self, result: StepResult) -> None:
        self.completed_steps.append(result)
        self.current_step = result.step_id + 1
        self.updated_at = time.time()
        self.checkpoint_version += 1

    def get_progress(self) -> float:
        if self.total_steps_estimated == 0:
            return 0.0
        return min(self.current_step / self.total_steps_estimated, 1.0)

    def to_json(self) -> str:
        # asdict() recursively converts nested StepResult dataclasses to dicts;
        # only the Enum needs manual conversion
        data = asdict(self)
        data['status'] = self.status.value
        return json.dumps(data, ensure_ascii=False)

    @classmethod
    def from_json(cls, json_str: str) -> 'AgentCheckpoint':
        data = json.loads(json_str)
        data['status'] = TaskStatus(data['status'])
        data['completed_steps'] = [StepResult(**s) for s in data['completed_steps']]
        return cls(**data)


class LocalFileCheckpointStore:
    """File system checkpoint store (for development/testing).

    The async methods use blocking file I/O, which is acceptable for small
    checkpoints in development but will stall a busy event loop.
    """

    def __init__(self, base_dir: str = "/tmp/hermes_checkpoints"):
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)

    def _path(self, task_id: str) -> str:
        return os.path.join(self.base_dir, f"{task_id}.json")

    async def save(self, checkpoint: AgentCheckpoint) -> None:
        path = self._path(checkpoint.task_id)
        # Atomic write: write to a temp file, flush it to disk, then rename.
        # os.replace() overwrites the target atomically on POSIX and Windows
        # (os.rename() raises on Windows if the target already exists).
        tmp = path + ".tmp"
        with open(tmp, 'w', encoding='utf-8') as f:
            f.write(checkpoint.to_json())
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)

    async def load(self, task_id: str) -> Optional[AgentCheckpoint]:
        path = self._path(task_id)
        if not os.path.exists(path):
            return None
        with open(path, 'r', encoding='utf-8') as f:
            return AgentCheckpoint.from_json(f.read())

    async def delete(self, task_id: str) -> None:
        path = self._path(task_id)
        if os.path.exists(path):
            os.remove(path)


class RedisCheckpointStore:
    """Redis checkpoint store (recommended for production).

    Requires redis-py 4.2 or later, which ships the redis.asyncio client.
    """
    KEY_PREFIX = "hermes:checkpoint:"
    DEFAULT_TTL = 7 * 24 * 3600  # 7 days

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        import redis.asyncio as aioredis
        self.redis = aioredis.from_url(redis_url, encoding='utf-8', decode_responses=True)

    async def save(self, checkpoint: AgentCheckpoint, ttl: int = DEFAULT_TTL) -> None:
        key = f"{self.KEY_PREFIX}{checkpoint.task_id}"
        # SETEX stores the value and its TTL in one atomic command; the TTL is
        # refreshed on every save, so only tasks that stop saving expire
        await self.redis.setex(key, ttl, checkpoint.to_json())

    async def load(self, task_id: str) -> Optional[AgentCheckpoint]:
        data = await self.redis.get(f"{self.KEY_PREFIX}{task_id}")
        return AgentCheckpoint.from_json(data) if data else None

    async def delete(self, task_id: str) -> None:
        await self.redis.delete(f"{self.KEY_PREFIX}{task_id}")
```
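The `save()` method of `LocalFileCheckpointStore` relies on the write-then-rename pattern. The standalone sketch below isolates that pattern and shows the property it buys: if serialization fails halfway, the previous checkpoint is still readable and no temp file is left behind. `atomic_write_json` is an illustrative name, not part of the chapter's code. Strict durability of the rename itself on POSIX would also require fsyncing the containing directory, which is omitted here.

```python
import json
import os
import tempfile


def atomic_write_json(path: str, data: dict) -> None:
    """Write JSON so readers see either the old file or the new one, never a
    partial write. The temp file lives in the same directory because
    os.replace() is only atomic within a single filesystem."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes are on disk before the rename
        os.replace(tmp, path)     # atomic swap on POSIX and Windows
    except BaseException:
        # Remove the partial temp file; the previous checkpoint is untouched
        if os.path.exists(tmp):
            os.remove(tmp)
        raise
```

A failed write (here, an unserializable value) surfaces as an exception while the last good checkpoint survives, which is exactly what recovery needs after a crash mid-save.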
56.3 Idempotent Tool Call Design
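Idempotency means that executing the same operation multiple times has the same effect as executing it once. This is critical for checkpoint recovery: when a task resumes, re-executing a step that may already have run must not repeat its side effects (a second copy of an email, a duplicate database row, a repeated append to a file). The cache below only protects results that made it into a saved checkpoint, so a checkpoint save should follow any side-effecting call as closely as possible.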
```python
# idempotent_tools.py
import functools
import hashlib
import json
from typing import Any, Callable, Optional


def compute_input_hash(*args, **kwargs) -> str:
    """Compute a stable hash for tool call inputs."""
    # sort_keys makes the hash independent of kwarg order;
    # default=str keeps non-JSON types (e.g. Path objects) from raising
    content = json.dumps({"args": args, "kwargs": kwargs},
                         sort_keys=True, ensure_ascii=False, default=str)
    return hashlib.sha256(content.encode()).hexdigest()[:16]


class IdempotentToolCache:
    """Stores tool results in checkpoint.tool_cache, keyed by input hash.

    Cached results become durable only when the checkpoint is saved.
    """

    def __init__(self, checkpoint: 'AgentCheckpoint'):
        self.checkpoint = checkpoint

    def get_cached_result(self, input_hash: str) -> Optional[Any]:
        return self.checkpoint.tool_cache.get(input_hash)

    def cache_result(self, input_hash: str, result: Any) -> None:
        self.checkpoint.tool_cache[input_hash] = result


def idempotent_tool(tool_name: str):
    """
    Decorator: makes tool calls idempotent.
    How it works:
    1. Compute hash of tool call inputs
    2. Check if cached result exists
    3. If yes, return cached result (skip actual execution)
    4. If no, execute tool and cache result
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(self, *args,
                          tool_cache: Optional[IdempotentToolCache] = None,
                          **kwargs):
            input_hash = compute_input_hash(tool_name, *args, **kwargs)
            if tool_cache is not None:
                cached = tool_cache.get_cached_result(input_hash)
                if cached is not None:
                    print(f"  [Idempotent] Using cached result for {tool_name}")
                    return cached
            result = await func(self, *args, **kwargs)
            if tool_cache is not None:
                tool_cache.cache_result(input_hash, result)
            return result
        return wrapper
    return decorator


# Read-only tools: safe to re-run (results may change over time, but nothing is written)
NATURALLY_IDEMPOTENT = {
    "web_search", "read_file", "calculate", "list_directory"
}

# Tools with side effects: re-running them repeats the effect unless deduplicated
REQUIRES_DEDUP = {
    "write_file", "send_email", "database_insert", "api_post", "create_ticket"
}
```
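The tool cache has a gap: a result is only durable after the checkpoint containing it is saved. If the process dies between sending an email and saving, the resumed task finds no cached result and sends again. Closing that gap requires the receiving side to deduplicate. A minimal sketch, assuming the external service accepts a client-supplied idempotency key (some HTTP APIs, Stripe being a well-known example, accept an `Idempotency-Key` request header for this purpose); `idempotency_key` and `FakeMailService` are hypothetical stand-ins, not a real API.

```python
import hashlib
import json
from dataclasses import dataclass, field


def idempotency_key(task_id: str, step_id: int, tool_name: str, args: dict) -> str:
    """Deterministic key: the same logical step always yields the same key,
    even after a crash and resume. Including step_id means the same email
    sent intentionally at two different steps gets two different keys."""
    payload = json.dumps([task_id, step_id, tool_name, args], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


@dataclass
class FakeMailService:
    """Hypothetical receiver that deduplicates on an idempotency key."""
    sent: list = field(default_factory=list)
    seen_keys: set = field(default_factory=set)

    def send(self, to: str, body: str, key: str) -> str:
        if key in self.seen_keys:
            return "duplicate-ignored"  # already delivered once; do nothing
        self.seen_keys.add(key)
        self.sent.append((to, body))
        return "sent"
```

Because the key is derived from the checkpointed task and step rather than generated randomly, a resumed task reproduces it exactly, and the second send becomes a no-op on the receiver.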
56.4 A Resumable File Processing Agent
```python
# resumable_file_agent.py
import asyncio
import os
import signal
import time
import uuid
from pathlib import Path
from typing import Optional

from openai import AsyncOpenAI

# Components defined in 56.2 and 56.3
from checkpoint import AgentCheckpoint, LocalFileCheckpointStore, StepResult, TaskStatus
from idempotent_tools import IdempotentToolCache, compute_input_hash


class ResumableFileProcessingAgent:
    """
    File processing Agent with interrupt-and-resume support.
    Processes all text files in a directory, extracting key information.
    Supports Ctrl+C graceful interruption and checkpoint recovery.
    """
    CHECKPOINT_INTERVAL = 1  # save after every N completed files

    def __init__(self, model: str, base_url: str, checkpoint_store=None,
                 task_id: Optional[str] = None):
        # OpenAI-compatible local server; the API key is ignored
        self.client = AsyncOpenAI(base_url=base_url, api_key="not-needed")
        self.model = model
        self.store = checkpoint_store or LocalFileCheckpointStore()
        self.task_id = task_id or uuid.uuid4().hex[:12]
        self.checkpoint: Optional[AgentCheckpoint] = None
        self._interrupted = False
        # signal.signal() must be called from the main thread
        signal.signal(signal.SIGINT, self._handle_interrupt)
        signal.signal(signal.SIGTERM, self._handle_interrupt)

    def _handle_interrupt(self, signum, frame):
        # Only set a flag; the main loop checks it between files
        print("\n[Agent] Interrupt received. Will stop after current file completes...")
        self._interrupted = True

    async def _process_file(self, file_path: str, task: str) -> str:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()
        # Hashing a content prefix means edits near the top invalidate the cache
        file_hash = compute_input_hash("process_file", file_path, content[:500])
        if self.checkpoint:
            cached = IdempotentToolCache(self.checkpoint).get_cached_result(file_hash)
            if cached:
                print(f"  [Cache hit] {os.path.basename(file_path)}")
                return cached
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Extract key information from the file and provide a concise structured summary."},
                {"role": "user", "content": f"File: {file_path}\nTask: {task}\n\nContent (first 3000 chars):\n{content[:3000]}"}
            ],
            temperature=0.1,
            max_tokens=512
        )
        result = response.choices[0].message.content or "Processing complete"
        if self.checkpoint:
            IdempotentToolCache(self.checkpoint).cache_result(file_hash, result)
        return result

    async def run(self, directory: str, task_description: str,
                  file_pattern: str = "*.txt", resume: bool = True) -> dict:
        print(f"\n{'=' * 60}")
        print(f"Resumable File Processing Agent | Task: {self.task_id}")
        print(f"Directory: {directory}")
        print(f"{'=' * 60}")

        # Sorted so each step index maps to the same file on every run;
        # resume is index-based and assumes the file set has not changed
        all_files = sorted(str(f) for f in Path(directory).glob(f"**/{file_pattern}"))
        if not all_files:
            return {"success": False, "error": f"No {file_pattern} files found in {directory}"}
        print(f"Found {len(all_files)} files")

        if resume:
            self.checkpoint = await self.store.load(self.task_id)
        if self.checkpoint and self.checkpoint.status == TaskStatus.COMPLETED:
            return {"success": True, "answer": "Task already complete, loaded from checkpoint",
                    "task_id": self.task_id}
        if self.checkpoint:
            print(f"[Agent] Resuming from step {self.checkpoint.current_step}/{len(all_files)}")
            print(f"[Agent] Progress: {self.checkpoint.get_progress():.1%}")
            self.checkpoint.update(status=TaskStatus.RUNNING)  # e.g. was PAUSED
        else:
            self.checkpoint = AgentCheckpoint(
                task_id=self.task_id,
                task_description=task_description,
                status=TaskStatus.RUNNING,
                current_step=0,
                total_steps_estimated=len(all_files),
                messages=[]
            )
            await self.store.save(self.checkpoint)
            print("[Agent] Created new checkpoint")

        # Rebuild results already recorded in the checkpoint
        file_results = {}
        for step in self.checkpoint.completed_steps:
            if step.tool_name == "process_file":
                file_results[step.tool_args.get("file_path", "")] = step.output

        start_from = self.checkpoint.current_step
        for i, file_path in enumerate(all_files[start_from:], start=start_from):
            if self._interrupted:
                print(f"\n[Agent] Graceful stop after {i}/{len(all_files)} files")
                self.checkpoint.update(status=TaskStatus.PAUSED)
                await self.store.save(self.checkpoint)
                return {
                    "success": False,
                    "interrupted": True,
                    "progress": i / len(all_files),
                    "task_id": self.task_id
                }
            print(f"\n[{i + 1}/{len(all_files)}] Processing: {os.path.basename(file_path)}")
            step_start = time.time()
            try:
                result = await self._process_file(file_path, task_description)
                file_results[file_path] = result
                print(f"  Done ({time.time() - step_start:.1f}s): {result[:80]}...")
                self.checkpoint.add_step_result(StepResult(
                    step_id=i,
                    step_type="tool_call",
                    input_hash=compute_input_hash("process_file", file_path),
                    output=result,
                    tool_name="process_file",
                    tool_args={"file_path": file_path},
                    duration_seconds=time.time() - step_start
                ))
                if (i + 1) % self.CHECKPOINT_INTERVAL == 0:
                    await self.store.save(self.checkpoint)
                    print(f"  [Checkpoint saved at step {i + 1}]")
            except Exception as e:
                # The failed file is skipped. current_step is not advanced, but the
                # next successful file moves it past this one, so a failed file is
                # retried on resume only if the run stops before that happens.
                print(f"  Failed: {e}")
                self.checkpoint.update(last_error=str(e),
                                       retry_count=self.checkpoint.retry_count + 1)
                await self.store.save(self.checkpoint)
                continue

        # Generate final report
        print("\n[Agent] All files processed. Generating report...")
        results_str = "\n\n".join(f"### {path}\n{result}" for path, result in file_results.items())
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Synthesize multiple file analysis results into a structured final report."},
                {"role": "user", "content": f"Task: {task_description}\n\nFile results:\n{results_str}"}
            ],
            temperature=0.3,
            max_tokens=2048
        )
        final_report = response.choices[0].message.content or ""
        self.checkpoint.update(status=TaskStatus.COMPLETED)
        await self.store.save(self.checkpoint)

        report_path = f"/tmp/hermes_report_{self.task_id}.md"
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(f"# Task Report\n\nTask ID: {self.task_id}\n"
                    f"Files processed: {len(file_results)}\n\n{final_report}")
        print(f"[Agent] Report saved to: {report_path}")
        return {
            "success": True,
            "answer": final_report,
            "files_processed": len(file_results),
            "files_total": len(all_files),
            "report_path": report_path,
            "task_id": self.task_id
        }


async def main():
    import tempfile
    test_dir = tempfile.mkdtemp()
    for i in range(5):
        with open(f"{test_dir}/file_{i}.txt", 'w', encoding='utf-8') as f:
            f.write(f"This is test file {i + 1}. " * 50)

    agent = ResumableFileProcessingAgent(
        model="NousResearch/Hermes-3-Llama-3.1-8B",
        base_url="http://localhost:8000/v1",
        task_id="demo_task_001"
    )
    result = await agent.run(
        directory=test_dir,
        task_description="Analyze file contents, identify key themes and important information",
        resume=True  # Auto-resume from last checkpoint
    )
    if result.get("interrupted"):
        print(f"\nTask interrupted. Progress: {result['progress']:.1%}")
        print(f"To resume, use task_id: {result['task_id']}")
    elif "files_processed" in result:
        print(f"\nTask complete! Processed {result['files_processed']} files")
        print(f"Report: {result.get('report_path')}")
    else:
        # Already-complete checkpoint or no matching files
        print(f"\n{result.get('answer') or result.get('error')}")


if __name__ == "__main__":
    asyncio.run(main())
```
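Exercising the agent end to end needs a running model server. The resume logic itself can be checked offline with a stripped-down loop that follows the same pattern: persist the next index after every item, and on restart skip what was already done. `run_resumable` is a hypothetical helper for testing the idea, not part of the agent; `work` stands in for the per-file LLM call.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List


def run_resumable(items: List[str], state_path: str,
                  work: Callable[[str], str]) -> Dict[str, str]:
    """Process items in order, persisting progress after each one.
    On restart, items before the saved index are skipped."""
    state = {"next_index": 0, "results": {}}
    if os.path.exists(state_path):
        with open(state_path, encoding="utf-8") as f:
            state = json.load(f)  # resume from the last saved index
    for i in range(state["next_index"], len(items)):
        state["results"][items[i]] = work(items[i])
        state["next_index"] = i + 1
        tmp = state_path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(state, f)
        os.replace(tmp, state_path)  # checkpoint after every item
    return state["results"]
```

In the test, the first run "crashes" on the fourth item; the rerun attempts only that item again and then finishes, so completed work is never redone.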
56.5 Context Compression for Long Tasks
```python
class ContextCompressor:
    """Automatically compress message history when it exceeds a token budget."""

    def __init__(self, client, model: str, max_tokens: int = 6000):
        self.client = client  # an AsyncOpenAI-compatible client
        self.model = model
        self.max_tokens = max_tokens

    def estimate_tokens(self, messages: list) -> int:
        # Rough heuristic: about 4 characters per token for English text
        return sum(len(str(m.get('content') or '')) // 4 for m in messages)

    async def compress_if_needed(self, messages: list, keep_recent: int = 6) -> list:
        if self.estimate_tokens(messages) <= self.max_tokens:
            return messages
        # Assumes system messages appear only at the start of the history
        system_msgs = [m for m in messages if m['role'] == 'system']
        n_sys = len(system_msgs)
        split = max(n_sys, len(messages) - keep_recent)
        middle_msgs = messages[n_sys:split]
        recent_msgs = messages[split:]
        if not middle_msgs:
            return messages  # nothing old enough to summarize
        # Caveat: if recent_msgs starts with a 'tool' message, the assistant
        # message that requested it gets summarized away; some chat APIs reject
        # that sequence, so move the split point earlier in that case.
        summary_content = "\n".join(
            f"{m['role']}: {str(m.get('content') or '')[:300]}" for m in middle_msgs
        )
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Summarize the key information, known facts, and conclusions from the following conversation in 3-5 sentences:"},
                {"role": "user", "content": summary_content}
            ],
            temperature=0.1, max_tokens=256
        )
        summary = response.choices[0].message.content or ""
        compressed = (system_msgs
                      + [{"role": "user", "content": f"[Conversation summary] {summary}"}]
                      + recent_msgs)
        print(f"[ContextCompressor] Compressed {len(middle_msgs)} messages into 1 summary")
        return compressed
```
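The failure table in 56.1 also lists a sliding window as a response to context overflow. It is cheaper than summarization because it needs no extra model call, but it simply forgets older turns. A minimal sketch, using the same rough 4-characters-per-token estimate as `ContextCompressor`; `sliding_window` and `rough_tokens` are illustrative names. In a tool-calling conversation the cut can separate a `tool` message from the assistant message that requested it, so a production version would drop such pairs together.

```python
from typing import Callable, Dict, List, Optional

Message = Dict[str, Optional[str]]


def rough_tokens(message: Message) -> int:
    # Same heuristic as ContextCompressor.estimate_tokens: ~4 chars per token
    return len(str(message.get("content") or "")) // 4


def sliding_window(messages: List[Message], max_tokens: int,
                   count_tokens: Callable[[Message], int] = rough_tokens) -> List[Message]:
    """Keep every system message plus the newest non-system messages that
    fit in max_tokens, dropping the oldest first."""
    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]
    budget = max_tokens - sum(count_tokens(m) for m in system)
    kept: List[Message] = []
    for m in reversed(others):  # newest first
        cost = count_tokens(m)
        if cost > budget:
            break  # stop at the first message that does not fit, keeping the window contiguous
        kept.append(m)
        budget -= cost
    kept.reverse()  # restore chronological order
    return system + kept
```

A reasonable design is to apply the sliding window as a fallback when the summarization call itself fails, since it can never make the context larger.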
56.6 Fault Tolerance Architecture Summary
| Mechanism | Problem Solved | Implementation |
|---|---|---|
| Checkpoint saving | Server crash / restart | checkpoint.py |
| Idempotent tool cache | Duplicate side-effect operations | idempotent_tools.py |
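| Exponential backoff retry | Transient failures (timeouts, network errors) | Retry wrapper around LLM/tool calls (the 56.4 agent only logs failures and skips the file) |
| Context compression | Context window overflow | ContextCompressor |
| Graceful interrupt | Operator-initiated pause (Ctrl+C, SIGTERM) | `signal` handler |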
| Batch processing | Large dataset OOM | File-by-file processing |
Summary
This chapter systematically covered the complete fault-tolerance strategy for Hermes Agent long tasks:
- Failure mode classification: Timeouts, context overflow, tool failures, and infrastructure failures, each with its own countermeasure.
- Checkpoint mechanism: Periodic serialization of task state to a filesystem or Redis backend; the file store's write-then-rename ensures a crash mid-save cannot leave a half-written checkpoint.
- Idempotent tool design: Input-hash-based caching avoids re-executing side-effect operations (emails, database inserts) for steps whose results were already checkpointed; a crash between executing a tool and saving the checkpoint can still repeat it.
- Resumable Agent: Complete implementation with signal handling (SIGINT/SIGTERM), checkpoint saving, and resume-from-interruption for file processing.
- Context compression: Automatic summary compression when message history exceeds the context window, preserving key information.
Review Questions
- How do you balance checkpoint save frequency? Too frequent adds I/O overhead; too sparse means losing more progress on failure. How would you dynamically adjust based on task characteristics?
- Idempotent tool caches are stored in checkpoints. If a file is modified between runs, cached old results may cause errors. How would you design content-aware cache invalidation?
- For Agents running for days, how should Redis checkpoint TTL be configured? Can cleanup be automated after task completion?
- If Hermes Agent is deployed in Kubernetes and a Pod receives SIGKILL (forced eviction), is there time to save a checkpoint? How would you design a more robust solution?