Chapter 35

Skill Performance Profiling and Optimization

A Skill that performs flawlessly in development can grind to a halt in production. Performance problems rarely live where you expect them: you assume LLM inference is the bottleneck, but it turns out a single tool call is waiting on DNS resolution. You think concurrency is sufficient, but memory fragmentation is triggering the garbage collector every few seconds. This chapter provides a systematic performance profiling methodology, from measurement to optimization, with concrete tools and code at every step.


35.1 Root Cause Classification

Before optimizing, you must measure. Optimization without measurement is the most expensive form of guesswork.
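
To put numbers on why measurement comes first, the sketch below applies Amdahl's law: if a component accounts for a fraction p of total time and is made s times faster, the overall speedup is 1 / ((1 - p) + p / s). The function name and the example percentages are illustrative assumptions, not measurements from a real Skill.

```python
def overall_speedup(fraction: float, local_speedup: float) -> float:
    """Amdahl's law: overall speedup when `fraction` of total time runs `local_speedup`x faster."""
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be between 0 and 1")
    if local_speedup <= 0:
        raise ValueError("local_speedup must be positive")
    return 1.0 / ((1.0 - fraction) + fraction / local_speedup)


# Hypothetical profile: 80% of the time in LLM inference, 5% in serialization.
print(f"10x faster serialization (5% of time): {overall_speedup(0.05, 10):.2f}x overall")
print(f"2x faster LLM inference (80% of time): {overall_speedup(0.80, 2):.2f}x overall")
```

Under these assumed figures, a tenfold improvement to the 5% component yields about 1.05x overall, while merely doubling the speed of the 80% component yields about 1.67x. Without a profile you cannot tell which of the two you are working on.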

Performance Problem Taxonomy

Skill Performance Issues
├── Latency
│   ├── LLM inference latency (time-to-first-token)
│   ├── Tool call latency (primarily network I/O)
│   ├── Skill load latency (cold-start)
│   └── Serialized execution latency (parallelizable work that is run sequentially)
│
├── Throughput
│   ├── Insufficient concurrency (semaphore too small)
│   ├── Event loop blocking (sync code mixed into async)
│   └── Connection pool exhaustion (HTTP/DB)
│
└── Resource Usage
    ├── Memory leak (long-lived object accumulation)
    ├── CPU-intensive ops blocking the event loop
    └── Excessive disk I/O (over-logging)
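
Event loop blocking is the easiest of these categories to demonstrate. The following is a minimal sketch, assuming Python 3.9+ for asyncio.to_thread; the heartbeat task and the blocking_work stand-in are hypothetical helpers, not part of the hermes code. It counts how often a background task gets to run while a synchronous call executes inline versus in a worker thread.

```python
import asyncio
import time


async def _heartbeat(ticks: list, interval: float = 0.01) -> None:
    """Record a timestamp each time the event loop lets this task run."""
    while True:
        await asyncio.sleep(interval)
        ticks.append(time.perf_counter())


def blocking_work(seconds: float = 0.2) -> str:
    """Stand-in for a synchronous call (for example a blocking HTTP or file API)."""
    time.sleep(seconds)
    return "done"


async def count_ticks(offload: bool) -> int:
    """Run blocking_work inline or in a worker thread; return heartbeat ticks observed."""
    ticks: list = []
    task = asyncio.create_task(_heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    if offload:
        await asyncio.to_thread(blocking_work)  # event loop stays responsive
    else:
        blocking_work()  # the whole event loop is stalled for the duration
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return len(ticks)


async def main() -> None:
    print("ticks while blocking inline:", await count_ticks(offload=False))
    print("ticks while using to_thread:", await count_ticks(offload=True))


if __name__ == "__main__":
    asyncio.run(main())
```

The inline run should report essentially no ticks, because nothing else can be scheduled while the synchronous call holds the loop. The offloaded run reports many. The same heartbeat idea can serve as a crude loop-lag monitor in a long-running process.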

Typical Latency Benchmarks

Operation                   Typical Latency    Optimized Target
LLM inference (local 7B)    2000–8000ms        ~1500ms (quantized)
LLM API call (cloud)        500–3000ms         ~300ms (streaming + cache)
HTTP tool call              100–2000ms         ~50ms (keep-alive + concurrency)
Local DB query              1–100ms            <1ms (indexed + pool)
File I/O                    0.1–50ms           <0.1ms (mmap)
Skill serialization         1–20ms             <1ms (MessagePack)

35.2 Execution Time Distribution Analysis

Built-in Profiler: SkillProfiler

# hermes/profiling/skill_profiler.py
import functools
import statistics
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

@dataclass
class SpanRecord:
    """One timed operation inside a Skill run."""
    name: str
    start_time: float
    end_time: float
    metadata: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None  # None means the span finished without raising
    
    @property
    def duration_ms(self) -> float:
        return (self.end_time - self.start_time) * 1000
    
    @property
    def success(self) -> bool:
        return self.error is None


class SkillProfiler:
    def __init__(self, skill_name: str):
        self.skill_name = skill_name
        self.spans: List[SpanRecord] = []
        self._session_start = time.time()
    
    @asynccontextmanager
    async def span(self, name: str, **metadata):
        start = time.time()
        error = None
        try:
            yield
        except Exception as e:
            error = str(e)
            raise
        finally:
            self.spans.append(SpanRecord(
                name=name, start_time=start,
                end_time=time.time(), metadata=metadata, error=error
            ))
    
    def trace(self, span_name: str):
        def decorator(func: Callable):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                async with self.span(span_name):
                    return await func(*args, **kwargs)
            return wrapper
        return decorator
    
    def get_report(self) -> Dict:
        total_ms = (time.time() - self._session_start) * 1000
        by_name: Dict[str, List[float]] = {}
        
        for span in self.spans:
            by_name.setdefault(span.name, []).append(span.duration_ms)
        
        operations = {}
        for name, durations in by_name.items():
            s = sorted(durations)
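            operations[name] = {
                "count": len(durations),
                "total_ms": sum(durations),
                "mean_ms": statistics.mean(durations),
                # Index-based approximation; with 20 or fewer samples this is simply the max
                "p95_ms": s[int(len(s) * 0.95)],
                "max_ms": s[-1],
                # Share of wall-clock session time. Nested or concurrent spans
                # overlap, so these shares can sum to more than 100%.
                "pct_of_total": sum(durations) / total_ms * 100
            }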
        
        sorted_ops = sorted(
            operations.items(), key=lambda x: x[1]["total_ms"], reverse=True
        )
        return {
            "skill": self.skill_name,
            "session_total_ms": total_ms,
            "operations": dict(sorted_ops),
            "bottleneck": sorted_ops[0][0] if sorted_ops else None
        }
    
    def print_report(self):
        report = self.get_report()
        print(f"\n{'='*60}")
        print(f"Skill Profile: {report['skill']}")
        print(f"Total session: {report['session_total_ms']:.1f}ms  |  Bottleneck: {report['bottleneck']}")
        print(f"\n{'Operation':<30} {'Count':>6} {'Mean(ms)':>10} {'P95(ms)':>10} {'Share':>8}")
        print("-" * 70)
        for name, stats in report["operations"].items():
            print(f"{name:<30} {stats['count']:>6} {stats['mean_ms']:>10.1f} "
                  f"{stats['p95_ms']:>10.1f} {stats['pct_of_total']:>7.1f}%")
        print(f"{'='*60}\n")


# Usage example
profiler = SkillProfiler("research-skill")

class ResearchSkill:
    @profiler.trace("llm_inference")
    async def _call_llm(self, prompt: str) -> str: ...

    async def _search_web(self, query: str) -> List[Dict]: ...

    async def execute(self, query: str) -> Dict:
        async with profiler.span("web_search", query=query):
            results = await self._search_web(query)
        # _call_llm is already traced as "llm_inference"; wrapping it in a second
        # span would count the same time twice in the report.
        summary = await self._call_llm(str(results))
        profiler.print_report()
        return {"summary": summary}
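
The report above takes P95 by indexing into the sorted list, which is a quick approximation. As a hedged alternative, here is a small nearest-rank percentile helper; the function name and the sample durations are made up for illustration. It also shows why the chapter reports percentiles next to the mean.

```python
import math
from typing import Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: smallest sample with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("percentile() needs at least one sample")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


# Hypothetical span durations: nine fast calls and one slow outlier.
durations_ms = [12.0, 15.0, 11.0, 240.0, 14.0, 13.0, 16.0, 12.5, 13.5, 14.5]
print("mean:", sum(durations_ms) / len(durations_ms))
print("p50: ", percentile(durations_ms, 50))
print("p95: ", percentile(durations_ms, 95))
```

With this sample the mean is pulled up to about 36 ms by a single outlier, the median stays at 13.5 ms, and P95 surfaces the 240 ms tail. Means alone would hide the fact that most calls are fast and a few are very slow.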

35.3 Tool Call Latency Optimization

Tool calls are often one of the largest contributors to Skill latency, primarily because of network I/O.

Strategy 1: HTTP Connection Reuse

# hermes/tools/http_client.py
import aiohttp
from typing import Optional

class OptimizedHttpClient:
    _session: Optional[aiohttp.ClientSession] = None
    
    @classmethod
    async def get_session(cls) -> aiohttp.ClientSession:
        if cls._session is None or cls._session.closed:
            connector = aiohttp.TCPConnector(
                limit=100,              # Max total connections
                limit_per_host=30,      # Max per host
                ttl_dns_cache=300,      # Cache DNS 5 minutes
                keepalive_timeout=60,
                enable_cleanup_closed=True
            )
            cls._session = aiohttp.ClientSession(
                connector=connector,
                timeout=aiohttp.ClientTimeout(total=30, connect=5),
                headers={"Accept-Encoding": "gzip, deflate"}
            )
        return cls._session

Strategy 2: Concurrent Tool Calls

# hermes/tools/concurrent_caller.py
import asyncio
from typing import List, Dict, Any, Callable

class ConcurrentToolCaller:
    """
    Run independent tool calls concurrently instead of one after another.
    Serial:     total_time = N × T
    Concurrent: total_time ≈ max(T) when N <= max_concurrent,
                otherwise roughly ceil(N / max_concurrent) × T
    """
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def call_all(self, tool_calls: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        tasks = [self._call_with_limit(call["tool"], call["args"]) for call in tool_calls]
        # return_exceptions=True keeps one failing tool from discarding the other results
        results = await asyncio.gather(*tasks, return_exceptions=True)

        return [
            # Fall back to the positional index so every result carries an id
            {"id": call.get("id", str(i)),
             "success": not isinstance(r, BaseException),
             "result": None if isinstance(r, BaseException) else r,
             # repr() keeps the exception type visible even when its message is empty
             "error": repr(r) if isinstance(r, BaseException) else None}
            for i, (r, call) in enumerate(zip(results, tool_calls))
        ]

    async def _call_with_limit(self, tool_fn: Callable, args: Dict[str, Any]):
        # The semaphore caps how many tool calls are in flight at once
        async with self.semaphore:
            return await tool_fn(**args)

# Example: run several search queries concurrently
# (web_search is an async tool function defined elsewhere)
async def search_parallel(queries: List[str]) -> List[Dict]:
    caller = ConcurrentToolCaller(max_concurrent=5)
    return await caller.call_all([
        {"tool": web_search, "args": {"query": q}, "id": f"q_{i}"}
        for i, q in enumerate(queries)
    ])
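
The max(T) estimate means the slowest call sets the pace for the whole batch. One way to bound that, sketched below under assumptions, is a per-call timeout using asyncio.wait_for. The call_with_timeout and fake_tool names and the 0.3 second limit are hypothetical and chosen only to keep the demo short.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List


async def call_with_timeout(
    tool_fn: Callable[..., Awaitable[Any]], args: Dict[str, Any], timeout_s: float
) -> Dict[str, Any]:
    """Run one tool call, turning a timeout or error into a result record."""
    try:
        value = await asyncio.wait_for(tool_fn(**args), timeout=timeout_s)
        return {"success": True, "result": value, "error": None}
    except asyncio.TimeoutError:
        return {"success": False, "result": None, "error": f"timed out after {timeout_s}s"}
    except Exception as exc:  # keep one failing tool from sinking the batch
        return {"success": False, "result": None, "error": repr(exc)}


async def fake_tool(name: str, delay_s: float) -> str:
    """Hypothetical tool that sleeps to simulate network latency."""
    await asyncio.sleep(delay_s)
    return f"{name} ok"


async def demo() -> List[Dict[str, Any]]:
    calls = [
        {"name": "fast", "delay_s": 0.05},
        {"name": "medium", "delay_s": 0.10},
        {"name": "stuck", "delay_s": 5.0},  # would dominate max(T) without a timeout
    ]
    started = time.perf_counter()
    results = await asyncio.gather(
        *(call_with_timeout(fake_tool, c, timeout_s=0.3) for c in calls)
    )
    print(f"batch finished in {time.perf_counter() - started:.2f}s")
    return list(results)


if __name__ == "__main__":
    for record in asyncio.run(demo()):
        print(record)
```

The batch completes in roughly the timeout rather than the five seconds the stuck call would need, and the caller still receives the two successful results. Whether a partial answer is acceptable depends on the Skill; some workflows should fail the whole batch instead.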

35.4 Caching Intermediate Results

Caching often delivers the highest ROI of any optimization, especially for repeated queries and expensive computations.

# hermes/cache/multi_level_cache.py
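import hashlib, json, time
from typing import Any, Callable, Dict, List, Optional
from functools import wraps

class MultiLevelCache:
    """
    L1 (memory) → L2 (Redis) → L3 (disk)
    Hit priority: L1 > L2 > L3 > real request
    Only L1 and L2 are implemented in this listing; an L3 disk tier
    would slot in after the Redis lookup.
    """
    def __init__(self, l1_ttl=300, l2_ttl=3600, redis_client=None):
        self.l1 = {}  # key -> {"value", "expires_at"}; unbounded, so cap or evict in production
        self.l1_ttl = l1_ttl
        self.l2_ttl = l2_ttl
        # Assumed hook: an optional async Redis client exposing awaitable
        # get()/setex(); with None the cache runs on L1 only
        self._redis = redis_client
        self._stats = {"l1_hits": 0, "l2_hits": 0, "misses": 0}

    async def _get_redis(self):
        # Return the injected client (or None); connection setup is left to the caller
        return self._redis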
    
    def _make_key(self, fn_name: str, args: tuple, kwargs: dict) -> str:
        payload = json.dumps({"fn": fn_name, "args": args, "kwargs": kwargs},
                             sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()[:16]
    
    async def get(self, key: str) -> Optional[Any]:
        if key in self.l1:
            entry = self.l1[key]
            if time.time() < entry["expires_at"]:
                self._stats["l1_hits"] += 1
                return entry["value"]
            del self.l1[key]
        
        redis = await self._get_redis()
        if redis:
            raw = await redis.get(key)
            if raw:
                value = json.loads(raw)
                self.l1[key] = {"value": value, "expires_at": time.time() + self.l1_ttl}
                self._stats["l2_hits"] += 1
                return value
        
        self._stats["misses"] += 1
        return None
    
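    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        # A per-call ttl overrides the L2 default and also caps the L1 lifetime
        l2_ttl = ttl if ttl is not None else self.l2_ttl
        l1_ttl = min(self.l1_ttl, l2_ttl)
        self.l1[key] = {"value": value, "expires_at": time.time() + l1_ttl}
        redis = await self._get_redis()
        if redis:
            await redis.setex(key, l2_ttl, json.dumps(value, default=str))

    def cached(self, ttl: Optional[int] = None):
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # The qualified name avoids collisions between same-named functions.
                # For methods, args includes self, so keys are per-instance.
                key = self._make_key(f"{func.__module__}.{func.__qualname__}", args, kwargs)
                result = await self.get(key)
                if result is not None:  # note: a legitimate None result is never cached
                    return result
                result = await func(*args, **kwargs)
                await self.set(key, result, ttl)
                return result
            return wrapper
        return decorator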

cache = MultiLevelCache()

class WebSearchSkill:
    @cache.cached(ttl=3600)
    async def search(self, query: str) -> List[Dict]:
        return await self._do_real_search(query)  # Only called on cache miss
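
A check-then-call decorator like the one above has a gap under concurrency: if ten requests miss on the same key at the same moment, all ten run the expensive call (a "cache stampede"). A common remedy is to coalesce in-flight calls per key. The SingleFlight class below is a hypothetical sketch of that idea and is not part of the chapter's hermes code.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class SingleFlight:
    """Share one in-progress call among all concurrent callers of the same key."""

    def __init__(self) -> None:
        self._in_flight: Dict[str, asyncio.Future] = {}

    async def do(self, key: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        existing = self._in_flight.get(key)
        if existing is not None:
            # shield() stops a cancelled waiter from cancelling the shared future
            return await asyncio.shield(existing)

        future = asyncio.get_running_loop().create_future()
        self._in_flight[key] = future
        try:
            result = await fn()
            future.set_result(result)
            return result
        except asyncio.CancelledError:
            future.cancel()  # wake any waiters instead of leaving them hanging
            raise
        except Exception as exc:
            future.set_exception(exc)
            future.exception()  # mark as retrieved so no warning is logged when nobody waits
            raise
        finally:
            del self._in_flight[key]


async def demo() -> int:
    calls = 0

    async def slow_search() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.05)  # simulated upstream latency
        return "results"

    flight = SingleFlight()
    answers = await asyncio.gather(*(flight.do("q:python", slow_search) for _ in range(10)))
    assert all(a == "results" for a in answers)
    return calls


if __name__ == "__main__":
    print("upstream calls for 10 concurrent requests:", asyncio.run(demo()))
```

In this demo ten concurrent requests for the same key result in a single upstream call. In a real cache the coalescing step would sit between the cache lookup and the call to the wrapped function.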

35.5 Memory Optimization

# hermes/profiling/memory_profiler.py
import tracemalloc, gc

class SkillMemoryProfiler:
    def start(self):
        tracemalloc.start(25)
        gc.collect()
    
    def get_top_allocators(self, top_n: int = 10) -> str:
        snap = tracemalloc.take_snapshot()
        top_stats = snap.statistics("lineno")
        lines = [f"Top {top_n} memory allocators:"]
        for stat in top_stats[:top_n]:
            lines.append(f"  {stat.size/1024:.1f} KB - {stat.traceback.format()[0]}")
        return "\n".join(lines)


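class MemoryOptimizedSkill:
    # __slots__ drops the per-instance __dict__; the author cites a 40-60% reduction
    # in per-instance memory, but measure the saving for your own objects
    __slots__ = ["name", "config", "_cache"]

    async def process_large_dataset(self, source: str):
        """Stream large data with an async generator; never load everything at once"""
        async for chunk in self._stream_data(source):
            result = await self._process_chunk(chunk)
            yield result
            # Drop references so each chunk can be freed before the next one is read.
            # Reference counting reclaims them; a full gc.collect() per chunk would be slow.
            del chunk, result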

35.6 Building a Performance Benchmark Suite

# hermes/benchmarks/skill_benchmark.py
import asyncio, statistics, time
from dataclasses import dataclass
from typing import Callable, Dict, Any

@dataclass
class BenchmarkResult:
    name: str
    iterations: int
    mean_ms: float
    median_ms: float
    p95_ms: float
    p99_ms: float
    min_ms: float
    max_ms: float
    throughput_rps: float
    errors: int

class SkillBenchmark:
    async def run(
        self, name: str, target_func: Callable,
        args: Dict[str, Any] = None, iterations: int = 100,
        warmup: int = 10, concurrency: int = 1
    ) -> BenchmarkResult:
        args = args or {}
        
        # Warmup
        for _ in range(warmup):
            try:
                await target_func(**args)
            except Exception:
                pass
        
        semaphore = asyncio.Semaphore(concurrency)
        errors = 0
        
        async def single_run():
            nonlocal errors
            async with semaphore:
                t = time.perf_counter()
                try:
                    await target_func(**args)
                    return (time.perf_counter() - t) * 1000
                except Exception:
                    errors += 1
                    return None
        
        start_total = time.perf_counter()
        results = await asyncio.gather(*[single_run() for _ in range(iterations)])
        total_time = time.perf_counter() - start_total

        durations = sorted(r for r in results if r is not None)
        n = len(durations)
        if n == 0:
            # Without this guard, statistics.mean() would fail on an empty list
            raise RuntimeError(f"Benchmark '{name}': all {iterations} iterations failed")

        result = BenchmarkResult(
            name=name, iterations=iterations,
            mean_ms=statistics.mean(durations),
            median_ms=statistics.median(durations),
            p95_ms=durations[int(n * 0.95)],
            p99_ms=durations[int(n * 0.99)],
            min_ms=durations[0], max_ms=durations[-1],
            # Count only successful calls so failures do not inflate throughput
            throughput_rps=n / total_time,
            errors=errors
        )
        self._print(result)
        return result
    
    def _print(self, r: BenchmarkResult):
        print(f"\nBenchmark: {r.name}")
        print(f"  Iterations: {r.iterations}  Errors: {r.errors}")
        print(f"  Mean: {r.mean_ms:.2f}ms  Median: {r.median_ms:.2f}ms")
        print(f"  P95:  {r.p95_ms:.2f}ms  P99: {r.p99_ms:.2f}ms")
        print(f"  Throughput: {r.throughput_rps:.1f} req/s")
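
Benchmark numbers become useful once they are compared against a stored baseline. The following is a minimal sketch of such a check, assuming metrics are kept as plain dictionaries keyed by the BenchmarkResult field names; find_regressions and the 10% tolerance are illustrative choices, not part of the suite above.

```python
from typing import Dict, List


def find_regressions(
    baseline: Dict[str, float],
    current: Dict[str, float],
    tolerance: float = 0.10,
) -> List[str]:
    """Return messages for metrics more than `tolerance` slower than the baseline."""
    problems = []
    for metric, base_value in baseline.items():
        if metric not in current:
            problems.append(f"{metric}: missing from current run")
            continue
        if base_value <= 0:
            continue  # cannot compute a relative change against a zero baseline
        limit = base_value * (1 + tolerance)
        if current[metric] > limit:
            change = (current[metric] / base_value - 1) * 100
            problems.append(
                f"{metric}: {current[metric]:.1f}ms vs baseline {base_value:.1f}ms (+{change:.0f}%)"
            )
    return problems


# Illustrative numbers only; in practice these would come from BenchmarkResult fields.
baseline = {"mean_ms": 120.0, "p95_ms": 310.0, "p99_ms": 450.0}
current = {"mean_ms": 125.0, "p95_ms": 420.0, "p99_ms": 470.0}

for line in find_regressions(baseline, current):
    print(line)
```

With these made-up figures only p95_ms exceeds the tolerance, which illustrates why tail percentiles are worth tracking separately: the mean moved by about 4% while P95 moved by about 35%. A check like this could run in CI and fail the build when the list is non-empty.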

Performance Optimization Checklist

□ Used SkillProfiler to identify actual bottleneck (not gut feeling)
□ HTTP client uses connection pool (aiohttp.TCPConnector)
□ Repeated tool calls have caching (@cache.cached)
□ Independent tool calls use asyncio.gather (concurrent)
□ No blocking sync calls inside async functions
□ Large data uses generators / streaming
□ Hot objects use __slots__
□ Baseline benchmarks recorded in production
□ P95/P99 latency meets SLA requirements
□ Memory validated with tracemalloc: no leaks

Chapter Summary

The core principle of performance optimization: measure first, then optimize the slowest part, not the easiest part.

  1. Execution time profiling: SkillProfiler gives span-level precision to locate real bottlenecks with data
  2. Tool call optimization: Connection reuse, DNS caching, and concurrent calls can reduce tool latency by roughly 50–90%
  3. Caching: A multi-level cache (L1/L2/L3) often delivers the highest ROI; avoid repeating expensive work
  4. Concurrent calls: Replace serial N×T with concurrent max(T), usually the most direct speedup
  5. Memory optimization: tracemalloc, __slots__, and streaming as the three core techniques
  6. Benchmarking: Quantify every optimization; record baselines; track regressions

Review Questions

  1. If LLM inference consumes 80% of total time, how much can caching help? What scenarios are unsuitable for caching?
  2. When running concurrent tool calls, what happens to overall latency if one tool has a high error rate? How would you design a fail-fast mechanism?
  3. In a multi-tenant Hermes deployment, can one Skill's performance issues affect other tenants? How would you implement performance isolation?
  4. Design a continuous performance monitoring system: how would you detect performance regressions in production in real time and trigger automatic alerts?