Debugging Methodology: agent.log Analysis and Tracing
Chapter 63: Debugging Methodology โ agent.log Analysis and Tracing
Debugging an Agent system is like searching for someone in a dark forestโyou don't know which way they went or whether they're still moving. Logs are the flashlight in your hand, tracing is your compass, and methodology is the map that keeps you from getting lost.
63.1 Log Structure: agent.log vs. errors.log
The Two-Log Architecture
Hermes Agent uses two separate log files, each serving a distinct purpose:
agent.log โ Normal Flow Log
- Every state transition
- Tool call inputs and outputs
- Token usage statistics
- Performance metrics (latency, batch size, etc.)
- Format: Structured JSON for programmatic parsing
errors.log โ Error-Specific Log
- All exceptions and errors with full stack traces
- Agent state snapshot at time of error
- Retry information and final outcomes
- Format: Structured JSON with full context
Log Format Specification
import logging
import json
import traceback
from datetime import datetime
class StructuredFormatter(logging.Formatter):
"""JSON-format log formatter for ELK Stack compatibility"""
def format(self, record: logging.LogRecord) -> str:
log_entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if hasattr(record, 'extra'):
log_entry.update(record.extra)
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__ if record.exc_info[0] else None,
"message": str(record.exc_info[1]),
"traceback": traceback.format_exception(*record.exc_info)
}
return json.dumps(log_entry, ensure_ascii=False)
def setup_hermes_logging(log_dir: str = "/var/log/hermes"):
import os
os.makedirs(log_dir, exist_ok=True)
formatter = StructuredFormatter()
agent_logger = logging.getLogger('agent')
agent_logger.setLevel(logging.DEBUG)
agent_handler = logging.FileHandler(f"{log_dir}/agent.log")
agent_handler.setFormatter(formatter)
agent_logger.addHandler(agent_handler)
error_logger = logging.getLogger('errors')
error_logger.setLevel(logging.WARNING)
error_handler = logging.FileHandler(f"{log_dir}/errors.log")
error_handler.setFormatter(formatter)
error_logger.addHandler(error_handler)
return agent_logger, error_logger
Key Log Fields Reference
| Field | Appears In | Purpose |
|---|---|---|
session_id |
Both | Correlate all logs for one session |
timestamp |
Both | Time-sequence analysis, phase duration |
from_state/to_state |
agent.log | State transition tracking |
tool_name + tool_input |
Both | Tool call tracing |
tokens_used_so_far |
agent.log | Token budget monitoring |
retry_count |
errors.log | Determine if error is intermittent |
agent_state_snapshot |
errors.log | Reconstruct error scene |
will_retry |
errors.log | Distinguish recoverable vs. fatal errors |
Example agent.log entry:
{
"timestamp": "2024-12-24T10:26:40.123Z",
"level": "INFO", "logger": "agent", "message": "State transition",
"session_id": "sess_abc123", "event": "tool_called",
"from_state": "executing", "to_state": "waiting",
"tool_name": "web_search", "tool_input": {"query": "latest AI papers"},
"step_index": 2, "total_steps": 5, "tokens_used_so_far": 8240
}
Example errors.log entry:
{
"timestamp": "2024-12-24T10:27:15.456Z",
"level": "ERROR", "logger": "errors", "message": "Tool execution failed",
"session_id": "sess_abc123", "error_type": "ToolTimeoutError",
"error_message": "web_search timed out after 30 seconds",
"state_when_error": "waiting", "retry_count": 1, "will_retry": true,
"agent_state_snapshot": {"plan_steps": ["search","analyze","report"], "current_step": 0}
}
63.2 Tracing Multi-Step Tool Call Failures
import uuid
import time
import asyncio
import json
from datetime import datetime
from typing import Any, Callable, Optional
class TraceContext:
def __init__(self, trace_id: str = None, parent_span_id: str = None):
self.trace_id = trace_id or str(uuid.uuid4())[:8]
self.parent_span_id = parent_span_id
self.span_id = str(uuid.uuid4())[:8]
self.start_time = time.time()
def to_log_dict(self, status: str, error: str = None) -> dict:
entry = {
"trace_id": self.trace_id, "span_id": self.span_id,
"parent_span_id": self.parent_span_id,
"duration_ms": round((time.time() - self.start_time) * 1000, 2),
"status": status,
}
if error:
entry["error"] = error
return entry
class InstrumentedToolCaller:
def __init__(self, agent_logger, error_logger):
self.agent_logger = agent_logger
self.error_logger = error_logger
self.call_chain = []
async def call_tool(self, tool_name: str, tool_func: Callable, args: dict,
session_id: str, trace_context: TraceContext,
timeout: float = 30.0) -> Any:
span = TraceContext(trace_id=trace_context.trace_id, parent_span_id=trace_context.span_id)
call_record = {"tool": tool_name, "args": args, "start_time": time.time()}
self.agent_logger.info(json.dumps({
"timestamp": datetime.utcnow().isoformat() + "Z",
"event": "tool_call_start", "session_id": session_id,
"tool_name": tool_name, "args": self._sanitize_args(args),
"trace_id": span.trace_id, "span_id": span.span_id,
}))
try:
result = await asyncio.wait_for(
tool_func(**args) if asyncio.iscoroutinefunction(tool_func)
else asyncio.to_thread(tool_func, **args),
timeout=timeout
)
duration_ms = (time.time() - call_record["start_time"]) * 1000
self.agent_logger.info(json.dumps({
"timestamp": datetime.utcnow().isoformat() + "Z",
"event": "tool_call_success", "session_id": session_id,
"tool_name": tool_name, "duration_ms": round(duration_ms, 2),
"result_summary": str(result)[:200],
**span.to_log_dict("ok")
}))
call_record.update({"result": "success", "duration_ms": duration_ms})
self.call_chain.append(call_record)
return result
except asyncio.TimeoutError:
error_msg = f"Tool '{tool_name}' timed out after {timeout}s"
self.error_logger.error(json.dumps({
"timestamp": datetime.utcnow().isoformat() + "Z",
"event": "tool_call_timeout", "session_id": session_id,
"tool_name": tool_name, "timeout_seconds": timeout,
**span.to_log_dict("timeout", error_msg)
}))
call_record.update({"result": "timeout", "error": error_msg})
self.call_chain.append(call_record)
raise
except Exception as e:
duration_ms = (time.time() - call_record["start_time"]) * 1000
self.error_logger.error(json.dumps({
"timestamp": datetime.utcnow().isoformat() + "Z",
"event": "tool_call_error", "session_id": session_id,
"tool_name": tool_name, "duration_ms": round(duration_ms, 2),
"error_type": type(e).__name__, "error_message": str(e),
**span.to_log_dict("error", str(e))
}), exc_info=True)
call_record.update({"result": "error", "error": str(e)})
self.call_chain.append(call_record)
raise
def _sanitize_args(self, args: dict) -> dict:
sensitive = {"password", "token", "api_key", "secret", "credential"}
return {k: "***REDACTED***" if k.lower() in sensitive else v for k, v in args.items()}
def get_call_chain_report(self) -> dict:
failed = [c for c in self.call_chain if c["result"] != "success"]
return {
"total_calls": len(self.call_chain),
"successful": len(self.call_chain) - len(failed),
"failed": len(failed),
"call_chain": self.call_chain,
"failures": failed
}
63.3 Common Bug Pattern Recognition
Pattern 1: Infinite Tool Call Loop
Symptom: The same tool called repeatedly at the same step; Agent never advances.
Log signature:
{"event": "tool_call_start", "tool_name": "web_search", "step_index": 2}
{"event": "tool_call_success", "tool_name": "web_search", "step_index": 2}
{"event": "tool_call_start", "tool_name": "web_search", "step_index": 2} // REPEATED
Detection:
def detect_infinite_loop(log_file: str, session_id: str, threshold: int = 3) -> list:
from collections import defaultdict
tool_call_counts = defaultdict(int)
alerts = []
with open(log_file) as f:
for line in f:
try:
entry = json.loads(line)
if (entry.get("session_id") == session_id and
entry.get("event") == "tool_call_start"):
key = f"{entry['tool_name']}@step{entry.get('step_index', '?')}"
tool_call_counts[key] += 1
if tool_call_counts[key] >= threshold:
alerts.append({
"tool": entry["tool_name"],
"step": entry.get("step_index"),
"count": tool_call_counts[key]
})
except (json.JSONDecodeError, KeyError):
continue
return alerts
# Root cause: LLM misunderstood tool result, thinks task is incomplete
# Fix: Add explicit "step complete" signal in tool result; add per-step call counter with max
Pattern 2: Memory Pollution (Cross-Session Contamination)
Symptom: Agent in a new session references data from a previous session.
Log signature:
{"event": "session_start", "session_id": "sess_new001"}
{"event": "llm_response", "content": "Based on the data you mentioned earlier..."}
// "earlier" doesn't exist in this session!
Detection and fix:
import time
class MemoryPollutionGuard:
def __init__(self):
self._session_memories = {}
def get_memory(self, session_id: str) -> dict:
if session_id not in self._session_memories:
self._session_memories[session_id] = {
"conversation_history": [], "tool_results": {},
"created_at": time.time()
}
return self._session_memories[session_id]
def clear_session(self, session_id: str):
self._session_memories.pop(session_id, None)
def audit_cross_references(self, session_id: str, content: str) -> list:
alerts = []
session_age = time.time() - self.get_memory(session_id)["created_at"]
# Check for leaked session IDs
for other_id in self._session_memories:
if other_id != session_id and other_id in content:
alerts.append({"type": "session_id_leak", "leaked": other_id})
# Temporal paradox: new session referencing past data
if session_age < 60:
for ref in ["previously", "last time", "as mentioned earlier", "you said before"]:
if ref in content.lower():
alerts.append({"type": "temporal_paradox", "reference": ref,
"session_age_s": session_age})
return alerts
Pattern 3: Argument Parsing Failure
Symptom: Tool calls fail with validation errors or missing required fields.
Log signature:
{"event": "tool_call_error", "error_type": "ValidationError",
"error_message": "Missing required field 'query'",
"args": {"search_term": "AI papers"}} // LLM used wrong field name!
Auto-detection and correction:
from typing import Any, Optional
class ToolArgValidator:
SYNONYMS = {
"query": ["search_term", "q", "search", "keyword"],
"url": ["link", "href", "address"],
"file_path": ["path", "filepath", "filename"],
"max_results": ["limit", "count", "n", "top_k"]
}
def __init__(self, tool_schemas: dict):
self.schemas = tool_schemas
def validate(self, tool_name: str, args: dict) -> tuple:
schema = self.schemas.get(tool_name)
if not schema:
return False, f"Unknown tool: {tool_name}", args
required = schema.get("required", [])
properties = schema.get("properties", {})
errors = []
corrected = dict(args)
for field in required:
if field not in args:
fuzzy = self._fuzzy_find(field, args)
if fuzzy:
corrected[field] = corrected.pop(fuzzy)
errors.append(f"Auto-corrected: '{fuzzy}' โ '{field}'")
else:
errors.append(f"Missing required: '{field}'")
for field, value in args.items():
if field in properties:
expected = properties[field].get("type")
if not self._check_type(value, expected):
converted = self._try_convert(value, expected)
if converted is not None:
corrected[field] = converted
errors.append(f"Auto-converted '{field}' to {expected}")
else:
errors.append(f"Type error '{field}': expected {expected}")
is_valid = all("Missing" not in e for e in errors)
return is_valid, "; ".join(errors), corrected
def _fuzzy_find(self, target: str, args: dict) -> Optional[str]:
for canonical, variants in self.SYNONYMS.items():
if target == canonical:
for v in variants:
if v in args:
return v
return None
def _check_type(self, value: Any, expected: str) -> bool:
type_map = {"string": str, "integer": int, "number": (int, float),
"boolean": bool, "array": list}
t = type_map.get(expected)
return isinstance(value, t) if t else True
def _try_convert(self, value: Any, expected: str) -> Optional[Any]:
try:
if expected == "string": return str(value)
if expected == "integer": return int(value)
if expected == "number": return float(value)
if expected == "boolean": return bool(value)
except (ValueError, TypeError):
pass
return None
63.4 Building a Debug Dashboard with ELK Stack
Architecture
Hermes Agent โ agent.log โ Filebeat โ Logstash โ Elasticsearch โ Kibana
errors.log โ
Logstash Configuration
# /etc/logstash/conf.d/hermes-agent.conf
input {
file {
path => ["/var/log/hermes/agent.log", "/var/log/hermes/errors.log"]
start_position => "beginning"
tags => ["hermes_agent"]
}
}
filter {
json { source => "message" target => "parsed" remove_field => ["message"] }
ruby {
code => "
if event.get('parsed')
event.get('parsed').each { |k, v| event.set(k, v) }
event.remove('parsed')
end
"
}
if "/errors.log" in [path] {
mutate { add_field => { "log_type" => "error" } }
} else {
mutate { add_field => { "log_type" => "agent" } }
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "hermes-agent-%{+YYYY.MM.dd}"
}
}
Key Kibana Queries
// Tool call success rate by tool
{
"query": {"bool": {"must": [{"term": {"event": "tool_call_start"}},
{"range": {"@timestamp": {"gte": "now-1h"}}}]}},
"aggs": {"by_tool": {"terms": {"field": "tool_name.keyword"},
"aggs": {"error_rate": {"filter": {"term": {"log_type": "error"}}}}}}
}
// Top 10 slowest tools (P95 latency)
{
"query": {"term": {"event": "tool_call_success"}},
"aggs": {"by_tool": {"terms": {"field": "tool_name.keyword", "size": 10},
"aggs": {"p95_ms": {"percentiles": {"field": "duration_ms", "percents": [95]}}}}}
}
Elasticsearch Index Template
{
"index_patterns": ["hermes-agent-*"],
"mappings": {
"properties": {
"timestamp": {"type": "date"},
"session_id": {"type": "keyword"},
"event": {"type": "keyword"},
"tool_name": {"type": "keyword"},
"from_state": {"type": "keyword"},
"to_state": {"type": "keyword"},
"duration_ms": {"type": "float"},
"tokens_used_so_far": {"type": "integer"},
"retry_count": {"type": "integer"},
"log_type": {"type": "keyword"},
"error_type": {"type": "keyword"}
}
}
}
63.5 Session Replay Implementation
What Is Session Replay?
Session replay lets developers exactly reproduce an Agent's task execution flow from logs, enabling:
- Reproducing reported bugs
- Verifying that fixes work
- Performance regression testing
- Auto-generating test cases
Replayer Implementation
import json
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Callable
@dataclass
class ReplayEvent:
timestamp: str
event_type: str
data: dict
class SessionReplayer:
def __init__(self, log_file: str = "/var/log/hermes/agent.log"):
self.log_file = log_file
def load_session(self, session_id: str) -> list:
events = []
with open(self.log_file) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
if entry.get("session_id") == session_id:
events.append(ReplayEvent(
timestamp=entry.get("timestamp", ""),
event_type=entry.get("event", entry.get("message", "")),
data=entry
))
except json.JSONDecodeError:
continue
events.sort(key=lambda e: e.timestamp)
return events
def print_timeline(self, session_id: str):
events = self.load_session(session_id)
if not events:
print(f"No events found for session {session_id}")
return
print(f"\n{'='*60}\nSession Replay: {session_id}\n"
f"Total Events: {len(events)}\n{'='*60}\n")
start = datetime.fromisoformat(events[0].timestamp.rstrip('Z'))
for event in events:
try:
elapsed = (datetime.fromisoformat(event.timestamp.rstrip('Z')) - start).total_seconds()
except:
elapsed = 0
prefix = "ERR" if "error" in event.event_type.lower() else " "
print(f"[T+{elapsed:6.2f}s] {prefix} {event.event_type}")
for key in ["tool_name", "from_state", "to_state", "error_message", "tokens_used_so_far"]:
if key in event.data and event.data[key] is not None:
print(f" {key}: {event.data[key]}")
print()
async def replay_with_mocks(
self, session_id: str, tool_mocks: dict,
on_state_change: Optional[Callable] = None
) -> dict:
events = self.load_session(session_id)
tool_calls = []
for event in events:
if event.event_type == "tool_call_start":
tool_name = event.data.get("tool_name")
args = event.data.get("args", {})
if tool_name in tool_mocks:
mock = tool_mocks[tool_name]
result = mock(args) if callable(mock) else mock
status = "mocked"
else:
result = None
status = "skipped_no_mock"
tool_calls.append({"tool": tool_name, "result_status": status,
"result": str(result)[:100]})
elif "transition" in event.event_type.lower() and on_state_change:
await on_state_change(
from_state=event.data.get("from_state"),
to_state=event.data.get("to_state"),
trigger=event.data.get("trigger")
)
return {"session_id": session_id, "total_events": len(events),
"tool_calls": tool_calls, "replay_complete": True}
def generate_test_case(self, session_id: str) -> str:
events = self.load_session(session_id)
tool_calls = [e for e in events if e.event_type == "tool_call_start"]
tools_used = {e.data.get("tool_name", "") for e in events
if e.event_type == "tool_call_start"}
tool_registrations = "\n".join(
f' agent.register_tool("{t}")' for t in tools_used if t
)
mock_entries = "\n".join(
f' "{e.data.get("tool_name")}": "mock_result",'
for e in events if e.event_type == "tool_call_success"
)
return f'''import pytest
import asyncio
from hermes_agent import HermesAgentFSM
# Auto-generated from session {session_id} ({len(events)} events, {len(tool_calls)} tool calls)
class TestSession_{session_id.replace("-", "_")}:
@pytest.fixture
def agent(self):
agent = HermesAgentFSM(session_id="replay")
{tool_registrations}
return agent
def test_replay(self, agent):
from session_replayer import SessionReplayer
replayer = SessionReplayer()
result = asyncio.run(replayer.replay_with_mocks(
session_id="{session_id}",
tool_mocks={{
{mock_entries}
}}
))
assert result["replay_complete"] is True
assert result["total_events"] > 0
'''
# Usage
replayer = SessionReplayer("/var/log/hermes/agent.log")
replayer.print_timeline("sess_abc123")
test_code = replayer.generate_test_case("sess_abc123")
with open("test_session_replay.py", "w") as f:
f.write(test_code)
print("Test case generated: test_session_replay.py")
Chapter Summary
A complete debugging methodology transforms Hermes Agent from a black box into a transparent system:
- Dual-log architecture: agent.log records all state transitions in normal flow; errors.log records full error contextโtogether they provide comprehensive coverage
- Distributed tracing: Each tool call generates a TraceID/SpanID, enabling cross-tool call chain analysis
- Bug pattern recognition: Infinite loops, memory pollution, and argument parsing failures are the three most common failure modesโall with automated detection solutions
- ELK dashboard: Real-time monitoring of tool success rates, P95 latency, and token usage trends catches issues before they escalate
- Session replay: Reproducing execution from logs and auto-generating test cases completes the debug-verify feedback loop
Review Questions
- When an Agent's output is flagged by a user as incorrect, how would you design reverse tracingโworking backward from the output to the specific tool call that caused the error?
- In production, fully logging all tool call inputs and outputs generates massive data volumes. How would you design an "on-demand sampling" log strategy?
- If a bug appears in only 1 out of 100 calls, traditional reproduction approaches are unreliable. How would you design a "probabilistic bug tracing" system?
- In session replay testing, if tool responses change over time (e.g., web page content updates), how do you ensure test determinism?