第 63 章

调试方法论:agent.log 分析与追踪

第63章:调试方法论:agent.log 分析与追踪

调试 Agent 系统就像在黑暗的森林里找人——你不知道他往哪走,也不确定他是否还在走。日志是你手中的手电筒,追踪是你的罗盘,而方法论是让你不迷路的地图。


63.1 日志结构解析:agent.log vs errors.log

两个日志文件的定位

Hermes Agent 使用两个独立日志文件,各自承担不同职责:

agent.log:正常流程日志

errors.log:错误专用日志

日志格式规范

import logging
import json
import traceback
from datetime import datetime
from typing import Any, Optional

class StructuredFormatter(logging.Formatter):
    """
    结构化日志格式化器
    输出 JSON 格式,便于 ELK Stack 等工具解析
    """
    
    def format(self, record: logging.LogRecord) -> str:
        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        
        # 附加结构化字段(如果有)
        if hasattr(record, 'extra'):
            log_entry.update(record.extra)
        
        # 错误信息
        if record.exc_info:
            log_entry["exception"] = {
                "type": record.exc_info[0].__name__ if record.exc_info[0] else None,
                "message": str(record.exc_info[1]),
                "traceback": traceback.format_exception(*record.exc_info)
            }
        
        return json.dumps(log_entry, ensure_ascii=False)

def setup_hermes_logging(log_dir: str = "/var/log/hermes") -> tuple:
    """
    配置 Hermes Agent 日志系统
    
    Returns:
        (agent_logger, error_logger)
    """
    import os
    os.makedirs(log_dir, exist_ok=True)
    
    formatter = StructuredFormatter()
    
    # Agent 主日志
    agent_logger = logging.getLogger('agent')
    agent_logger.setLevel(logging.DEBUG)
    agent_handler = logging.FileHandler(f"{log_dir}/agent.log")
    agent_handler.setFormatter(formatter)
    agent_logger.addHandler(agent_handler)
    
    # 错误日志(仅记录 WARNING 及以上)
    error_logger = logging.getLogger('errors')
    error_logger.setLevel(logging.WARNING)
    error_handler = logging.FileHandler(f"{log_dir}/errors.log")
    error_handler.setFormatter(formatter)
    error_logger.addHandler(error_handler)
    
    # 同时将错误记录到控制台
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    console_handler.setLevel(logging.ERROR)
    error_logger.addHandler(console_handler)
    
    return agent_logger, error_logger

# agent.log 典型条目示例
AGENT_LOG_EXAMPLE = {
    "timestamp": "2024-12-24T10:26:40.123Z",
    "level": "INFO",
    "logger": "agent",
    "message": "State transition",
    "session_id": "sess_abc123",
    "event": "tool_called",
    "from_state": "executing",
    "to_state": "waiting",
    "tool_name": "web_search",
    "tool_input": {"query": "最新 AI 论文", "max_results": 5},
    "step_index": 2,
    "total_steps": 5,
    "tokens_used_so_far": 8240,
    "elapsed_seconds": 12.3
}

# errors.log 典型条目示例
ERRORS_LOG_EXAMPLE = {
    "timestamp": "2024-12-24T10:27:15.456Z",
    "level": "ERROR",
    "logger": "errors",
    "message": "Tool execution failed",
    "session_id": "sess_abc123",
    "error_type": "ToolTimeoutError",
    "error_message": "web_search timed out after 30 seconds",
    "state_when_error": "waiting",
    "current_tool": "web_search",
    "tool_input": {"query": "最新 AI 论文"},
    "retry_count": 1,
    "max_retries": 3,
    "will_retry": True,
    "agent_state_snapshot": {
        "plan_steps": ["search", "analyze", "report"],
        "current_step": 0,
        "tokens_used": 8240
    },
    "exception": {
        "type": "ToolTimeoutError",
        "traceback": ["..."]
    }
}

关键日志字段说明

字段 出现在 用途
session_id 两者 关联同一会话的所有日志
timestamp 两者 时序分析,计算阶段耗时
from_state/to_state agent.log 状态转换追踪
tool_name + tool_input 两者 工具调用追踪
tokens_used_so_far agent.log Token 预算监控
retry_count errors.log 判断是否为间歇性问题
agent_state_snapshot errors.log 错误现场还原
will_retry errors.log 区分可恢复和不可恢复错误

63.2 多步工具调用失败的追踪方法

追踪挑战

Hermes Agent 的一次任务可能涉及 5-20 次工具调用,当其中某步失败时,需要快速定位:

  1. 失败发生在哪一步?
  2. 工具的输入参数是否正确?
  3. 失败是否影响后续步骤?
  4. 整体任务是否可以继续执行?

分布式追踪实现

import uuid
import time
import functools
from typing import Callable, Any
from contextlib import contextmanager

class TraceContext:
    """
    分布式追踪上下文
    为每次工具调用生成唯一 span,支持父子关系
    """
    
    def __init__(self, trace_id: str = None, parent_span_id: str = None):
        self.trace_id = trace_id or str(uuid.uuid4())[:8]
        self.parent_span_id = parent_span_id
        self.span_id = str(uuid.uuid4())[:8]
        self.start_time = time.time()
        self.events = []
    
    def add_event(self, name: str, data: dict = None):
        self.events.append({
            "time": time.time() - self.start_time,
            "name": name,
            "data": data or {}
        })
    
    def to_log_dict(self, status: str, error: str = None) -> dict:
        duration = time.time() - self.start_time
        entry = {
            "trace_id": self.trace_id,
            "span_id": self.span_id,
            "parent_span_id": self.parent_span_id,
            "duration_ms": round(duration * 1000, 2),
            "status": status,  # "ok" | "error" | "timeout"
            "events": self.events
        }
        if error:
            entry["error"] = error
        return entry

class InstrumentedToolCaller:
    """
    带追踪的工具调用器
    自动记录每次工具调用的完整上下文
    """
    
    def __init__(self, agent_logger, error_logger):
        self.agent_logger = agent_logger
        self.error_logger = error_logger
        self.call_chain = []  # 记录本次任务的完整调用链
    
    async def call_tool(
        self,
        tool_name: str,
        tool_func: Callable,
        args: dict,
        session_id: str,
        trace_context: TraceContext,
        timeout: float = 30.0
    ) -> Any:
        """
        带追踪的工具调用包装器
        
        自动记录:
        - 调用输入
        - 执行时间
        - 返回结果(摘要)
        - 错误信息
        """
        import asyncio
        
        span = TraceContext(
            trace_id=trace_context.trace_id,
            parent_span_id=trace_context.span_id
        )
        
        call_record = {
            "tool": tool_name,
            "args": args,
            "start_time": time.time()
        }
        
        # 记录调用开始
        self.agent_logger.info(json.dumps({
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "event": "tool_call_start",
            "session_id": session_id,
            "tool_name": tool_name,
            "args": self._sanitize_args(args),
            "trace_id": span.trace_id,
            "span_id": span.span_id,
        }))
        
        try:
            # 带超时的工具调用
            result = await asyncio.wait_for(
                tool_func(**args) if asyncio.iscoroutinefunction(tool_func) 
                else asyncio.to_thread(tool_func, **args),
                timeout=timeout
            )
            
            duration_ms = (time.time() - call_record["start_time"]) * 1000
            
            # 记录成功结果
            self.agent_logger.info(json.dumps({
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "event": "tool_call_success",
                "session_id": session_id,
                "tool_name": tool_name,
                "duration_ms": round(duration_ms, 2),
                "result_summary": self._summarize_result(result),
                **span.to_log_dict("ok")
            }))
            
            call_record["result"] = "success"
            call_record["duration_ms"] = duration_ms
            self.call_chain.append(call_record)
            
            return result
        
        except asyncio.TimeoutError:
            duration_ms = timeout * 1000
            error_msg = f"Tool '{tool_name}' timed out after {timeout}s"
            
            self.error_logger.error(json.dumps({
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "event": "tool_call_timeout",
                "session_id": session_id,
                "tool_name": tool_name,
                "timeout_seconds": timeout,
                "args": self._sanitize_args(args),
                **span.to_log_dict("timeout", error_msg)
            }))
            
            call_record["result"] = "timeout"
            call_record["error"] = error_msg
            self.call_chain.append(call_record)
            raise
        
        except Exception as e:
            duration_ms = (time.time() - call_record["start_time"]) * 1000
            
            self.error_logger.error(json.dumps({
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "event": "tool_call_error",
                "session_id": session_id,
                "tool_name": tool_name,
                "duration_ms": round(duration_ms, 2),
                "error_type": type(e).__name__,
                "error_message": str(e),
                "args": self._sanitize_args(args),
                **span.to_log_dict("error", str(e))
            }), exc_info=True)
            
            call_record["result"] = "error"
            call_record["error"] = str(e)
            self.call_chain.append(call_record)
            raise
    
    def _sanitize_args(self, args: dict) -> dict:
        """清理敏感参数(密码、token 等)"""
        sensitive_keys = {"password", "token", "api_key", "secret", "credential"}
        return {
            k: "***REDACTED***" if k.lower() in sensitive_keys else v
            for k, v in args.items()
        }
    
    def _summarize_result(self, result: Any) -> str:
        """生成结果摘要(避免记录大型结果)"""
        result_str = str(result)
        if len(result_str) > 200:
            return result_str[:200] + f"... [{len(result_str)} chars total]"
        return result_str
    
    def get_call_chain_report(self) -> dict:
        """生成完整调用链报告"""
        total_time = sum(c.get("duration_ms", 0) for c in self.call_chain)
        failed = [c for c in self.call_chain if c["result"] != "success"]
        
        return {
            "total_tool_calls": len(self.call_chain),
            "successful_calls": len(self.call_chain) - len(failed),
            "failed_calls": len(failed),
            "total_time_ms": round(total_time, 2),
            "call_chain": self.call_chain,
            "failures": failed
        }

63.3 常见 Bug 模式识别

Bug 模式一:工具调用无限循环

症状:agent.log 中同一个工具被反复调用,Agent 无法推进到下一步。

日志特征

{"event": "tool_call_start", "tool_name": "web_search", "step_index": 2}
{"event": "tool_call_success", "tool_name": "web_search", "step_index": 2}
{"event": "tool_call_start", "tool_name": "web_search", "step_index": 2}  // 重复!
{"event": "tool_call_success", "tool_name": "web_search", "step_index": 2}

检测代码

def detect_infinite_loop(log_file: str, session_id: str, threshold: int = 3) -> list:
    """检测工具调用无限循环"""
    from collections import defaultdict
    
    tool_call_counts = defaultdict(int)
    loop_alerts = []
    
    with open(log_file) as f:
        for line in f:
            try:
                entry = json.loads(line)
                if (entry.get("session_id") == session_id and 
                    entry.get("event") == "tool_call_start"):
                    
                    # 包含 step_index 的工具调用key
                    key = f"{entry['tool_name']}@step{entry.get('step_index', '?')}"
                    tool_call_counts[key] += 1
                    
                    if tool_call_counts[key] >= threshold:
                        loop_alerts.append({
                            "tool": entry["tool_name"],
                            "step": entry.get("step_index"),
                            "count": tool_call_counts[key],
                            "timestamp": entry["timestamp"]
                        })
            except (json.JSONDecodeError, KeyError):
                continue
    
    return loop_alerts

# 根本原因:LLM 没有正确理解工具结果,认为任务未完成
# 修复方案:
# 1. 在工具结果中明确标注"此步骤已完成"
# 2. 添加工具调用计数器,超过 N 次后强制跳过
# 3. 在系统提示中明确说明工具调用失败时的处理策略

Bug 模式二:记忆污染

症状:Agent 在新会话中使用了上一个会话的数据,产生不相关或错误的回答。

日志特征

// 新会话开始
{"event": "session_start", "session_id": "sess_new001"}
// 但 Agent 引用了旧数据
{"event": "llm_response", "content": "根据您之前提到的销售数据..."}  // "之前"不存在!

检测和修复

class MemoryPollutionGuard:
    """
    记忆污染检测和预防
    确保会话隔离
    """
    
    def __init__(self):
        self._session_memories = {}
    
    def get_memory(self, session_id: str) -> dict:
        """获取会话专属记忆,确保不跨污染"""
        if session_id not in self._session_memories:
            self._session_memories[session_id] = {
                "conversation_history": [],
                "tool_results": {},
                "working_context": {},
                "created_at": time.time()
            }
        return self._session_memories[session_id]
    
    def clear_session(self, session_id: str):
        """会话结束时完全清除记忆"""
        if session_id in self._session_memories:
            del self._session_memories[session_id]
    
    def audit_cross_session_references(self, session_id: str, content: str) -> list:
        """
        审计响应内容中是否包含其他会话的引用
        检测常见的污染信号
        """
        alerts = []
        current_session_memory = self.get_memory(session_id)
        
        # 检测其他会话的 session_id 出现在内容中
        for other_session_id in self._session_memories:
            if other_session_id != session_id and other_session_id in content:
                alerts.append({
                    "type": "session_id_leak",
                    "leaked_session": other_session_id,
                    "in_session": session_id
                })
        
        # 检测时间悖论(引用"之前"但会话刚开始)
        session_age = time.time() - current_session_memory["created_at"]
        time_references = ["之前", "上次", "previously", "last time", "as mentioned"]
        
        if session_age < 60:  # 会话开始不足60秒
            for ref in time_references:
                if ref in content:
                    alerts.append({
                        "type": "temporal_paradox",
                        "reference": ref,
                        "session_age_seconds": session_age
                    })
        
        return alerts

Bug 模式三:参数解析失败

症状:工具调用失败,错误信息为参数类型错误或必填字段缺失。

日志特征

{"event": "tool_call_error", "error_type": "ValidationError", 
 "error_message": "Missing required field 'query'",
 "args": {"search_term": "AI 论文"}}  // LLM 用了错误的字段名!

自动检测和修复

import re
from typing import Dict, Any

class ToolArgValidator:
    """
    工具参数验证器
    在调用工具前进行参数校验,提前发现问题
    """
    
    def __init__(self, tool_schemas: dict):
        self.schemas = tool_schemas
        self.error_patterns = []  # 记录历史错误模式
    
    def validate(self, tool_name: str, args: dict) -> tuple:
        """
        验证工具参数
        
        Returns:
            (is_valid, error_message, corrected_args)
        """
        schema = self.schemas.get(tool_name)
        if not schema:
            return False, f"Unknown tool: {tool_name}", args
        
        required_fields = schema.get("required", [])
        properties = schema.get("properties", {})
        
        errors = []
        corrected = dict(args)
        
        # 检查必填字段
        for field in required_fields:
            if field not in args:
                # 尝试模糊匹配(处理字段名拼写变体)
                fuzzy_match = self._fuzzy_find_field(field, args)
                if fuzzy_match:
                    corrected[field] = corrected.pop(fuzzy_match)
                    errors.append(f"Auto-corrected: '{fuzzy_match}' → '{field}'")
                else:
                    errors.append(f"Missing required field: '{field}'")
        
        # 检查类型
        for field, value in args.items():
            if field in properties:
                expected_type = properties[field].get("type")
                if not self._check_type(value, expected_type):
                    # 尝试类型转换
                    converted = self._try_convert(value, expected_type)
                    if converted is not None:
                        corrected[field] = converted
                        errors.append(f"Auto-converted '{field}': {type(value).__name__} → {expected_type}")
                    else:
                        errors.append(f"Type mismatch for '{field}': expected {expected_type}, got {type(value).__name__}")
        
        is_valid = all("Missing" not in e for e in errors)
        return is_valid, "; ".join(errors), corrected
    
    def _fuzzy_find_field(self, target: str, args: dict) -> Optional[str]:
        """模糊匹配字段名(处理同义词和拼写变体)"""
        synonyms = {
            "query": ["search_term", "q", "search", "keyword", "keywords"],
            "url": ["link", "href", "address", "web_url"],
            "file_path": ["path", "filepath", "filename", "file"],
            "max_results": ["limit", "count", "n", "num_results", "top_k"]
        }
        
        for canonical, variants in synonyms.items():
            if target == canonical:
                for variant in variants:
                    if variant in args:
                        return variant
        return None
    
    def _check_type(self, value: Any, expected_type: str) -> bool:
        type_map = {"string": str, "integer": int, "number": (int, float), "boolean": bool, "array": list}
        expected = type_map.get(expected_type)
        return isinstance(value, expected) if expected else True
    
    def _try_convert(self, value: Any, expected_type: str) -> Optional[Any]:
        try:
            if expected_type == "string":
                return str(value)
            elif expected_type == "integer":
                return int(value)
            elif expected_type == "number":
                return float(value)
            elif expected_type == "boolean":
                return bool(value)
        except (ValueError, TypeError):
            pass
        return None
    
    def record_error_pattern(self, tool_name: str, error: str, args: dict):
        """记录错误模式,用于后续分析"""
        self.error_patterns.append({
            "timestamp": datetime.utcnow().isoformat(),
            "tool": tool_name,
            "error": error,
            "args_keys": list(args.keys())
        })
    
    def get_frequent_errors(self, top_n: int = 5) -> list:
        """获取最常见的错误模式"""
        from collections import Counter
        error_counts = Counter(p["error"] for p in self.error_patterns)
        return error_counts.most_common(top_n)

63.4 构建调试仪表盘:ELK Stack 配置

架构概述

Hermes Agent → agent.log → Filebeat → Logstash → Elasticsearch → Kibana
                errors.log ↗

Logstash 配置

# /etc/logstash/conf.d/hermes-agent.conf

input {
  file {
    path => ["/var/log/hermes/agent.log", "/var/log/hermes/errors.log"]
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb"
    tags => ["hermes_agent"]
  }
}

filter {
  # 解析 JSON 格式日志
  json {
    source => "message"
    target => "parsed"
    remove_field => ["message"]
  }
  
  # 将解析出的字段提升到顶层
  ruby {
    code => "
      if event.get('parsed')
        event.get('parsed').each { |k, v| event.set(k, v) }
        event.remove('parsed')
      end
    "
  }
  
  # 按日志文件分类
  if "/errors.log" in [path] {
    mutate { add_field => { "log_type" => "error" } }
  } else {
    mutate { add_field => { "log_type" => "agent" } }
  }
  
  # 计算 duration 到毫秒
  if [elapsed_seconds] {
    ruby {
      code => "event.set('elapsed_ms', (event.get('elapsed_seconds') * 1000).round(2))"
    }
  }
  
  # 提取会话 ID 作为索引字段
  if [session_id] {
    mutate {
      add_field => { "[@metadata][session_id]" => "%{session_id}" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "hermes-agent-%{+YYYY.MM.dd}"
    template_name => "hermes-agent"
    template_overwrite => true
  }
}

Kibana 仪表盘关键查询

// 1. 工具调用成功率(按工具分类)
{
  "query": {
    "bool": {
      "must": [
        {"term": {"event": "tool_call_start"}},
        {"range": {"@timestamp": {"gte": "now-1h"}}}
      ]
    }
  },
  "aggs": {
    "by_tool": {
      "terms": {"field": "tool_name.keyword"},
      "aggs": {
        "success_rate": {
          "filter": {"term": {"log_type": "agent"}}
        },
        "error_rate": {
          "filter": {"term": {"log_type": "error"}}
        }
      }
    }
  }
}

// 2. 平均工具调用耗时(Top 10 慢工具)
{
  "query": {"term": {"event": "tool_call_success"}},
  "aggs": {
    "by_tool": {
      "terms": {"field": "tool_name.keyword", "size": 10},
      "aggs": {
        "avg_duration": {"avg": {"field": "duration_ms"}},
        "p95_duration": {"percentiles": {"field": "duration_ms", "percents": [95]}}
      }
    }
  }
}

Elasticsearch 索引模板

{
  "index_patterns": ["hermes-agent-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "properties": {
      "timestamp": {"type": "date"},
      "session_id": {"type": "keyword"},
      "event": {"type": "keyword"},
      "level": {"type": "keyword"},
      "tool_name": {"type": "keyword"},
      "from_state": {"type": "keyword"},
      "to_state": {"type": "keyword"},
      "duration_ms": {"type": "float"},
      "tokens_used_so_far": {"type": "integer"},
      "retry_count": {"type": "integer"},
      "log_type": {"type": "keyword"},
      "error_type": {"type": "keyword"},
      "message": {"type": "text", "analyzer": "standard"}
    }
  }
}

63.5 Session 重放(Replay)实现

什么是 Session 重放

Session 重放允许开发者精确重现 Agent 曾经执行的任务流程,用于:

重放系统实现

import json
import asyncio
from typing import Optional, Callable
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ReplayEvent:
    timestamp: str
    event_type: str
    data: dict

class SessionReplayer:
    """
    Session 重放器
    从 agent.log 重现 Agent 的完整执行过程
    """
    
    def __init__(self, log_file: str = "/var/log/hermes/agent.log"):
        self.log_file = log_file
    
    def load_session(self, session_id: str) -> list:
        """从日志文件加载指定 session 的所有事件"""
        events = []
        
        with open(self.log_file, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                    if entry.get("session_id") == session_id:
                        events.append(ReplayEvent(
                            timestamp=entry.get("timestamp", ""),
                            event_type=entry.get("event", entry.get("message", "")),
                            data=entry
                        ))
                except json.JSONDecodeError:
                    continue
        
        # 按时间排序
        events.sort(key=lambda e: e.timestamp)
        return events
    
    def print_timeline(self, session_id: str):
        """打印会话时间线(可视化执行流程)"""
        events = self.load_session(session_id)
        
        if not events:
            print(f"No events found for session {session_id}")
            return
        
        print(f"\n{'='*60}")
        print(f"Session Replay: {session_id}")
        print(f"Total Events: {len(events)}")
        print(f"{'='*60}\n")
        
        # 计算相对时间
        start_time = datetime.fromisoformat(events[0].timestamp.rstrip('Z'))
        
        for event in events:
            try:
                event_time = datetime.fromisoformat(event.timestamp.rstrip('Z'))
                elapsed = (event_time - start_time).total_seconds()
            except:
                elapsed = 0
            
            event_type = event.event_type
            data = event.data
            
            # 格式化输出
            prefix = "  "
            if "error" in event_type.lower():
                prefix = "❌"
            elif "success" in event_type.lower():
                prefix = "✅"
            elif "start" in event_type.lower():
                prefix = "▶️ "
            elif "transition" in event_type.lower():
                prefix = "→ "
            
            print(f"[T+{elapsed:6.2f}s] {prefix} {event_type}")
            
            # 打印关键字段
            for key in ["tool_name", "from_state", "to_state", "error_message", 
                       "tokens_used_so_far", "step_index"]:
                if key in data and data[key] is not None:
                    print(f"            {key}: {data[key]}")
            
            print()
    
    async def replay_with_mocks(
        self,
        session_id: str,
        tool_mocks: dict,  # {tool_name: mock_response}
        on_state_change: Optional[Callable] = None
    ) -> dict:
        """
        使用模拟工具重放 Session
        
        Args:
            session_id: 要重放的会话 ID
            tool_mocks: 工具模拟响应 {tool_name: mock_result}
            on_state_change: 状态变化时的回调函数
        
        Returns:
            重放结果报告
        """
        events = self.load_session(session_id)
        
        replay_results = []
        tool_calls_in_replay = []
        
        for event in events:
            event_type = event.event_type
            data = event.data
            
            if event_type == "tool_call_start":
                tool_name = data.get("tool_name")
                args = data.get("args", {})
                
                # 使用模拟工具或实际工具
                if tool_name in tool_mocks:
                    mock = tool_mocks[tool_name]
                    result = mock(args) if callable(mock) else mock
                    status = "mocked"
                else:
                    result = None
                    status = "skipped_no_mock"
                
                tool_calls_in_replay.append({
                    "tool": tool_name,
                    "args": args,
                    "result": str(result)[:100],
                    "status": status
                })
            
            elif event_type == "State transition" and on_state_change:
                await on_state_change(
                    from_state=data.get("from_state"),
                    to_state=data.get("to_state"),
                    trigger=data.get("trigger")
                )
            
            # 模拟真实时序(可选,用于调试时序相关 Bug)
            # await asyncio.sleep(0.01)
        
        return {
            "session_id": session_id,
            "total_events": len(events),
            "tool_calls": tool_calls_in_replay,
            "replay_complete": True
        }
    
    def generate_test_case(self, session_id: str) -> str:
        """
        从 Session 日志自动生成 pytest 测试用例
        """
        events = self.load_session(session_id)
        tool_calls = [e for e in events if e.event_type == "tool_call_start"]
        
        test_code = f'''
import pytest
import asyncio

# Auto-generated test case from session {session_id}
# Generated at: {datetime.now().isoformat()}

class TestSession{session_id.replace("-", "_")}:
    """
    Replay test for session {session_id}
    {len(events)} total events, {len(tool_calls)} tool calls
    """
    
    @pytest.fixture
    def agent(self):
        from hermes_agent import HermesAgentFSM
        agent = HermesAgentFSM(session_id="replay_{session_id}")
        # Register required tools
'''
        
        # 提取所有用到的工具
        tools_used = set()
        for event in events:
            if event.event_type == "tool_call_start":
                tools_used.add(event.data.get("tool_name", ""))
        
        for tool in tools_used:
            if tool:
                test_code += f'        agent.register_tool("{tool}")\n'
        
        test_code += '''        return agent
    
    def test_replay(self, agent):
        """Verify Agent reproduces the same state transitions"""
        replayer = SessionReplayer()
        result = asyncio.run(replayer.replay_with_mocks(
            session_id="{session_id}",
            tool_mocks={{
'''.format(session_id=session_id)
        
        for event in events:
            if event.event_type == "tool_call_success":
                tool = event.data.get("tool_name", "")
                result_summary = event.data.get("result_summary", "{}")
                test_code += f'                "{tool}": "{result_summary}",\n'
        
        test_code += '''            }
        ))
        assert result["replay_complete"] == True
        assert result["total_events"] > 0
'''
        
        return test_code

# 使用示例
replayer = SessionReplayer("/var/log/hermes/agent.log")

# 打印会话时间线
replayer.print_timeline("sess_abc123")

# 生成测试用例
test_code = replayer.generate_test_case("sess_abc123")
with open("test_session_replay.py", "w") as f:
    f.write(test_code)
print("测试用例已生成:test_session_replay.py")

本章小结

完整的调试方法论让 Hermes Agent 从黑盒变成透明系统:

  1. 双日志架构:agent.log 记录正常流程的所有状态转换,errors.log 记录带上下文的完整错误信息,两者配合实现全面覆盖
  2. 分布式追踪:每个工具调用生成 TraceID/SpanID,支持跨工具调用的调用链分析
  3. Bug 模式识别:无限循环(工具被重复调用)、记忆污染(会话数据交叉)、参数解析失败(字段名错误)是最常见的三种故障模式,均有自动检测方案
  4. ELK 仪表盘:实时监控工具成功率、P95 延迟、Token 使用趋势,提前发现问题
  5. Session 重放:从日志重现执行过程,自动生成测试用例,构成完整的调试-验证闭环

思考题

  1. 当 Agent 产生的输出被用户标记为"错误"时,如何设计反向追踪(从输出追溯到导致错误的工具调用)?
  2. 在生产环境中,完整记录所有工具调用的输入输出会产生大量数据,如何设计"按需采样"的日志策略?
  3. 如果同一个 Bug 在 100 次调用中只出现 1 次,传统的复现方式难以奏效,如何设计"概率性 Bug 追踪"系统?
  4. Session 重放测试中,如果工具的实际网络响应随时间变化(如网页内容更新),如何保证测试的确定性?
本章评分
4.8  / 5  (3 评分)

💬 留言讨论