第 63 章
调试方法论:agent.log 分析与追踪
第63章:调试方法论:agent.log 分析与追踪
调试 Agent 系统就像在黑暗的森林里找人——你不知道他往哪走,也不确定他是否还在走。日志是你手中的手电筒,追踪是你的罗盘,而方法论是让你不迷路的地图。
63.1 日志结构解析:agent.log vs errors.log
两个日志文件的定位
Hermes Agent 使用两个独立日志文件,各自承担不同职责:
agent.log:正常流程日志
- 记录每次状态转换
- 记录工具调用的输入输出
- 记录 Token 使用统计
- 记录性能指标(延迟、批次大小等)
- 格式:结构化 JSON,便于程序解析
errors.log:错误专用日志
- 记录所有异常和错误
- 包含完整的错误堆栈
- 记录错误发生时的 Agent 状态快照
- 包含重试信息和最终结果
- 格式:带有上下文信息的结构化 JSON
日志格式规范
import logging
import json
import traceback
from datetime import datetime
from typing import Any, Optional
class StructuredFormatter(logging.Formatter):
"""
结构化日志格式化器
输出 JSON 格式,便于 ELK Stack 等工具解析
"""
def format(self, record: logging.LogRecord) -> str:
log_entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
# 附加结构化字段(如果有)
if hasattr(record, 'extra'):
log_entry.update(record.extra)
# 错误信息
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__ if record.exc_info[0] else None,
"message": str(record.exc_info[1]),
"traceback": traceback.format_exception(*record.exc_info)
}
return json.dumps(log_entry, ensure_ascii=False)
def setup_hermes_logging(log_dir: str = "/var/log/hermes") -> tuple:
"""
配置 Hermes Agent 日志系统
Returns:
(agent_logger, error_logger)
"""
import os
os.makedirs(log_dir, exist_ok=True)
formatter = StructuredFormatter()
# Agent 主日志
agent_logger = logging.getLogger('agent')
agent_logger.setLevel(logging.DEBUG)
agent_handler = logging.FileHandler(f"{log_dir}/agent.log")
agent_handler.setFormatter(formatter)
agent_logger.addHandler(agent_handler)
# 错误日志(仅记录 WARNING 及以上)
error_logger = logging.getLogger('errors')
error_logger.setLevel(logging.WARNING)
error_handler = logging.FileHandler(f"{log_dir}/errors.log")
error_handler.setFormatter(formatter)
error_logger.addHandler(error_handler)
# 同时将错误记录到控制台
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.ERROR)
error_logger.addHandler(console_handler)
return agent_logger, error_logger
# agent.log 典型条目示例
AGENT_LOG_EXAMPLE = {
"timestamp": "2024-12-24T10:26:40.123Z",
"level": "INFO",
"logger": "agent",
"message": "State transition",
"session_id": "sess_abc123",
"event": "tool_called",
"from_state": "executing",
"to_state": "waiting",
"tool_name": "web_search",
"tool_input": {"query": "最新 AI 论文", "max_results": 5},
"step_index": 2,
"total_steps": 5,
"tokens_used_so_far": 8240,
"elapsed_seconds": 12.3
}
# errors.log 典型条目示例
ERRORS_LOG_EXAMPLE = {
"timestamp": "2024-12-24T10:27:15.456Z",
"level": "ERROR",
"logger": "errors",
"message": "Tool execution failed",
"session_id": "sess_abc123",
"error_type": "ToolTimeoutError",
"error_message": "web_search timed out after 30 seconds",
"state_when_error": "waiting",
"current_tool": "web_search",
"tool_input": {"query": "最新 AI 论文"},
"retry_count": 1,
"max_retries": 3,
"will_retry": True,
"agent_state_snapshot": {
"plan_steps": ["search", "analyze", "report"],
"current_step": 0,
"tokens_used": 8240
},
"exception": {
"type": "ToolTimeoutError",
"traceback": ["..."]
}
}
关键日志字段说明
| 字段 | 出现在 | 用途 |
|---|---|---|
session_id |
两者 | 关联同一会话的所有日志 |
timestamp |
两者 | 时序分析,计算阶段耗时 |
from_state/to_state |
agent.log | 状态转换追踪 |
tool_name + tool_input |
两者 | 工具调用追踪 |
tokens_used_so_far |
agent.log | Token 预算监控 |
retry_count |
errors.log | 判断是否为间歇性问题 |
agent_state_snapshot |
errors.log | 错误现场还原 |
will_retry |
errors.log | 区分可恢复和不可恢复错误 |
63.2 多步工具调用失败的追踪方法
追踪挑战
Hermes Agent 的一次任务可能涉及 5-20 次工具调用,当其中某步失败时,需要快速定位:
- 失败发生在哪一步?
- 工具的输入参数是否正确?
- 失败是否影响后续步骤?
- 整体任务是否可以继续执行?
分布式追踪实现
import uuid
import time
import functools
from typing import Callable, Any
from contextlib import contextmanager
class TraceContext:
"""
分布式追踪上下文
为每次工具调用生成唯一 span,支持父子关系
"""
def __init__(self, trace_id: str = None, parent_span_id: str = None):
self.trace_id = trace_id or str(uuid.uuid4())[:8]
self.parent_span_id = parent_span_id
self.span_id = str(uuid.uuid4())[:8]
self.start_time = time.time()
self.events = []
def add_event(self, name: str, data: dict = None):
self.events.append({
"time": time.time() - self.start_time,
"name": name,
"data": data or {}
})
def to_log_dict(self, status: str, error: str = None) -> dict:
duration = time.time() - self.start_time
entry = {
"trace_id": self.trace_id,
"span_id": self.span_id,
"parent_span_id": self.parent_span_id,
"duration_ms": round(duration * 1000, 2),
"status": status, # "ok" | "error" | "timeout"
"events": self.events
}
if error:
entry["error"] = error
return entry
class InstrumentedToolCaller:
"""
带追踪的工具调用器
自动记录每次工具调用的完整上下文
"""
def __init__(self, agent_logger, error_logger):
self.agent_logger = agent_logger
self.error_logger = error_logger
self.call_chain = [] # 记录本次任务的完整调用链
async def call_tool(
self,
tool_name: str,
tool_func: Callable,
args: dict,
session_id: str,
trace_context: TraceContext,
timeout: float = 30.0
) -> Any:
"""
带追踪的工具调用包装器
自动记录:
- 调用输入
- 执行时间
- 返回结果(摘要)
- 错误信息
"""
import asyncio
span = TraceContext(
trace_id=trace_context.trace_id,
parent_span_id=trace_context.span_id
)
call_record = {
"tool": tool_name,
"args": args,
"start_time": time.time()
}
# 记录调用开始
self.agent_logger.info(json.dumps({
"timestamp": datetime.utcnow().isoformat() + "Z",
"event": "tool_call_start",
"session_id": session_id,
"tool_name": tool_name,
"args": self._sanitize_args(args),
"trace_id": span.trace_id,
"span_id": span.span_id,
}))
try:
# 带超时的工具调用
result = await asyncio.wait_for(
tool_func(**args) if asyncio.iscoroutinefunction(tool_func)
else asyncio.to_thread(tool_func, **args),
timeout=timeout
)
duration_ms = (time.time() - call_record["start_time"]) * 1000
# 记录成功结果
self.agent_logger.info(json.dumps({
"timestamp": datetime.utcnow().isoformat() + "Z",
"event": "tool_call_success",
"session_id": session_id,
"tool_name": tool_name,
"duration_ms": round(duration_ms, 2),
"result_summary": self._summarize_result(result),
**span.to_log_dict("ok")
}))
call_record["result"] = "success"
call_record["duration_ms"] = duration_ms
self.call_chain.append(call_record)
return result
except asyncio.TimeoutError:
duration_ms = timeout * 1000
error_msg = f"Tool '{tool_name}' timed out after {timeout}s"
self.error_logger.error(json.dumps({
"timestamp": datetime.utcnow().isoformat() + "Z",
"event": "tool_call_timeout",
"session_id": session_id,
"tool_name": tool_name,
"timeout_seconds": timeout,
"args": self._sanitize_args(args),
**span.to_log_dict("timeout", error_msg)
}))
call_record["result"] = "timeout"
call_record["error"] = error_msg
self.call_chain.append(call_record)
raise
except Exception as e:
duration_ms = (time.time() - call_record["start_time"]) * 1000
self.error_logger.error(json.dumps({
"timestamp": datetime.utcnow().isoformat() + "Z",
"event": "tool_call_error",
"session_id": session_id,
"tool_name": tool_name,
"duration_ms": round(duration_ms, 2),
"error_type": type(e).__name__,
"error_message": str(e),
"args": self._sanitize_args(args),
**span.to_log_dict("error", str(e))
}), exc_info=True)
call_record["result"] = "error"
call_record["error"] = str(e)
self.call_chain.append(call_record)
raise
def _sanitize_args(self, args: dict) -> dict:
"""清理敏感参数(密码、token 等)"""
sensitive_keys = {"password", "token", "api_key", "secret", "credential"}
return {
k: "***REDACTED***" if k.lower() in sensitive_keys else v
for k, v in args.items()
}
def _summarize_result(self, result: Any) -> str:
"""生成结果摘要(避免记录大型结果)"""
result_str = str(result)
if len(result_str) > 200:
return result_str[:200] + f"... [{len(result_str)} chars total]"
return result_str
def get_call_chain_report(self) -> dict:
"""生成完整调用链报告"""
total_time = sum(c.get("duration_ms", 0) for c in self.call_chain)
failed = [c for c in self.call_chain if c["result"] != "success"]
return {
"total_tool_calls": len(self.call_chain),
"successful_calls": len(self.call_chain) - len(failed),
"failed_calls": len(failed),
"total_time_ms": round(total_time, 2),
"call_chain": self.call_chain,
"failures": failed
}
63.3 常见 Bug 模式识别
Bug 模式一:工具调用无限循环
症状:agent.log 中同一个工具被反复调用,Agent 无法推进到下一步。
日志特征:
{"event": "tool_call_start", "tool_name": "web_search", "step_index": 2}
{"event": "tool_call_success", "tool_name": "web_search", "step_index": 2}
{"event": "tool_call_start", "tool_name": "web_search", "step_index": 2} // 重复!
{"event": "tool_call_success", "tool_name": "web_search", "step_index": 2}
检测代码:
def detect_infinite_loop(log_file: str, session_id: str, threshold: int = 3) -> list:
"""检测工具调用无限循环"""
from collections import defaultdict
tool_call_counts = defaultdict(int)
loop_alerts = []
with open(log_file) as f:
for line in f:
try:
entry = json.loads(line)
if (entry.get("session_id") == session_id and
entry.get("event") == "tool_call_start"):
# 包含 step_index 的工具调用key
key = f"{entry['tool_name']}@step{entry.get('step_index', '?')}"
tool_call_counts[key] += 1
if tool_call_counts[key] >= threshold:
loop_alerts.append({
"tool": entry["tool_name"],
"step": entry.get("step_index"),
"count": tool_call_counts[key],
"timestamp": entry["timestamp"]
})
except (json.JSONDecodeError, KeyError):
continue
return loop_alerts
# 根本原因:LLM 没有正确理解工具结果,认为任务未完成
# 修复方案:
# 1. 在工具结果中明确标注"此步骤已完成"
# 2. 添加工具调用计数器,超过 N 次后强制跳过
# 3. 在系统提示中明确说明工具调用失败时的处理策略
Bug 模式二:记忆污染
症状:Agent 在新会话中使用了上一个会话的数据,产生不相关或错误的回答。
日志特征:
// 新会话开始
{"event": "session_start", "session_id": "sess_new001"}
// 但 Agent 引用了旧数据
{"event": "llm_response", "content": "根据您之前提到的销售数据..."} // "之前"不存在!
检测和修复:
class MemoryPollutionGuard:
"""
记忆污染检测和预防
确保会话隔离
"""
def __init__(self):
self._session_memories = {}
def get_memory(self, session_id: str) -> dict:
"""获取会话专属记忆,确保不跨污染"""
if session_id not in self._session_memories:
self._session_memories[session_id] = {
"conversation_history": [],
"tool_results": {},
"working_context": {},
"created_at": time.time()
}
return self._session_memories[session_id]
def clear_session(self, session_id: str):
"""会话结束时完全清除记忆"""
if session_id in self._session_memories:
del self._session_memories[session_id]
def audit_cross_session_references(self, session_id: str, content: str) -> list:
"""
审计响应内容中是否包含其他会话的引用
检测常见的污染信号
"""
alerts = []
current_session_memory = self.get_memory(session_id)
# 检测其他会话的 session_id 出现在内容中
for other_session_id in self._session_memories:
if other_session_id != session_id and other_session_id in content:
alerts.append({
"type": "session_id_leak",
"leaked_session": other_session_id,
"in_session": session_id
})
# 检测时间悖论(引用"之前"但会话刚开始)
session_age = time.time() - current_session_memory["created_at"]
time_references = ["之前", "上次", "previously", "last time", "as mentioned"]
if session_age < 60: # 会话开始不足60秒
for ref in time_references:
if ref in content:
alerts.append({
"type": "temporal_paradox",
"reference": ref,
"session_age_seconds": session_age
})
return alerts
Bug 模式三:参数解析失败
症状:工具调用失败,错误信息为参数类型错误或必填字段缺失。
日志特征:
{"event": "tool_call_error", "error_type": "ValidationError",
"error_message": "Missing required field 'query'",
"args": {"search_term": "AI 论文"}} // LLM 用了错误的字段名!
自动检测和修复:
import re
from typing import Dict, Any
class ToolArgValidator:
"""
工具参数验证器
在调用工具前进行参数校验,提前发现问题
"""
def __init__(self, tool_schemas: dict):
self.schemas = tool_schemas
self.error_patterns = [] # 记录历史错误模式
def validate(self, tool_name: str, args: dict) -> tuple:
"""
验证工具参数
Returns:
(is_valid, error_message, corrected_args)
"""
schema = self.schemas.get(tool_name)
if not schema:
return False, f"Unknown tool: {tool_name}", args
required_fields = schema.get("required", [])
properties = schema.get("properties", {})
errors = []
corrected = dict(args)
# 检查必填字段
for field in required_fields:
if field not in args:
# 尝试模糊匹配(处理字段名拼写变体)
fuzzy_match = self._fuzzy_find_field(field, args)
if fuzzy_match:
corrected[field] = corrected.pop(fuzzy_match)
errors.append(f"Auto-corrected: '{fuzzy_match}' → '{field}'")
else:
errors.append(f"Missing required field: '{field}'")
# 检查类型
for field, value in args.items():
if field in properties:
expected_type = properties[field].get("type")
if not self._check_type(value, expected_type):
# 尝试类型转换
converted = self._try_convert(value, expected_type)
if converted is not None:
corrected[field] = converted
errors.append(f"Auto-converted '{field}': {type(value).__name__} → {expected_type}")
else:
errors.append(f"Type mismatch for '{field}': expected {expected_type}, got {type(value).__name__}")
is_valid = all("Missing" not in e for e in errors)
return is_valid, "; ".join(errors), corrected
def _fuzzy_find_field(self, target: str, args: dict) -> Optional[str]:
"""模糊匹配字段名(处理同义词和拼写变体)"""
synonyms = {
"query": ["search_term", "q", "search", "keyword", "keywords"],
"url": ["link", "href", "address", "web_url"],
"file_path": ["path", "filepath", "filename", "file"],
"max_results": ["limit", "count", "n", "num_results", "top_k"]
}
for canonical, variants in synonyms.items():
if target == canonical:
for variant in variants:
if variant in args:
return variant
return None
def _check_type(self, value: Any, expected_type: str) -> bool:
type_map = {"string": str, "integer": int, "number": (int, float), "boolean": bool, "array": list}
expected = type_map.get(expected_type)
return isinstance(value, expected) if expected else True
def _try_convert(self, value: Any, expected_type: str) -> Optional[Any]:
try:
if expected_type == "string":
return str(value)
elif expected_type == "integer":
return int(value)
elif expected_type == "number":
return float(value)
elif expected_type == "boolean":
return bool(value)
except (ValueError, TypeError):
pass
return None
def record_error_pattern(self, tool_name: str, error: str, args: dict):
"""记录错误模式,用于后续分析"""
self.error_patterns.append({
"timestamp": datetime.utcnow().isoformat(),
"tool": tool_name,
"error": error,
"args_keys": list(args.keys())
})
def get_frequent_errors(self, top_n: int = 5) -> list:
"""获取最常见的错误模式"""
from collections import Counter
error_counts = Counter(p["error"] for p in self.error_patterns)
return error_counts.most_common(top_n)
63.4 构建调试仪表盘:ELK Stack 配置
架构概述
Hermes Agent → agent.log → Filebeat → Logstash → Elasticsearch → Kibana
errors.log ↗
Logstash 配置
# /etc/logstash/conf.d/hermes-agent.conf
input {
file {
path => ["/var/log/hermes/agent.log", "/var/log/hermes/errors.log"]
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb"
tags => ["hermes_agent"]
}
}
filter {
# 解析 JSON 格式日志
json {
source => "message"
target => "parsed"
remove_field => ["message"]
}
# 将解析出的字段提升到顶层
ruby {
code => "
if event.get('parsed')
event.get('parsed').each { |k, v| event.set(k, v) }
event.remove('parsed')
end
"
}
# 按日志文件分类
if "/errors.log" in [path] {
mutate { add_field => { "log_type" => "error" } }
} else {
mutate { add_field => { "log_type" => "agent" } }
}
# 计算 duration 到毫秒
if [elapsed_seconds] {
ruby {
code => "event.set('elapsed_ms', (event.get('elapsed_seconds') * 1000).round(2))"
}
}
# 提取会话 ID 作为索引字段
if [session_id] {
mutate {
add_field => { "[@metadata][session_id]" => "%{session_id}" }
}
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "hermes-agent-%{+YYYY.MM.dd}"
template_name => "hermes-agent"
template_overwrite => true
}
}
Kibana 仪表盘关键查询
// 1. 工具调用成功率(按工具分类)
{
"query": {
"bool": {
"must": [
{"term": {"event": "tool_call_start"}},
{"range": {"@timestamp": {"gte": "now-1h"}}}
]
}
},
"aggs": {
"by_tool": {
"terms": {"field": "tool_name.keyword"},
"aggs": {
"success_rate": {
"filter": {"term": {"log_type": "agent"}}
},
"error_rate": {
"filter": {"term": {"log_type": "error"}}
}
}
}
}
}
// 2. 平均工具调用耗时(Top 10 慢工具)
{
"query": {"term": {"event": "tool_call_success"}},
"aggs": {
"by_tool": {
"terms": {"field": "tool_name.keyword", "size": 10},
"aggs": {
"avg_duration": {"avg": {"field": "duration_ms"}},
"p95_duration": {"percentiles": {"field": "duration_ms", "percents": [95]}}
}
}
}
}
Elasticsearch 索引模板
{
"index_patterns": ["hermes-agent-*"],
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"index.refresh_interval": "5s"
},
"mappings": {
"properties": {
"timestamp": {"type": "date"},
"session_id": {"type": "keyword"},
"event": {"type": "keyword"},
"level": {"type": "keyword"},
"tool_name": {"type": "keyword"},
"from_state": {"type": "keyword"},
"to_state": {"type": "keyword"},
"duration_ms": {"type": "float"},
"tokens_used_so_far": {"type": "integer"},
"retry_count": {"type": "integer"},
"log_type": {"type": "keyword"},
"error_type": {"type": "keyword"},
"message": {"type": "text", "analyzer": "standard"}
}
}
}
63.5 Session 重放(Replay)实现
什么是 Session 重放
Session 重放允许开发者精确重现 Agent 曾经执行的任务流程,用于:
- 复现已报告的 Bug
- 验证修复是否有效
- 性能回归测试
- 生成测试用例
重放系统实现
import json
import asyncio
from typing import Optional, Callable
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ReplayEvent:
timestamp: str
event_type: str
data: dict
class SessionReplayer:
"""
Session 重放器
从 agent.log 重现 Agent 的完整执行过程
"""
def __init__(self, log_file: str = "/var/log/hermes/agent.log"):
self.log_file = log_file
def load_session(self, session_id: str) -> list:
"""从日志文件加载指定 session 的所有事件"""
events = []
with open(self.log_file, 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
if entry.get("session_id") == session_id:
events.append(ReplayEvent(
timestamp=entry.get("timestamp", ""),
event_type=entry.get("event", entry.get("message", "")),
data=entry
))
except json.JSONDecodeError:
continue
# 按时间排序
events.sort(key=lambda e: e.timestamp)
return events
def print_timeline(self, session_id: str):
"""打印会话时间线(可视化执行流程)"""
events = self.load_session(session_id)
if not events:
print(f"No events found for session {session_id}")
return
print(f"\n{'='*60}")
print(f"Session Replay: {session_id}")
print(f"Total Events: {len(events)}")
print(f"{'='*60}\n")
# 计算相对时间
start_time = datetime.fromisoformat(events[0].timestamp.rstrip('Z'))
for event in events:
try:
event_time = datetime.fromisoformat(event.timestamp.rstrip('Z'))
elapsed = (event_time - start_time).total_seconds()
except:
elapsed = 0
event_type = event.event_type
data = event.data
# 格式化输出
prefix = " "
if "error" in event_type.lower():
prefix = "❌"
elif "success" in event_type.lower():
prefix = "✅"
elif "start" in event_type.lower():
prefix = "▶️ "
elif "transition" in event_type.lower():
prefix = "→ "
print(f"[T+{elapsed:6.2f}s] {prefix} {event_type}")
# 打印关键字段
for key in ["tool_name", "from_state", "to_state", "error_message",
"tokens_used_so_far", "step_index"]:
if key in data and data[key] is not None:
print(f" {key}: {data[key]}")
print()
async def replay_with_mocks(
self,
session_id: str,
tool_mocks: dict, # {tool_name: mock_response}
on_state_change: Optional[Callable] = None
) -> dict:
"""
使用模拟工具重放 Session
Args:
session_id: 要重放的会话 ID
tool_mocks: 工具模拟响应 {tool_name: mock_result}
on_state_change: 状态变化时的回调函数
Returns:
重放结果报告
"""
events = self.load_session(session_id)
replay_results = []
tool_calls_in_replay = []
for event in events:
event_type = event.event_type
data = event.data
if event_type == "tool_call_start":
tool_name = data.get("tool_name")
args = data.get("args", {})
# 使用模拟工具或实际工具
if tool_name in tool_mocks:
mock = tool_mocks[tool_name]
result = mock(args) if callable(mock) else mock
status = "mocked"
else:
result = None
status = "skipped_no_mock"
tool_calls_in_replay.append({
"tool": tool_name,
"args": args,
"result": str(result)[:100],
"status": status
})
elif event_type == "State transition" and on_state_change:
await on_state_change(
from_state=data.get("from_state"),
to_state=data.get("to_state"),
trigger=data.get("trigger")
)
# 模拟真实时序(可选,用于调试时序相关 Bug)
# await asyncio.sleep(0.01)
return {
"session_id": session_id,
"total_events": len(events),
"tool_calls": tool_calls_in_replay,
"replay_complete": True
}
def generate_test_case(self, session_id: str) -> str:
"""
从 Session 日志自动生成 pytest 测试用例
"""
events = self.load_session(session_id)
tool_calls = [e for e in events if e.event_type == "tool_call_start"]
test_code = f'''
import pytest
import asyncio
# Auto-generated test case from session {session_id}
# Generated at: {datetime.now().isoformat()}
class TestSession{session_id.replace("-", "_")}:
"""
Replay test for session {session_id}
{len(events)} total events, {len(tool_calls)} tool calls
"""
@pytest.fixture
def agent(self):
from hermes_agent import HermesAgentFSM
agent = HermesAgentFSM(session_id="replay_{session_id}")
# Register required tools
'''
# 提取所有用到的工具
tools_used = set()
for event in events:
if event.event_type == "tool_call_start":
tools_used.add(event.data.get("tool_name", ""))
for tool in tools_used:
if tool:
test_code += f' agent.register_tool("{tool}")\n'
test_code += ''' return agent
def test_replay(self, agent):
"""Verify Agent reproduces the same state transitions"""
replayer = SessionReplayer()
result = asyncio.run(replayer.replay_with_mocks(
session_id="{session_id}",
tool_mocks={{
'''.format(session_id=session_id)
for event in events:
if event.event_type == "tool_call_success":
tool = event.data.get("tool_name", "")
result_summary = event.data.get("result_summary", "{}")
test_code += f' "{tool}": "{result_summary}",\n'
test_code += ''' }
))
assert result["replay_complete"] == True
assert result["total_events"] > 0
'''
return test_code
# 使用示例
replayer = SessionReplayer("/var/log/hermes/agent.log")
# 打印会话时间线
replayer.print_timeline("sess_abc123")
# 生成测试用例
test_code = replayer.generate_test_case("sess_abc123")
with open("test_session_replay.py", "w") as f:
f.write(test_code)
print("测试用例已生成:test_session_replay.py")
本章小结
完整的调试方法论让 Hermes Agent 从黑盒变成透明系统:
- 双日志架构:agent.log 记录正常流程的所有状态转换,errors.log 记录带上下文的完整错误信息,两者配合实现全面覆盖
- 分布式追踪:每个工具调用生成 TraceID/SpanID,支持跨工具调用的调用链分析
- Bug 模式识别:无限循环(工具被重复调用)、记忆污染(会话数据交叉)、参数解析失败(字段名错误)是最常见的三种故障模式,均有自动检测方案
- ELK 仪表盘:实时监控工具成功率、P95 延迟、Token 使用趋势,提前发现问题
- Session 重放:从日志重现执行过程,自动生成测试用例,构成完整的调试-验证闭环
思考题
- 当 Agent 产生的输出被用户标记为"错误"时,如何设计反向追踪(从输出追溯到导致错误的工具调用)?
- 在生产环境中,完整记录所有工具调用的输入输出会产生大量数据,如何设计"按需采样"的日志策略?
- 如果同一个 Bug 在 100 次调用中只出现 1 次,传统的复现方式难以奏效,如何设计"概率性 Bug 追踪"系统?
- Session 重放测试中,如果工具的实际网络响应随时间变化(如网页内容更新),如何保证测试的确定性?