Chapter 63

Debugging Methodology: agent.log Analysis and Tracing

Chapter 63: Debugging Methodology — agent.log Analysis and Tracing

Debugging an Agent system is like searching for someone in a dark forest—you don't know which way they went or whether they're still moving. Logs are the flashlight in your hand, tracing is your compass, and methodology is the map that keeps you from getting lost.


63.1 Log Structure: agent.log vs. errors.log

The Two-Log Architecture

Hermes Agent uses two separate log files, each serving a distinct purpose:

agent.log — Normal Flow Log

errors.log — Error-Specific Log

Log Format Specification

import logging
import json
import traceback
from datetime import datetime

class StructuredFormatter(logging.Formatter):
    """JSON-format log formatter for ELK Stack compatibility"""
    
    def format(self, record: logging.LogRecord) -> str:
        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if hasattr(record, 'extra'):
            log_entry.update(record.extra)
        if record.exc_info:
            log_entry["exception"] = {
                "type": record.exc_info[0].__name__ if record.exc_info[0] else None,
                "message": str(record.exc_info[1]),
                "traceback": traceback.format_exception(*record.exc_info)
            }
        return json.dumps(log_entry, ensure_ascii=False)

def setup_hermes_logging(log_dir: str = "/var/log/hermes"):
    import os
    os.makedirs(log_dir, exist_ok=True)
    formatter = StructuredFormatter()
    
    agent_logger = logging.getLogger('agent')
    agent_logger.setLevel(logging.DEBUG)
    agent_handler = logging.FileHandler(f"{log_dir}/agent.log")
    agent_handler.setFormatter(formatter)
    agent_logger.addHandler(agent_handler)
    
    error_logger = logging.getLogger('errors')
    error_logger.setLevel(logging.WARNING)
    error_handler = logging.FileHandler(f"{log_dir}/errors.log")
    error_handler.setFormatter(formatter)
    error_logger.addHandler(error_handler)
    
    return agent_logger, error_logger

Key Log Fields Reference

Field Appears In Purpose
session_id Both Correlate all logs for one session
timestamp Both Time-sequence analysis, phase duration
from_state/to_state agent.log State transition tracking
tool_name + tool_input Both Tool call tracing
tokens_used_so_far agent.log Token budget monitoring
retry_count errors.log Determine if error is intermittent
agent_state_snapshot errors.log Reconstruct error scene
will_retry errors.log Distinguish recoverable vs. fatal errors

Example agent.log entry:

{
  "timestamp": "2024-12-24T10:26:40.123Z",
  "level": "INFO", "logger": "agent", "message": "State transition",
  "session_id": "sess_abc123", "event": "tool_called",
  "from_state": "executing", "to_state": "waiting",
  "tool_name": "web_search", "tool_input": {"query": "latest AI papers"},
  "step_index": 2, "total_steps": 5, "tokens_used_so_far": 8240
}

Example errors.log entry:

{
  "timestamp": "2024-12-24T10:27:15.456Z",
  "level": "ERROR", "logger": "errors", "message": "Tool execution failed",
  "session_id": "sess_abc123", "error_type": "ToolTimeoutError",
  "error_message": "web_search timed out after 30 seconds",
  "state_when_error": "waiting", "retry_count": 1, "will_retry": true,
  "agent_state_snapshot": {"plan_steps": ["search","analyze","report"], "current_step": 0}
}

63.2 Tracing Multi-Step Tool Call Failures

import uuid
import time
import asyncio
import json
from datetime import datetime
from typing import Any, Callable, Optional

class TraceContext:
    def __init__(self, trace_id: str = None, parent_span_id: str = None):
        self.trace_id = trace_id or str(uuid.uuid4())[:8]
        self.parent_span_id = parent_span_id
        self.span_id = str(uuid.uuid4())[:8]
        self.start_time = time.time()
    
    def to_log_dict(self, status: str, error: str = None) -> dict:
        entry = {
            "trace_id": self.trace_id, "span_id": self.span_id,
            "parent_span_id": self.parent_span_id,
            "duration_ms": round((time.time() - self.start_time) * 1000, 2),
            "status": status,
        }
        if error:
            entry["error"] = error
        return entry

class InstrumentedToolCaller:
    def __init__(self, agent_logger, error_logger):
        self.agent_logger = agent_logger
        self.error_logger = error_logger
        self.call_chain = []
    
    async def call_tool(self, tool_name: str, tool_func: Callable, args: dict,
                        session_id: str, trace_context: TraceContext,
                        timeout: float = 30.0) -> Any:
        span = TraceContext(trace_id=trace_context.trace_id, parent_span_id=trace_context.span_id)
        call_record = {"tool": tool_name, "args": args, "start_time": time.time()}
        
        self.agent_logger.info(json.dumps({
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "event": "tool_call_start", "session_id": session_id,
            "tool_name": tool_name, "args": self._sanitize_args(args),
            "trace_id": span.trace_id, "span_id": span.span_id,
        }))
        
        try:
            result = await asyncio.wait_for(
                tool_func(**args) if asyncio.iscoroutinefunction(tool_func)
                else asyncio.to_thread(tool_func, **args),
                timeout=timeout
            )
            duration_ms = (time.time() - call_record["start_time"]) * 1000
            self.agent_logger.info(json.dumps({
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "event": "tool_call_success", "session_id": session_id,
                "tool_name": tool_name, "duration_ms": round(duration_ms, 2),
                "result_summary": str(result)[:200],
                **span.to_log_dict("ok")
            }))
            call_record.update({"result": "success", "duration_ms": duration_ms})
            self.call_chain.append(call_record)
            return result
        
        except asyncio.TimeoutError:
            error_msg = f"Tool '{tool_name}' timed out after {timeout}s"
            self.error_logger.error(json.dumps({
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "event": "tool_call_timeout", "session_id": session_id,
                "tool_name": tool_name, "timeout_seconds": timeout,
                **span.to_log_dict("timeout", error_msg)
            }))
            call_record.update({"result": "timeout", "error": error_msg})
            self.call_chain.append(call_record)
            raise
        
        except Exception as e:
            duration_ms = (time.time() - call_record["start_time"]) * 1000
            self.error_logger.error(json.dumps({
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "event": "tool_call_error", "session_id": session_id,
                "tool_name": tool_name, "duration_ms": round(duration_ms, 2),
                "error_type": type(e).__name__, "error_message": str(e),
                **span.to_log_dict("error", str(e))
            }), exc_info=True)
            call_record.update({"result": "error", "error": str(e)})
            self.call_chain.append(call_record)
            raise
    
    def _sanitize_args(self, args: dict) -> dict:
        sensitive = {"password", "token", "api_key", "secret", "credential"}
        return {k: "***REDACTED***" if k.lower() in sensitive else v for k, v in args.items()}
    
    def get_call_chain_report(self) -> dict:
        failed = [c for c in self.call_chain if c["result"] != "success"]
        return {
            "total_calls": len(self.call_chain),
            "successful": len(self.call_chain) - len(failed),
            "failed": len(failed),
            "call_chain": self.call_chain,
            "failures": failed
        }

63.3 Common Bug Pattern Recognition

Pattern 1: Infinite Tool Call Loop

Symptom: The same tool called repeatedly at the same step; Agent never advances.

Log signature:

{"event": "tool_call_start", "tool_name": "web_search", "step_index": 2}
{"event": "tool_call_success", "tool_name": "web_search", "step_index": 2}
{"event": "tool_call_start", "tool_name": "web_search", "step_index": 2}  // REPEATED

Detection:

def detect_infinite_loop(log_file: str, session_id: str, threshold: int = 3) -> list:
    from collections import defaultdict
    tool_call_counts = defaultdict(int)
    alerts = []
    
    with open(log_file) as f:
        for line in f:
            try:
                entry = json.loads(line)
                if (entry.get("session_id") == session_id and
                        entry.get("event") == "tool_call_start"):
                    key = f"{entry['tool_name']}@step{entry.get('step_index', '?')}"
                    tool_call_counts[key] += 1
                    if tool_call_counts[key] >= threshold:
                        alerts.append({
                            "tool": entry["tool_name"],
                            "step": entry.get("step_index"),
                            "count": tool_call_counts[key]
                        })
            except (json.JSONDecodeError, KeyError):
                continue
    return alerts

# Root cause: LLM misunderstood tool result, thinks task is incomplete
# Fix: Add explicit "step complete" signal in tool result; add per-step call counter with max

Pattern 2: Memory Pollution (Cross-Session Contamination)

Symptom: Agent in a new session references data from a previous session.

Log signature:

{"event": "session_start", "session_id": "sess_new001"}
{"event": "llm_response", "content": "Based on the data you mentioned earlier..."}
// "earlier" doesn't exist in this session!

Detection and fix:

import time

class MemoryPollutionGuard:
    def __init__(self):
        self._session_memories = {}
    
    def get_memory(self, session_id: str) -> dict:
        if session_id not in self._session_memories:
            self._session_memories[session_id] = {
                "conversation_history": [], "tool_results": {},
                "created_at": time.time()
            }
        return self._session_memories[session_id]
    
    def clear_session(self, session_id: str):
        self._session_memories.pop(session_id, None)
    
    def audit_cross_references(self, session_id: str, content: str) -> list:
        alerts = []
        session_age = time.time() - self.get_memory(session_id)["created_at"]
        
        # Check for leaked session IDs
        for other_id in self._session_memories:
            if other_id != session_id and other_id in content:
                alerts.append({"type": "session_id_leak", "leaked": other_id})
        
        # Temporal paradox: new session referencing past data
        if session_age < 60:
            for ref in ["previously", "last time", "as mentioned earlier", "you said before"]:
                if ref in content.lower():
                    alerts.append({"type": "temporal_paradox", "reference": ref,
                                   "session_age_s": session_age})
        return alerts

Pattern 3: Argument Parsing Failure

Symptom: Tool calls fail with validation errors or missing required fields.

Log signature:

{"event": "tool_call_error", "error_type": "ValidationError",
 "error_message": "Missing required field 'query'",
 "args": {"search_term": "AI papers"}}  // LLM used wrong field name!

Auto-detection and correction:

from typing import Any, Optional

class ToolArgValidator:
    SYNONYMS = {
        "query": ["search_term", "q", "search", "keyword"],
        "url": ["link", "href", "address"],
        "file_path": ["path", "filepath", "filename"],
        "max_results": ["limit", "count", "n", "top_k"]
    }
    
    def __init__(self, tool_schemas: dict):
        self.schemas = tool_schemas
    
    def validate(self, tool_name: str, args: dict) -> tuple:
        schema = self.schemas.get(tool_name)
        if not schema:
            return False, f"Unknown tool: {tool_name}", args
        
        required = schema.get("required", [])
        properties = schema.get("properties", {})
        errors = []
        corrected = dict(args)
        
        for field in required:
            if field not in args:
                fuzzy = self._fuzzy_find(field, args)
                if fuzzy:
                    corrected[field] = corrected.pop(fuzzy)
                    errors.append(f"Auto-corrected: '{fuzzy}' → '{field}'")
                else:
                    errors.append(f"Missing required: '{field}'")
        
        for field, value in args.items():
            if field in properties:
                expected = properties[field].get("type")
                if not self._check_type(value, expected):
                    converted = self._try_convert(value, expected)
                    if converted is not None:
                        corrected[field] = converted
                        errors.append(f"Auto-converted '{field}' to {expected}")
                    else:
                        errors.append(f"Type error '{field}': expected {expected}")
        
        is_valid = all("Missing" not in e for e in errors)
        return is_valid, "; ".join(errors), corrected
    
    def _fuzzy_find(self, target: str, args: dict) -> Optional[str]:
        for canonical, variants in self.SYNONYMS.items():
            if target == canonical:
                for v in variants:
                    if v in args:
                        return v
        return None
    
    def _check_type(self, value: Any, expected: str) -> bool:
        type_map = {"string": str, "integer": int, "number": (int, float),
                    "boolean": bool, "array": list}
        t = type_map.get(expected)
        return isinstance(value, t) if t else True
    
    def _try_convert(self, value: Any, expected: str) -> Optional[Any]:
        try:
            if expected == "string": return str(value)
            if expected == "integer": return int(value)
            if expected == "number": return float(value)
            if expected == "boolean": return bool(value)
        except (ValueError, TypeError):
            pass
        return None

63.4 Building a Debug Dashboard with ELK Stack

Architecture

Hermes Agent → agent.log  → Filebeat → Logstash → Elasticsearch → Kibana
               errors.log ↗

Logstash Configuration

# /etc/logstash/conf.d/hermes-agent.conf
input {
  file {
    path => ["/var/log/hermes/agent.log", "/var/log/hermes/errors.log"]
    start_position => "beginning"
    tags => ["hermes_agent"]
  }
}

filter {
  json { source => "message" target => "parsed" remove_field => ["message"] }
  ruby {
    code => "
      if event.get('parsed')
        event.get('parsed').each { |k, v| event.set(k, v) }
        event.remove('parsed')
      end
    "
  }
  if "/errors.log" in [path] {
    mutate { add_field => { "log_type" => "error" } }
  } else {
    mutate { add_field => { "log_type" => "agent" } }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "hermes-agent-%{+YYYY.MM.dd}"
  }
}

Key Kibana Queries

// Tool call success rate by tool
{
  "query": {"bool": {"must": [{"term": {"event": "tool_call_start"}},
    {"range": {"@timestamp": {"gte": "now-1h"}}}]}},
  "aggs": {"by_tool": {"terms": {"field": "tool_name.keyword"},
    "aggs": {"error_rate": {"filter": {"term": {"log_type": "error"}}}}}}
}

// Top 10 slowest tools (P95 latency)
{
  "query": {"term": {"event": "tool_call_success"}},
  "aggs": {"by_tool": {"terms": {"field": "tool_name.keyword", "size": 10},
    "aggs": {"p95_ms": {"percentiles": {"field": "duration_ms", "percents": [95]}}}}}
}

Elasticsearch Index Template

{
  "index_patterns": ["hermes-agent-*"],
  "mappings": {
    "properties": {
      "timestamp": {"type": "date"},
      "session_id": {"type": "keyword"},
      "event": {"type": "keyword"},
      "tool_name": {"type": "keyword"},
      "from_state": {"type": "keyword"},
      "to_state": {"type": "keyword"},
      "duration_ms": {"type": "float"},
      "tokens_used_so_far": {"type": "integer"},
      "retry_count": {"type": "integer"},
      "log_type": {"type": "keyword"},
      "error_type": {"type": "keyword"}
    }
  }
}

63.5 Session Replay Implementation

What Is Session Replay?

Session replay lets developers exactly reproduce an Agent's task execution flow from logs, enabling:

Replayer Implementation

import json
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Callable

@dataclass
class ReplayEvent:
    timestamp: str
    event_type: str
    data: dict

class SessionReplayer:
    def __init__(self, log_file: str = "/var/log/hermes/agent.log"):
        self.log_file = log_file
    
    def load_session(self, session_id: str) -> list:
        events = []
        with open(self.log_file) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                    if entry.get("session_id") == session_id:
                        events.append(ReplayEvent(
                            timestamp=entry.get("timestamp", ""),
                            event_type=entry.get("event", entry.get("message", "")),
                            data=entry
                        ))
                except json.JSONDecodeError:
                    continue
        events.sort(key=lambda e: e.timestamp)
        return events
    
    def print_timeline(self, session_id: str):
        events = self.load_session(session_id)
        if not events:
            print(f"No events found for session {session_id}")
            return
        
        print(f"\n{'='*60}\nSession Replay: {session_id}\n"
              f"Total Events: {len(events)}\n{'='*60}\n")
        
        start = datetime.fromisoformat(events[0].timestamp.rstrip('Z'))
        
        for event in events:
            try:
                elapsed = (datetime.fromisoformat(event.timestamp.rstrip('Z')) - start).total_seconds()
            except:
                elapsed = 0
            
            prefix = "ERR" if "error" in event.event_type.lower() else "   "
            print(f"[T+{elapsed:6.2f}s] {prefix} {event.event_type}")
            for key in ["tool_name", "from_state", "to_state", "error_message", "tokens_used_so_far"]:
                if key in event.data and event.data[key] is not None:
                    print(f"            {key}: {event.data[key]}")
            print()
    
    async def replay_with_mocks(
        self, session_id: str, tool_mocks: dict,
        on_state_change: Optional[Callable] = None
    ) -> dict:
        events = self.load_session(session_id)
        tool_calls = []
        
        for event in events:
            if event.event_type == "tool_call_start":
                tool_name = event.data.get("tool_name")
                args = event.data.get("args", {})
                
                if tool_name in tool_mocks:
                    mock = tool_mocks[tool_name]
                    result = mock(args) if callable(mock) else mock
                    status = "mocked"
                else:
                    result = None
                    status = "skipped_no_mock"
                
                tool_calls.append({"tool": tool_name, "result_status": status,
                                   "result": str(result)[:100]})
            
            elif "transition" in event.event_type.lower() and on_state_change:
                await on_state_change(
                    from_state=event.data.get("from_state"),
                    to_state=event.data.get("to_state"),
                    trigger=event.data.get("trigger")
                )
        
        return {"session_id": session_id, "total_events": len(events),
                "tool_calls": tool_calls, "replay_complete": True}
    
    def generate_test_case(self, session_id: str) -> str:
        events = self.load_session(session_id)
        tool_calls = [e for e in events if e.event_type == "tool_call_start"]
        tools_used = {e.data.get("tool_name", "") for e in events
                      if e.event_type == "tool_call_start"}
        
        tool_registrations = "\n".join(
            f'        agent.register_tool("{t}")' for t in tools_used if t
        )
        mock_entries = "\n".join(
            f'                "{e.data.get("tool_name")}": "mock_result",'
            for e in events if e.event_type == "tool_call_success"
        )
        
        return f'''import pytest
import asyncio
from hermes_agent import HermesAgentFSM

# Auto-generated from session {session_id} ({len(events)} events, {len(tool_calls)} tool calls)

class TestSession_{session_id.replace("-", "_")}:
    @pytest.fixture
    def agent(self):
        agent = HermesAgentFSM(session_id="replay")
{tool_registrations}
        return agent
    
    def test_replay(self, agent):
        from session_replayer import SessionReplayer
        replayer = SessionReplayer()
        result = asyncio.run(replayer.replay_with_mocks(
            session_id="{session_id}",
            tool_mocks={{
{mock_entries}
            }}
        ))
        assert result["replay_complete"] is True
        assert result["total_events"] > 0
'''

# Usage
replayer = SessionReplayer("/var/log/hermes/agent.log")
replayer.print_timeline("sess_abc123")

test_code = replayer.generate_test_case("sess_abc123")
with open("test_session_replay.py", "w") as f:
    f.write(test_code)
print("Test case generated: test_session_replay.py")

Chapter Summary

A complete debugging methodology transforms Hermes Agent from a black box into a transparent system:

  1. Dual-log architecture: agent.log records all state transitions in normal flow; errors.log records full error context—together they provide comprehensive coverage
  2. Distributed tracing: Each tool call generates a TraceID/SpanID, enabling cross-tool call chain analysis
  3. Bug pattern recognition: Infinite loops, memory pollution, and argument parsing failures are the three most common failure modes—all with automated detection solutions
  4. ELK dashboard: Real-time monitoring of tool success rates, P95 latency, and token usage trends catches issues before they escalate
  5. Session replay: Reproducing execution from logs and auto-generating test cases completes the debug-verify feedback loop

Review Questions

  1. When an Agent's output is flagged by a user as incorrect, how would you design reverse tracing—working backward from the output to the specific tool call that caused the error?
  2. In production, fully logging all tool call inputs and outputs generates massive data volumes. How would you design an "on-demand sampling" log strategy?
  3. If a bug appears in only 1 out of 100 calls, traditional reproduction approaches are unreliable. How would you design a "probabilistic bug tracing" system?
  4. In session replay testing, if tool responses change over time (e.g., web page content updates), how do you ensure test determinism?
Rate this chapter
4.8  / 5  (3 ratings)

💬 Comments