Chapter 35: Skill Performance Profiling and Optimization
A Skill that performs flawlessly in development can grind to a halt in production. Performance problems rarely live where you expect them: you assume LLM inference is the bottleneck, but it turns out a single tool call is waiting on DNS resolution. You think concurrency is sufficient, but memory fragmentation is triggering the garbage collector every few seconds. This chapter provides a systematic performance profiling methodology, from measurement to optimization, with concrete tools and code at every step.
35.1 Root Cause Classification
Before optimizing, you must measure. Optimization without measurement is the most expensive form of guesswork.
Performance Problem Taxonomy
Skill Performance Issues
├── Latency
│   ├── LLM inference latency (time-to-first-token)
│   ├── Tool call latency (primarily network I/O)
│   ├── Skill load latency (cold-start)
│   └── Serial execution latency (parallelizable work run sequentially)
│
├── Throughput
│   ├── Insufficient concurrency (semaphore too small)
│   ├── Event loop blocking (sync code mixed into async)
│   └── Connection pool exhaustion (HTTP/DB)
│
└── Resource Usage
    ├── Memory leak (long-lived object accumulation)
    ├── CPU-intensive ops blocking the event loop
    └── Excessive disk I/O (over-logging)
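Of the categories above, event loop blocking is the easiest to reproduce in isolation. The following is a minimal sketch rather than Hermes code: `blocking_tool` is a hypothetical stand-in for any synchronous call (a blocking HTTP client, a CPU-heavy parse), and the heartbeat counter is only an illustration of how starvation shows up. It assumes Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time


def blocking_tool(seconds: float) -> str:
    """Hypothetical synchronous tool: time.sleep stands in for blocking I/O."""
    time.sleep(seconds)
    return "done"


async def heartbeat(counter: list, interval: float = 0.01) -> None:
    """Increment counter[0] every `interval` seconds while the loop is free."""
    while True:
        await asyncio.sleep(interval)
        counter[0] += 1


async def run_case(offload: bool, seconds: float = 0.2) -> int:
    """Return how many heartbeats fired while the tool was running."""
    counter = [0]
    beat = asyncio.create_task(heartbeat(counter))
    await asyncio.sleep(0)  # give the heartbeat task a chance to start
    if offload:
        # Runs the sync function in a worker thread; the loop keeps serving tasks.
        await asyncio.to_thread(blocking_tool, seconds)
    else:
        # Called directly: nothing else on the loop can run until it returns.
        blocking_tool(seconds)
    ticks = counter[0]
    beat.cancel()
    return ticks


async def main() -> dict:
    return {
        "blocked": await run_case(offload=False),
        "offloaded": await run_case(offload=True),
    }


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the blocked case the heartbeat never fires, which is what every other Skill sharing the loop would experience. Offloading trades that for thread overhead, so it suits occasional blocking calls more than hot paths.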
Typical Latency Benchmarks
| Operation | Typical Latency | Optimized Target |
|---|---|---|
| LLM inference (local 7B) | 2000–8000 ms | ~1500 ms (quantized) |
| LLM API call (cloud) | 500–3000 ms | ~300 ms (streaming + cache) |
| HTTP tool call | 100–2000 ms | ~50 ms (keep-alive + concurrency) |
| Local DB query | 1–100 ms | <1 ms (indexed + pool) |
| File I/O | 0.1–50 ms | <0.1 ms (mmap) |
| Skill serialization | 1–20 ms | <1 ms (MessagePack) |
35.2 Execution Time Distribution Analysis
Built-in Profiler: SkillProfiler
# hermes/profiling/skill_profiler.py
import functools
import statistics
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class SpanRecord:
    """One timed operation; times are time.perf_counter() readings in seconds."""
    name: str
    start_time: float
    end_time: float
    metadata: Dict = field(default_factory=dict)
    error: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        return (self.end_time - self.start_time) * 1000

    @property
    def success(self) -> bool:
        return self.error is None


class SkillProfiler:
    def __init__(self, skill_name: str):
        self.skill_name = skill_name
        self.spans: List[SpanRecord] = []
        # perf_counter is monotonic, so durations survive wall-clock adjustments
        self._session_start = time.perf_counter()

    @asynccontextmanager
    async def span(self, name: str, **metadata):
        """Time the enclosed block and record it, even if it raises."""
        start = time.perf_counter()
        error = None
        try:
            yield
        except Exception as e:
            error = str(e)
            raise
        finally:
            self.spans.append(SpanRecord(
                name=name, start_time=start,
                end_time=time.perf_counter(), metadata=metadata, error=error
            ))

    def trace(self, span_name: str):
        """Decorator form of span() for async functions."""
        def decorator(func: Callable):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                async with self.span(span_name):
                    return await func(*args, **kwargs)
            return wrapper
        return decorator

    def get_report(self) -> Dict:
        total_ms = (time.perf_counter() - self._session_start) * 1000

        # Group span durations by operation name
        by_name: Dict[str, List[float]] = {}
        for span in self.spans:
            by_name.setdefault(span.name, []).append(span.duration_ms)

        operations = {}
        for name, durations in by_name.items():
            s = sorted(durations)
            operations[name] = {
                "count": len(durations),
                "total_ms": sum(durations),
                "mean_ms": statistics.mean(durations),
                # Index-based P95; with fewer than 20 samples this is the max
                "p95_ms": s[int(len(s) * 0.95)],
                "max_ms": max(durations),
                # Nested or concurrent spans overlap, so shares can sum past 100%
                "pct_of_total": sum(durations) / total_ms * 100
            }

        # Largest cumulative time first; the top entry is reported as the bottleneck
        sorted_ops = sorted(
            operations.items(), key=lambda x: x[1]["total_ms"], reverse=True
        )
        return {
            "skill": self.skill_name,
            "session_total_ms": total_ms,
            "operations": dict(sorted_ops),
            "bottleneck": sorted_ops[0][0] if sorted_ops else None
        }

    def print_report(self):
        report = self.get_report()
        print(f"\n{'='*60}")
        print(f"Skill Profile: {report['skill']}")
        print(f"Total session: {report['session_total_ms']:.1f}ms | Bottleneck: {report['bottleneck']}")
        print(f"\n{'Operation':<30} {'Count':>6} {'Mean(ms)':>10} {'P95(ms)':>10} {'Share':>8}")
        print("-" * 70)
        for name, stats in report["operations"].items():
            print(f"{name:<30} {stats['count']:>6} {stats['mean_ms']:>10.1f} "
                  f"{stats['p95_ms']:>10.1f} {stats['pct_of_total']:>7.1f}%")
        print(f"{'='*60}\n")
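# Usage example
profiler = SkillProfiler("research-skill")


class ResearchSkill:
    @profiler.trace("llm_inference")
    async def _call_llm(self, prompt: str) -> str:
        ...  # placeholder: call the model here

    async def _search_web(self, query: str) -> List[Dict]:
        ...  # placeholder: call the search tool here

    async def execute(self, query: str) -> Dict:
        async with profiler.span("web_search", query=query):
            results = await self._search_web(query)
        # "llm_summarize" wraps the traced _call_llm, so the LLM time is
        # recorded under both "llm_summarize" and "llm_inference".
        async with profiler.span("llm_summarize"):
            summary = await self._call_llm(str(results))
        profiler.print_report()
        return {"summary": summary}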
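The report above takes P95 by indexing the sorted list at `int(len(s) * 0.95)`. For comparison, here is a small sketch of the nearest-rank definition of a percentile. `percentile_nearest_rank` is a helper name introduced for this illustration, not part of the profiler, and the sample data is synthetic.

```python
import math
from typing import Sequence


def percentile_nearest_rank(values: Sequence[float], pct: float) -> float:
    """Smallest sample such that at least pct% of samples are <= it."""
    if not values:
        raise ValueError("percentile of an empty sample is undefined")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    # 1-based rank; multiplying before dividing keeps whole ranks exact
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


# Synthetic durations: 1.0, 2.0, ..., 100.0 ms
durations_ms = [float(i) for i in range(1, 101)]
p95 = percentile_nearest_rank(durations_ms, 95)
p99 = percentile_nearest_rank(durations_ms, 99)
print(p95, p99)
```

On this sample, nearest rank returns the 95th value, while indexing at `int(n * 0.95)` returns the 96th. The difference is negligible for large samples but worth knowing when a span has only a handful of recordings.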
35.3 Tool Call Latency Optimization
After LLM inference, tool calls are typically the highest-latency part of a Skill, and most of that time is network I/O rather than computation.
Strategy 1: HTTP Connection Reuse
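# hermes/tools/http_client.py
from typing import Optional

import aiohttp


class OptimizedHttpClient:
    """Process-wide shared session so TCP/TLS connections are reused across calls."""
    _session: Optional[aiohttp.ClientSession] = None

    @classmethod
    async def get_session(cls) -> aiohttp.ClientSession:
        # Create lazily, and recreate if a previous session was closed
        if cls._session is None or cls._session.closed:
            connector = aiohttp.TCPConnector(
                limit=100,           # Max total connections
                limit_per_host=30,   # Max per host
                ttl_dns_cache=300,   # Cache DNS 5 minutes
                keepalive_timeout=60,
                enable_cleanup_closed=True
            )
            cls._session = aiohttp.ClientSession(
                connector=connector,
                timeout=aiohttp.ClientTimeout(total=30, connect=5),
                headers={"Accept-Encoding": "gzip, deflate"}
            )
        return cls._session

    @classmethod
    async def close(cls) -> None:
        """Close the shared session at shutdown to release pooled connections."""
        if cls._session is not None and not cls._session.closed:
            await cls._session.close()
        cls._session = None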
Strategy 2: Concurrent Tool Calls
# hermes/tools/concurrent_caller.py
import asyncio
from typing import Any, Callable, Dict, List


class ConcurrentToolCaller:
    """
    Convert serial tool calls to parallel.
    Serial:   total_time = N × T
    Parallel: total_time ≈ max(T) while N <= max_concurrent;
              beyond that, extra calls queue on the semaphore.
    """

    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def call_all(self, tool_calls: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Resolve ids once so results carry the same fallback id as the tasks
        ids = [call.get("id", str(i)) for i, call in enumerate(tool_calls)]
        tasks = [
            self._call_with_limit(call["tool"], call["args"], call_id)
            for call, call_id in zip(tool_calls, ids)
        ]
        # return_exceptions=True: one failing tool does not cancel the others
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [
            {"id": call_id, "success": not isinstance(r, Exception),
             "result": None if isinstance(r, Exception) else r,
             "error": str(r) if isinstance(r, Exception) else None}
            for r, call_id in zip(results, ids)
        ]

    async def _call_with_limit(self, tool_fn: Callable, args: Dict, call_id: str):
        # The semaphore caps how many tool calls are in flight at once
        async with self.semaphore:
            return await tool_fn(**args)


# Example: parallel multi-engine search
# (web_search is assumed to be an async tool function defined elsewhere)
async def search_parallel(queries: List[str]) -> List[Dict]:
    caller = ConcurrentToolCaller(max_concurrent=5)
    return await caller.call_all([
        {"tool": web_search, "args": {"query": q}, "id": f"q_{i}"}
        for i, q in enumerate(queries)
    ])
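Parallel latency is bounded by the slowest call, so one hung tool can erase the gain from concurrency. The sketch below shows one way to cap that with a per-call timeout. `fake_tool` and `call_with_timeout` are hypothetical names for this illustration, and the delays are made-up values rather than measurements.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List


async def fake_tool(name: str, delay: float) -> str:
    """Hypothetical tool: sleeps to imitate network latency."""
    await asyncio.sleep(delay)
    return f"{name}-ok"


async def call_with_timeout(
    tool_fn: Callable[..., Awaitable[Any]], args: Dict[str, Any], timeout_s: float
) -> Dict[str, Any]:
    """Run one tool call, turning a timeout or exception into an error record."""
    try:
        result = await asyncio.wait_for(tool_fn(**args), timeout=timeout_s)
        return {"success": True, "result": result, "error": None}
    except asyncio.TimeoutError:
        return {"success": False, "result": None,
                "error": f"timed out after {timeout_s}s"}
    except Exception as exc:
        return {"success": False, "result": None, "error": str(exc)}


async def demo() -> Dict[str, Any]:
    calls = [
        {"name": "fast", "delay": 0.05},
        {"name": "medium", "delay": 0.10},
        {"name": "stuck", "delay": 5.0},  # would dominate max(T) without a timeout
    ]
    start = time.perf_counter()
    results: List[Dict[str, Any]] = await asyncio.gather(
        *(call_with_timeout(fake_tool, c, timeout_s=0.3) for c in calls)
    )
    return {"results": results, "elapsed_s": time.perf_counter() - start}


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the timeout in place, the batch finishes in roughly the timeout value instead of the five seconds the stuck call would take. The caller still has to decide whether a partial result set is acceptable.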
35.4 Caching Intermediate Results
Caching often delivers the highest ROI of any optimization, especially for repeated queries and expensive computations. It only pays off when the same inputs recur and a slightly stale result is acceptable.
# hermes/cache/multi_level_cache.py
import hashlib
import json
import time
from functools import wraps
from typing import Any, Callable, Dict, List, Optional


class MultiLevelCache:
    """
    L1 (memory) → L2 (Redis) → L3 (disk)
    Hit priority: L1 > L2 > L3 > real request
    Only L1 and L2 are implemented in this listing.
    """

    def __init__(self, l1_ttl=300, l2_ttl=3600, redis_client=None):
        self.l1 = {}  # Note: unbounded; expired entries are only dropped when read
        self.l1_ttl = l1_ttl
        self.l2_ttl = l2_ttl
        # Assumption: an async Redis client (e.g. redis.asyncio.Redis) is
        # injected by the caller; with None the cache runs L1-only.
        self._redis = redis_client
        self._stats = {"l1_hits": 0, "l2_hits": 0, "misses": 0}

    async def _get_redis(self):
        # The listing calls this helper without defining it; returning the
        # injected client is the simplest stand-in.
        return self._redis

    def _make_key(self, fn_name: str, args: tuple, kwargs: dict) -> str:
        # default=str makes non-JSON arguments hashable by their str() form.
        # For methods, args[0] is self: unless the class has a stable __str__,
        # keys differ per instance and per process.
        payload = json.dumps({"fn": fn_name, "args": args, "kwargs": kwargs},
                             sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()[:16]

    async def get(self, key: str) -> Optional[Any]:
        # L1: in-process dict
        if key in self.l1:
            entry = self.l1[key]
            if time.time() < entry["expires_at"]:
                self._stats["l1_hits"] += 1
                return entry["value"]
            del self.l1[key]  # expired

        # L2: Redis; promote hits into L1
        redis = await self._get_redis()
        if redis:
            raw = await redis.get(key)
            if raw is not None:
                value = json.loads(raw)
                self.l1[key] = {"value": value, "expires_at": time.time() + self.l1_ttl}
                self._stats["l2_hits"] += 1
                return value

        self._stats["misses"] += 1
        return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        l2_ttl = ttl or self.l2_ttl
        l1_ttl = min(self.l1_ttl, l2_ttl)  # L1 never outlives L2
        self.l1[key] = {"value": value, "expires_at": time.time() + l1_ttl}
        redis = await self._get_redis()
        if redis:
            await redis.setex(key, l2_ttl, json.dumps(value, default=str))

    def cached(self, ttl: Optional[int] = None):
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                key = self._make_key(func.__qualname__, args, kwargs)
                result = await self.get(key)
                # None doubles as the miss sentinel, so functions that
                # return None are never served from cache.
                if result is not None:
                    return result
                result = await func(*args, **kwargs)
                await self.set(key, result, ttl=ttl)
                return result
            return wrapper
        return decorator


cache = MultiLevelCache()


class WebSearchSkill:
    @cache.cached(ttl=3600)
    async def search(self, query: str) -> List[Dict]:
        return await self._do_real_search(query)  # Only called on cache miss
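An in-memory L1 kept in a plain dict grows without limit, which turns the cache into the kind of long-lived object accumulation this chapter warns about. The following sketch shows one possible bounded alternative that combines LRU eviction with a per-entry TTL. `BoundedTTLCache` and its parameters are names introduced here for illustration, not part of the Hermes cache.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple


class BoundedTTLCache:
    """In-memory cache with a max size (LRU eviction) and a per-entry TTL."""

    def __init__(self, max_entries: int = 1024, ttl_s: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._data: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()
        self._max_entries = max_entries
        self._ttl_s = ttl_s
        self._clock = clock  # injectable so tests can control time

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired: drop it and report a miss
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: str, value: Any) -> None:
        self._data[key] = (self._clock() + self._ttl_s, value)
        self._data.move_to_end(key)
        while len(self._data) > self._max_entries:
            self._data.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._data)


# Demo with a fake clock so the behaviour is deterministic
now = [0.0]
l1 = BoundedTTLCache(max_entries=2, ttl_s=10.0, clock=lambda: now[0])
l1.set("a", 1)
l1.set("b", 2)
l1.get("a")     # touch "a" so "b" becomes the eviction candidate
l1.set("c", 3)  # over capacity: "b" is evicted
```

The size cap bounds memory regardless of key cardinality, at the cost of re-fetching entries that were evicted while still fresh. Choosing `max_entries` is a workload decision that the benchmark suite later in this chapter can inform.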
35.5 Memory Optimization
# hermes/profiling/memory_profiler.py
import gc
import tracemalloc


class SkillMemoryProfiler:
    def start(self):
        tracemalloc.start(25)  # keep up to 25 stack frames per allocation
        gc.collect()           # clear existing garbage so it does not skew results

    def get_top_allocators(self, top_n: int = 10) -> str:
        snap = tracemalloc.take_snapshot()
        top_stats = snap.statistics("lineno")  # grouped by file and line, largest first
        lines = [f"Top {top_n} memory allocators:"]
        for stat in top_stats[:top_n]:
            lines.append(f"  {stat.size/1024:.1f} KB - {stat.traceback.format()[0]}")
        return "\n".join(lines)


class MemoryOptimizedSkill:
    # __slots__ removes the per-instance __dict__; the saving is largest for
    # many small objects and depends on the number of attributes
    __slots__ = ["name", "config", "_cache"]

    async def process_large_dataset(self, source: str):
        """Use generators for large data: never load everything at once"""
        async for chunk in self._stream_data(source):
            result = await self._process_chunk(chunk)
            yield result
            # Drop references so each chunk can be freed before the next one
            # arrives. Reference counting reclaims them immediately; a full
            # gc.collect() per chunk would add a pause on every iteration.
            del chunk, result
35.6 Building a Performance Benchmark Suite
# hermes/benchmarks/skill_benchmark.py
import asyncio
import statistics
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class BenchmarkResult:
    name: str
    iterations: int
    mean_ms: float
    median_ms: float
    p95_ms: float
    p99_ms: float
    min_ms: float
    max_ms: float
    throughput_rps: float
    errors: int


class SkillBenchmark:
    async def run(
        self, name: str, target_func: Callable,
        args: Optional[Dict[str, Any]] = None, iterations: int = 100,
        warmup: int = 10, concurrency: int = 1
    ) -> BenchmarkResult:
        args = args or {}

        # Warmup: fill caches and connection pools; results are discarded
        for _ in range(warmup):
            try:
                await target_func(**args)
            except Exception:
                pass

        semaphore = asyncio.Semaphore(concurrency)
        errors = 0

        async def single_run():
            nonlocal errors
            async with semaphore:
                # Timer starts after acquiring the semaphore, so queueing
                # time is excluded from per-call latency
                t = time.perf_counter()
                try:
                    await target_func(**args)
                    return (time.perf_counter() - t) * 1000
                except Exception:
                    errors += 1
                    return None

        start_total = time.perf_counter()
        results = await asyncio.gather(*[single_run() for _ in range(iterations)])
        total_time = time.perf_counter() - start_total

        # Latency statistics cover successful runs only
        durations = sorted(r for r in results if r is not None)
        n = len(durations)
        if n == 0:
            # Without this guard statistics.mean() raises on an empty list
            raise RuntimeError(f"{name}: all {iterations} iterations failed")

        result = BenchmarkResult(
            name=name, iterations=iterations,
            mean_ms=statistics.mean(durations),
            median_ms=statistics.median(durations),
            p95_ms=durations[int(n * 0.95)],
            p99_ms=durations[int(n * 0.99)],
            min_ms=min(durations), max_ms=max(durations),
            throughput_rps=iterations / total_time,  # includes failed calls
            errors=errors
        )
        self._print(result)
        return result

    def _print(self, r: BenchmarkResult):
        print(f"\nBenchmark: {r.name}")
        print(f"  Iterations: {r.iterations}  Errors: {r.errors}")
        print(f"  Mean: {r.mean_ms:.2f}ms  Median: {r.median_ms:.2f}ms")
        print(f"  P95: {r.p95_ms:.2f}ms  P99: {r.p99_ms:.2f}ms")
        print(f"  Throughput: {r.throughput_rps:.1f} req/s")
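Benchmark numbers become useful once they are compared against a stored baseline. The sketch below shows one simple way to flag regressions between two runs. `LatencySummary`, `find_regressions`, and the 10% tolerance are assumptions made for illustration, and the latency figures are invented sample data rather than real measurements.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class LatencySummary:
    """Subset of a benchmark result needed for a regression check (ms)."""
    p95_ms: float
    p99_ms: float


def find_regressions(
    baseline: Dict[str, LatencySummary],
    current: Dict[str, LatencySummary],
    tolerance: float = 0.10,
) -> List[str]:
    """List benchmarks whose P95 or P99 grew by more than `tolerance` (0.10 = 10%)."""
    regressions: List[str] = []
    for name, base in baseline.items():
        now = current.get(name)
        if now is None:
            regressions.append(f"{name}: missing from current run")
            continue
        for metric in ("p95_ms", "p99_ms"):
            old, new = getattr(base, metric), getattr(now, metric)
            if new > old * (1 + tolerance):
                regressions.append(f"{name}: {metric} {old:.1f} -> {new:.1f}")
    return regressions


# Invented sample data: P95 regresses by 25%, P99 stays within tolerance
baseline = {"web_search": LatencySummary(p95_ms=120.0, p99_ms=200.0)}
current = {"web_search": LatencySummary(p95_ms=150.0, p99_ms=205.0)}
report = find_regressions(baseline, current)
print(report)
```

A relative tolerance absorbs normal run-to-run noise. A CI job could fail the build whenever the returned list is non-empty, though the right threshold depends on how stable the benchmark environment is.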
Performance Optimization Checklist
□ Used SkillProfiler to identify actual bottleneck (not gut feeling)
□ HTTP client uses connection pool (aiohttp.TCPConnector)
□ Repeated tool calls have caching (@cache.cached)
□ Independent tool calls use asyncio.gather (parallel)
□ No blocking sync calls inside async functions
□ Large data uses generators / streaming
□ Hot objects use __slots__
□ Baseline benchmarks recorded in production
□ P95/P99 latency meets SLA requirements
□ Memory validated with tracemalloc: no leaks
Chapter Summary
The core principle of performance optimization: measure first, then optimize the slowest part, not the easiest part.
- Execution time profiling: `SkillProfiler` gives span-level precision to locate real bottlenecks with data
- Tool call optimization: Connection reuse, DNS caching, and concurrent calls can reduce tool latency by 50–90%
- Caching: Multi-level cache (L1/L2/L3) delivers the highest ROI; avoid repeating expensive work
- Concurrent calls: Replace serial N×T with parallel max(T), the most direct speedup
- Memory optimization: `tracemalloc`, `__slots__`, and streaming as the three core techniques
- Benchmarking: Quantify every optimization; record baselines; track regressions
Review Questions
- If LLM inference consumes 80% of total time, how much can caching help? What scenarios are unsuitable for caching?
- When running concurrent tool calls, what happens to overall latency if one tool has a high error rate? How would you design a fail-fast mechanism?
- In a multi-tenant Hermes deployment, can one Skill's performance issues affect other tenants? How would you implement performance isolation?
- Design a continuous performance monitoring system: how would you detect performance regressions in production in real time and trigger automatic alerts?