第 56 章
长任务容错与断点续跑
第56章:长任务容错与断点续跑
导语
一个需要处理 1000 个文件的批量任务、一个跨越数小时的深度研究任务、一个需要持续运行数天的监控 Agent——这些场景都面临同样的挑战:长时间运行意味着更高的失败概率。网络中断、上下文溢出、工具超时、服务器重启……任何一个都可能让数小时的工作付之东流。本章系统讲解如何为 Hermes Agent 设计容错机制:检查点(Checkpoint)、幂等工具调用、会话恢复,让 Agent 从失败中优雅恢复,而非从零开始。
56.1 长任务的常见失败模式
在设计容错机制之前,先梳理 Agent 长任务可能遇到的失败模式:
flowchart TD
Root[长任务失败] --> T[超时类]
Root --> C[上下文类]
Root --> Tool[工具类]
Root --> Infra[基础设施类]
T --> T1[LLM API 响应超时]
T --> T2[工具执行超时]
T --> T3[任务整体超时]
C --> C1[上下文窗口溢出]
C --> C2[关键信息被截断]
C --> C3[注意力分散]
Tool --> Tool1[工具调用失败]
Tool --> Tool2[工具返回错误结果]
Tool --> Tool3[外部 API 不可用]
Infra --> I1[服务器重启/崩溃]
Infra --> I2[网络中断]
Infra --> I3[内存溢出 OOM]
Infra --> I4[Redis 连接失败]
失败模式分类与应对
| 失败类型 | 发生频率 | 影响 | 应对策略 |
|---|---|---|---|
| LLM API 超时 | 中 | 当前步骤失败 | 重试(指数退避) |
| 上下文溢出 | 高(长任务必然) | 推理质量下降 | 上下文压缩/滑动窗口 |
| 工具执行失败 | 中 | 单步失败 | 重试/备选工具/跳过 |
| 服务器崩溃 | 低 | 整个任务丢失 | 检查点机制 |
| 网络中断 | 中 | 任意步骤失败 | 重试 + 幂等设计 |
| OOM 内存溢出 | 低 | 进程终止 | 流式处理 + 分批 |
56.2 检查点(Checkpoint)机制设计
检查点的核心思想:定期保存任务执行状态,使任务可以从上次保存点继续,而非从头开始。
sequenceDiagram
participant Agent
participant Checkpoint as Checkpoint 存储
participant Tools
Agent->>Checkpoint: 保存初始状态(步骤0)
Agent->>Tools: 执行步骤1
Tools-->>Agent: 结果1
Agent->>Checkpoint: 保存状态(步骤1完成)
Agent->>Tools: 执行步骤2
Note over Agent: ❌ 崩溃!
Agent->>Checkpoint: 加载最近检查点(步骤1)
Agent->>Tools: 从步骤2继续
Tools-->>Agent: 结果2
Agent->>Checkpoint: 保存状态(步骤2完成)
Agent->>Checkpoint: 标记完成
检查点数据结构
# checkpoint.py
"""
Agent 检查点系统
支持本地文件、Redis 两种存储后端
"""
import json
import os
import time
import hashlib
import asyncio
from typing import Any, Optional
from dataclasses import dataclass, field, asdict
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
PAUSED = "paused" # 主动暂停(可以恢复)
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class StepResult:
"""单个执行步骤的结果"""
step_id: int
step_type: str # 'llm_inference', 'tool_call', 'summary'
input_hash: str # 输入内容的哈希(用于幂等验证)
output: Any
tool_name: Optional[str] = None
tool_args: Optional[dict] = None
duration_seconds: float = 0
completed_at: float = field(default_factory=time.time)
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class AgentCheckpoint:
"""
Agent 任务检查点
包含恢复任务所需的所有状态
"""
task_id: str
task_description: str
# 执行状态
status: TaskStatus
current_step: int
total_steps_estimated: int
# 消息历史(可能被压缩)
messages: list
messages_compressed: bool = False # 是否已经过摘要压缩
# 已完成步骤的结果
completed_steps: list[StepResult] = field(default_factory=list)
# 工具调用结果缓存(用于幂等恢复)
tool_cache: dict = field(default_factory=dict) # input_hash -> result
# 元数据
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
checkpoint_version: int = 0
# 错误信息(如果失败)
last_error: Optional[str] = None
retry_count: int = 0
def update(self, **kwargs):
"""更新检查点字段"""
for key, value in kwargs.items():
setattr(self, key, value)
self.updated_at = time.time()
self.checkpoint_version += 1
def add_step_result(self, result: StepResult) -> None:
self.completed_steps.append(result)
self.current_step = result.step_id + 1
self.updated_at = time.time()
self.checkpoint_version += 1
def get_progress(self) -> float:
"""返回任务进度(0.0 - 1.0)"""
if self.total_steps_estimated == 0:
return 0.0
return min(self.current_step / self.total_steps_estimated, 1.0)
def to_json(self) -> str:
data = asdict(self)
data['status'] = self.status.value
return json.dumps(data, ensure_ascii=False)
@classmethod
def from_json(cls, json_str: str) -> 'AgentCheckpoint':
data = json.loads(json_str)
data['status'] = TaskStatus(data['status'])
data['completed_steps'] = [StepResult(**s) for s in data['completed_steps']]
return cls(**data)
# ─── 存储后端 ──────────────────────────────────────────────────
class LocalFileCheckpointStore:
"""本地文件系统检查点存储(开发/测试用)"""
def __init__(self, base_dir: str = "/tmp/hermes_checkpoints"):
self.base_dir = base_dir
os.makedirs(base_dir, exist_ok=True)
def _path(self, task_id: str) -> str:
return os.path.join(self.base_dir, f"{task_id}.json")
async def save(self, checkpoint: AgentCheckpoint) -> None:
path = self._path(checkpoint.task_id)
# 原子写入:先写临时文件,再重命名
tmp_path = path + ".tmp"
with open(tmp_path, 'w', encoding='utf-8') as f:
f.write(checkpoint.to_json())
os.rename(tmp_path, path)
async def load(self, task_id: str) -> Optional[AgentCheckpoint]:
path = self._path(task_id)
if not os.path.exists(path):
return None
with open(path, 'r', encoding='utf-8') as f:
return AgentCheckpoint.from_json(f.read())
async def delete(self, task_id: str) -> None:
path = self._path(task_id)
if os.path.exists(path):
os.remove(path)
async def list_tasks(self) -> list[str]:
return [f.replace('.json', '') for f in os.listdir(self.base_dir) if f.endswith('.json')]
class RedisCheckpointStore:
"""Redis 检查点存储(生产推荐)"""
KEY_PREFIX = "hermes:checkpoint:"
DEFAULT_TTL = 7 * 24 * 3600 # 7天
def __init__(self, redis_url: str = "redis://localhost:6379"):
import redis.asyncio as aioredis
self.redis = aioredis.from_url(redis_url, encoding='utf-8', decode_responses=True)
async def save(self, checkpoint: AgentCheckpoint, ttl: int = DEFAULT_TTL) -> None:
key = f"{self.KEY_PREFIX}{checkpoint.task_id}"
await self.redis.setex(key, ttl, checkpoint.to_json())
async def load(self, task_id: str) -> Optional[AgentCheckpoint]:
key = f"{self.KEY_PREFIX}{task_id}"
data = await self.redis.get(key)
if data is None:
return None
return AgentCheckpoint.from_json(data)
async def delete(self, task_id: str) -> None:
await self.redis.delete(f"{self.KEY_PREFIX}{task_id}")
async def list_tasks(self) -> list[str]:
keys = await self.redis.keys(f"{self.KEY_PREFIX}*")
return [k.replace(self.KEY_PREFIX, '') for k in keys]
56.3 幂等工具调用设计
幂等性:同一个操作执行多次,结果与执行一次相同。这对于断点续跑至关重要——当任务恢复时,我们需要确保重新执行某些步骤不会产生副作用(如重复发邮件、重复写文件)。
# idempotent_tools.py
"""
幂等工具调用设计
使用输入哈希作为操作 ID,防止重复执行
"""
import hashlib
import json
import functools
from typing import Callable, Any, Optional
def compute_input_hash(*args, **kwargs) -> str:
"""计算工具调用输入的唯一哈希"""
content = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(content.encode()).hexdigest()[:16]
class IdempotentToolCache:
"""
工具调用结果缓存
基于输入哈希实现幂等保证
"""
def __init__(self, checkpoint: 'AgentCheckpoint'):
self.checkpoint = checkpoint
def get_cached_result(self, input_hash: str) -> Optional[Any]:
"""查找是否有缓存的工具调用结果"""
return self.checkpoint.tool_cache.get(input_hash)
def cache_result(self, input_hash: str, result: Any) -> None:
"""缓存工具调用结果"""
self.checkpoint.tool_cache[input_hash] = result
def idempotent_tool(tool_name: str):
"""
装饰器:使工具调用幂等
工作原理:
1. 计算工具调用输入的哈希
2. 查找是否已有缓存结果
3. 如果有,直接返回缓存结果(跳过实际执行)
4. 如果没有,执行工具并缓存结果
"""
def decorator(func: Callable):
@functools.wraps(func)
async def wrapper(self, *args, tool_cache: IdempotentToolCache = None, **kwargs):
# 计算输入哈希
input_hash = compute_input_hash(tool_name, *args, **kwargs)
# 检查缓存
if tool_cache:
cached = tool_cache.get_cached_result(input_hash)
if cached is not None:
print(f" [幂等] 使用缓存结果 {tool_name}({input_hash[:8]}...)")
return cached
# 执行工具
result = await func(self, *args, **kwargs)
# 缓存结果
if tool_cache:
tool_cache.cache_result(input_hash, result)
return result
wrapper._is_idempotent = True
wrapper._tool_name = tool_name
return wrapper
return decorator
# 不同类型工具的幂等策略
class ToolIdempotencyStrategy:
"""
工具幂等策略
不同工具需要不同的幂等处理方式
"""
@staticmethod
def is_naturally_idempotent(tool_name: str) -> bool:
"""
判断工具是否天然幂等
读操作通常是天然幂等的
"""
naturally_idempotent = {
"web_search", # 搜索结果相同(短期内)
"read_file", # 读取不改变状态
"get_datetime", # 可以重新获取(结果可能不同,但无副作用)
"calculate", # 纯计算,结果一致
"list_directory", # 只读操作
}
return tool_name in naturally_idempotent
@staticmethod
def requires_dedup(tool_name: str) -> bool:
"""
判断工具是否需要去重
写操作通常需要去重
"""
write_operations = {
"write_file", # 文件写入(需要检查是否已写入)
"send_email", # 发送邮件(绝对不能重复)
"database_insert", # 数据库插入(可能导致重复记录)
"api_post", # POST 请求(副作用不可逆)
"create_ticket", # 创建工单(不能重复创建)
}
return tool_name in write_operations
@staticmethod
def get_dedup_key(tool_name: str, args: dict) -> str:
"""
生成去重键
用于检查操作是否已经执行过
"""
if tool_name == "send_email":
return f"email:{args.get('to')}:{args.get('subject')}"
elif tool_name == "write_file":
return f"file:{args.get('path')}"
elif tool_name == "database_insert":
return f"db:{args.get('table')}:{json.dumps(args.get('data', {}), sort_keys=True)}"
else:
return compute_input_hash(tool_name, **args)
56.4 可中断恢复的文件处理 Agent
以下是一个完整的实战示例:一个批量处理文件的 Agent,支持中断后从上次停止的地方继续。
# resumable_file_agent.py
"""
可中断恢复的文件处理 Agent
实战示例:批量处理目录中的所有文本文件,提取关键信息并生成报告
"""
import asyncio
import os
import time
import signal
import uuid
from pathlib import Path
from typing import Optional
from openai import AsyncOpenAI
from checkpoint import (
AgentCheckpoint, StepResult, TaskStatus,
LocalFileCheckpointStore, RedisCheckpointStore
)
from idempotent_tools import IdempotentToolCache, compute_input_hash
class ResumableFileProcessingAgent:
"""
可中断恢复的文件处理 Agent
功能:
- 遍历目录,逐文件处理
- 每处理一个文件后保存检查点
- 支持 Ctrl+C 优雅中断
- 支持从任意检查点恢复
"""
CHECKPOINT_INTERVAL = 1 # 每处理1个文件保存一次检查点
def __init__(
self,
model: str = "NousResearch/Hermes-3-Llama-3.1-8B",
base_url: str = "http://localhost:8000/v1",
checkpoint_store = None,
task_id: Optional[str] = None
):
self.client = AsyncOpenAI(base_url=base_url, api_key="not-needed")
self.model = model
self.store = checkpoint_store or LocalFileCheckpointStore()
self.task_id = task_id or uuid.uuid4().hex[:12]
self.checkpoint: Optional[AgentCheckpoint] = None
self._interrupted = False
# 注册中断信号处理
signal.signal(signal.SIGINT, self._handle_interrupt)
signal.signal(signal.SIGTERM, self._handle_interrupt)
def _handle_interrupt(self, signum, frame):
"""优雅处理中断信号"""
print(f"\n[Agent] 接收到中断信号,将在当前文件处理完成后停止...")
self._interrupted = True
async def _process_single_file(self, file_path: str, context: str) -> str:
"""
处理单个文件,提取关键信息
使用幂等缓存,避免重复处理
"""
# 计算文件内容哈希(检测文件是否变化)
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
file_hash = compute_input_hash("process_file", file_path, content[:500])
# 检查幂等缓存
if self.checkpoint:
cache = IdempotentToolCache(self.checkpoint)
cached = cache.get_cached_result(file_hash)
if cached is not None:
print(f" [缓存] {os.path.basename(file_path)}")
return cached
# 实际处理文件
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": "你是一个文件分析专家。从文本文件中提取关键信息,生成简洁的结构化摘要。"
},
{
"role": "user",
"content": f"""分析以下文件的内容,提取关键信息:
文件:{file_path}
上下文:{context}
文件内容(前3000字):
{content[:3000]}
请提供:
1. 文件类型和主题
2. 关键信息点(3-5条)
3. 重要数据或结论(如有)"""
}
],
temperature=0.1,
max_tokens=512
)
result = response.choices[0].message.content or "处理完成"
# 缓存结果
if self.checkpoint:
cache = IdempotentToolCache(self.checkpoint)
cache.cache_result(file_hash, result)
return result
async def _generate_final_report(self, file_results: dict, task_description: str) -> str:
"""基于所有文件的处理结果生成最终报告"""
results_str = "\n\n".join([
f"### {path}\n{result}"
for path, result in file_results.items()
])
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": "你是一个报告撰写专家。将多个文件的分析结果综合为一份结构化报告。"
},
{
"role": "user",
"content": f"""任务:{task_description}
各文件分析结果:
{results_str}
请生成一份综合报告,包含:
1. 执行摘要
2. 主要发现
3. 跨文件的共性模式
4. 重要结论与建议"""
}
],
temperature=0.3,
max_tokens=2048
)
return response.choices[0].message.content or "报告生成完成"
async def run(
self,
directory: str,
task_description: str,
file_pattern: str = "*.txt",
resume: bool = True
) -> dict:
"""
主执行函数
Args:
directory: 要处理的目录
task_description: 任务描述
file_pattern: 文件匹配模式
resume: 是否从检查点恢复
"""
print(f"\n{'='*60}")
print(f"可中断恢复文件处理 Agent")
print(f"任务 ID: {self.task_id}")
print(f"目录: {directory}")
print(f"{'='*60}")
# 获取所有待处理文件
all_files = sorted(Path(directory).glob(f"**/{file_pattern}"))
all_file_paths = [str(f) for f in all_files]
if not all_file_paths:
return {"success": False, "error": f"目录 {directory} 中没有 {file_pattern} 文件"}
print(f"找到 {len(all_file_paths)} 个文件")
# 尝试加载检查点
if resume:
self.checkpoint = await self.store.load(self.task_id)
if self.checkpoint and self.checkpoint.status == TaskStatus.COMPLETED:
print("[Agent] 任务已完成,跳过处理")
return {"success": True, "answer": "任务已完成,从检查点加载结果"}
if self.checkpoint:
print(f"[Agent] 从检查点恢复(步骤 {self.checkpoint.current_step}/{len(all_file_paths)})")
print(f"[Agent] 已完成进度: {self.checkpoint.get_progress():.1%}")
else:
# 创建新检查点
self.checkpoint = AgentCheckpoint(
task_id=self.task_id,
task_description=task_description,
status=TaskStatus.RUNNING,
current_step=0,
total_steps_estimated=len(all_file_paths),
messages=[]
)
await self.store.save(self.checkpoint)
print("[Agent] 创建新任务检查点")
# 收集已完成步骤的结果
file_results = {}
for step in self.checkpoint.completed_steps:
if step.tool_name == "process_file":
file_results[step.input_hash] = step.output
# 从上次中断处继续
start_from = self.checkpoint.current_step
for i, file_path in enumerate(all_file_paths[start_from:], start=start_from):
if self._interrupted:
print(f"\n[Agent] 优雅中断,已完成 {i}/{len(all_file_paths)} 个文件")
self.checkpoint.update(status=TaskStatus.PAUSED)
await self.store.save(self.checkpoint)
return {
"success": False,
"interrupted": True,
"progress": i / len(all_file_paths),
"task_id": self.task_id,
"resume_command": f"agent.run('{directory}', resume=True)"
}
print(f"\n[{i+1}/{len(all_file_paths)}] 处理: {os.path.basename(file_path)}")
step_start = time.time()
try:
result = await self._process_single_file(file_path, task_description)
file_results[file_path] = result
print(f" 完成({time.time() - step_start:.1f}s): {result[:80]}...")
# 记录步骤结果
step_result = StepResult(
step_id=i,
step_type="tool_call",
input_hash=compute_input_hash("process_file", file_path),
output=result,
tool_name="process_file",
tool_args={"file_path": file_path},
duration_seconds=time.time() - step_start
)
self.checkpoint.add_step_result(step_result)
# 定期保存检查点
if (i + 1) % self.CHECKPOINT_INTERVAL == 0:
await self.store.save(self.checkpoint)
print(f" [检查点] 已保存(步骤 {i+1})")
except Exception as e:
print(f" 处理失败: {e}")
self.checkpoint.last_error = str(e)
self.checkpoint.retry_count += 1
await self.store.save(self.checkpoint)
# 继续处理其他文件
continue
# 生成最终报告
print(f"\n[Agent] 所有文件处理完成,生成最终报告...")
final_report = await self._generate_final_report(file_results, task_description)
# 标记任务完成
self.checkpoint.update(status=TaskStatus.COMPLETED)
await self.store.save(self.checkpoint)
# 输出最终报告到文件
report_path = f"/tmp/hermes_report_{self.task_id}.md"
with open(report_path, 'w', encoding='utf-8') as f:
f.write(f"# 任务报告\n\n")
f.write(f"任务 ID: {self.task_id}\n")
f.write(f"处理文件数: {len(file_results)}\n")
f.write(f"完成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(final_report)
print(f"\n[Agent] 报告已保存至: {report_path}")
return {
"success": True,
"answer": final_report,
"files_processed": len(file_results),
"files_total": len(all_file_paths),
"report_path": report_path,
"task_id": self.task_id
}
# ─── 上下文压缩(防止溢出)─────────────────────────────────────
class ContextCompressor:
"""
上下文压缩器
当消息历史过长时,自动摘要压缩
"""
def __init__(self, client: AsyncOpenAI, model: str, max_tokens: int = 6000):
self.client = client
self.model = model
self.max_tokens = max_tokens
def estimate_tokens(self, messages: list) -> int:
"""粗略估算 Token 数量(4字符约1个Token)"""
return sum(len(str(m.get('content', ''))) // 4 for m in messages)
async def compress_if_needed(self, messages: list, keep_recent: int = 6) -> list:
"""如果超过阈值,压缩历史消息"""
if self.estimate_tokens(messages) <= self.max_tokens:
return messages
system_msgs = [m for m in messages if m['role'] == 'system']
recent_msgs = messages[-keep_recent:]
middle_msgs = messages[len(system_msgs):-keep_recent]
if not middle_msgs:
return messages
# 生成摘要
summary_content = "\n".join([
f"{m['role']}: {str(m.get('content', ''))[:300]}"
for m in middle_msgs
])
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "请用3-5句话总结以下对话的关键信息、已知事实和重要结论:"},
{"role": "user", "content": summary_content}
],
temperature=0.1,
max_tokens=256
)
summary = response.choices[0].message.content
compressed_msg = {
"role": "user",
"content": f"[历史对话摘要] {summary}"
}
compressed = system_msgs + [compressed_msg] + recent_msgs
print(f"[ContextCompressor] 压缩 {len(middle_msgs)} 条消息为 1 条摘要")
return compressed
# ─── 使用示例 ──────────────────────────────────────────────────
async def main():
# 创建测试文件
import tempfile
test_dir = tempfile.mkdtemp()
for i in range(5):
with open(f"{test_dir}/file_{i}.txt", 'w') as f:
f.write(f"这是第 {i+1} 个测试文件。包含一些测试内容,用于验证 Agent 的文件处理能力。\n" * 10)
print(f"测试目录:{test_dir}")
# 首次运行(可能被中断)
agent = ResumableFileProcessingAgent(
model="NousResearch/Hermes-3-Llama-3.1-8B",
base_url="http://localhost:8000/v1",
task_id="demo_task_001"
)
result = await agent.run(
directory=test_dir,
task_description="分析文件内容,识别关键主题和重要信息",
file_pattern="*.txt",
resume=True # 自动从上次中断处继续
)
if result.get("interrupted"):
print(f"\n任务中断,进度:{result['progress']:.1%}")
print(f"要继续执行,请使用 task_id: {result['task_id']}")
else:
print(f"\n任务完成!处理了 {result['files_processed']} 个文件")
print(f"报告位置:{result.get('report_path')}")
if __name__ == "__main__":
asyncio.run(main())
56.5 容错架构总结
flowchart TD
Start[任务开始] --> CP1[创建/加载检查点]
CP1 --> Loop[执行循环]
Loop --> Exec[执行当前步骤]
Exec --> Success{步骤成功?}
Success --> |是| Cache[更新幂等缓存]
Cache --> SaveCP[保存检查点]
SaveCP --> More{还有步骤?}
More --> |是| Loop
More --> |否| Report[生成最终报告]
Report --> Done[完成]
Success --> |否| Retry{重试次数\n< 上限?}
Retry --> |是| Wait[等待后重试]
Wait --> Exec
Retry --> |否| Skip{可以跳过?}
Skip --> |是| Loop
Skip --> |否| Fail[任务失败]
Interrupt{检测到中断} --> |是| SaveCP2[保存检查点]
SaveCP2 --> Exit[优雅退出]
Loop --> Interrupt
| 容错机制 | 解决的问题 | 代码位置 |
|---|---|---|
| 检查点保存 | 服务器崩溃/重启 | checkpoint.py |
| 幂等工具缓存 | 重复执行副作用 | idempotent_tools.py |
| 指数退避重试 | 暂时性失败 | Agent 执行循环 |
| 上下文压缩 | 上下文窗口溢出 | ContextCompressor |
| 优雅中断 | 主动暂停需求 | signal 处理器 |
| 分批处理 | 大数据集 OOM | 文件逐个处理 |
小结
本章系统讲解了 Hermes Agent 长任务容错的完整方案:
- 失败模式分类:超时、上下文溢出、工具失败、基础设施故障——每类有专属对策。
- 检查点机制:定期序列化任务状态,支持文件系统和 Redis 两种存储后端,原子写入保证数据完整性。
- 幂等工具设计:基于输入哈希的缓存机制,防止恢复时重复执行有副作用的操作(发邮件、写文件等)。
- 可中断恢复 Agent:完整实现包含信号处理(SIGINT/SIGTERM)、检查点保存、从中断点续跑的文件处理 Agent。
- 上下文压缩:当消息历史超过上下文窗口时,自动摘要压缩,保留关键信息。
思考题
- 检查点保存的频率如何权衡?太频繁会增加 I/O 开销,太稀疏会在失败时丢失更多进度。如何根据任务特性动态调整?
- 幂等工具缓存存储在检查点中,如果一个文件被修改后任务重启,缓存的旧结果会导致错误。如何设计内容感知的缓存失效机制?
- 对于需要数天运行的 Agent,Redis 检查点的 TTL 如何设置?能否在任务完成后自动清理?
- 如果 Hermes Agent 部署在 Kubernetes 中,Pod 被强制驱逐(SIGKILL),检查点还来得及保存吗?如何设计更鲁棒的方案?