第 35 章

Skill 性能剖析与优化

第35章:Skill 性能剖析与优化

一个 Skill 在开发环境跑得很好,部署到生产后却响应缓慢——这是 Agent 系统中最常见的性能陷阱。性能问题往往不在你预想的地方:你以为是 LLM 推理慢,结果是某个工具调用在等待网络;你以为并发够了,结果是内存碎片导致 GC 频繁触发。本章提供系统化的性能剖析方法论,从测量到优化,每一步都有具体工具和代码。


35.1 性能问题的根本原因分类

在开始优化之前,必须先测量。不测量就优化是最大的浪费。

性能问题分类树

Skill 性能问题
├── 延迟(Latency)问题
│   ├── LLM 推理延迟(首个 token 到达时间)
│   ├── 工具调用延迟(网络 I/O 为主)
│   ├── Skill 加载延迟(冷启动问题)
│   └── 串行化延迟(可并行但未并行)
│
├── 吞吐量(Throughput)问题
│   ├── 并发数不足(semaphore 过小)
│   ├── 事件循环阻塞(同步代码混入 async)
│   └── 连接池耗尽(HTTP/DB 连接不够)
│
└── 资源占用问题
    ├── 内存泄漏(长生命周期对象积累)
    ├── CPU 密集型操作阻塞事件循环
    └── 磁盘 I/O(日志过度写入)

典型执行时间分布(基准数据)

操作类型 典型延迟 优化上限
LLM 推理(本地 7B 模型) 2000-8000ms ~1500ms(量化+批处理)
LLM API 调用(云端) 500-3000ms ~300ms(流式+缓存)
HTTP 工具调用 100-2000ms ~50ms(连接复用+并发)
本地数据库查询 1-100ms <1ms(索引+连接池)
文件读写 0.1-50ms <0.1ms(内存映射)
Skill 序列化/反序列化 1-20ms <1ms(MessagePack)

35.2 Skill 执行时间分布分析

内置分析器:SkillProfiler

# hermes/profiling/skill_profiler.py
import time
import asyncio
import functools
import statistics
from contextlib import asynccontextmanager
from typing import Dict, List, Any, Callable
from dataclasses import dataclass, field
import json

@dataclass
class SpanRecord:
    """单次操作的时间记录"""
    name: str
    start_time: float
    end_time: float
    metadata: Dict = field(default_factory=dict)
    error: str = None
    
    @property
    def duration_ms(self) -> float:
        return (self.end_time - self.start_time) * 1000
    
    @property
    def success(self) -> bool:
        return self.error is None


class SkillProfiler:
    """
    Skill 执行性能剖析器
    
    使用方式:
    1. 装饰器模式:@profiler.trace("my_operation")
    2. 上下文管理器:async with profiler.span("my_operation"):
    3. 手动记录:profiler.record(name, start, end)
    """
    
    def __init__(self, skill_name: str):
        self.skill_name = skill_name
        self.spans: List[SpanRecord] = []
        self._session_start = time.time()
    
    @asynccontextmanager
    async def span(self, name: str, **metadata):
        """异步上下文管理器,自动记录时间"""
        start = time.time()
        error = None
        try:
            yield
        except Exception as e:
            error = str(e)
            raise
        finally:
            end = time.time()
            self.spans.append(SpanRecord(
                name=name,
                start_time=start,
                end_time=end,
                metadata=metadata,
                error=error
            ))
    
    def trace(self, span_name: str):
        """装饰器:自动 trace 函数执行时间"""
        def decorator(func: Callable):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                async with self.span(span_name):
                    return await func(*args, **kwargs)
            return wrapper
        return decorator
    
    def get_report(self) -> Dict:
        """生成性能报告"""
        total_session = (time.time() - self._session_start) * 1000
        
        # 按操作名称聚合
        by_name: Dict[str, List[float]] = {}
        for span in self.spans:
            if span.name not in by_name:
                by_name[span.name] = []
            by_name[span.name].append(span.duration_ms)
        
        operations = {}
        for name, durations in by_name.items():
            operations[name] = {
                "count": len(durations),
                "total_ms": sum(durations),
                "mean_ms": statistics.mean(durations),
                "median_ms": statistics.median(durations),
                "p95_ms": sorted(durations)[int(len(durations) * 0.95)] if len(durations) >= 20 else max(durations),
                "max_ms": max(durations),
                "min_ms": min(durations),
                "pct_of_total": sum(durations) / total_session * 100
            }
        
        # 按总时间排序(找出最大瓶颈)
        sorted_ops = sorted(
            operations.items(),
            key=lambda x: x[1]["total_ms"],
            reverse=True
        )
        
        return {
            "skill": self.skill_name,
            "session_total_ms": total_session,
            "operations": dict(sorted_ops),
            "bottleneck": sorted_ops[0][0] if sorted_ops else None,
            "errors": [s for s in self.spans if not s.success]
        }
    
    def print_report(self):
        """打印可读的性能报告"""
        report = self.get_report()
        print(f"\n{'='*60}")
        print(f"Skill 性能报告: {report['skill']}")
        print(f"总会话时间: {report['session_total_ms']:.1f}ms")
        print(f"主要瓶颈: {report['bottleneck']}")
        print(f"\n{'操作名称':<30} {'调用次数':>6} {'均值(ms)':>10} {'P95(ms)':>10} {'占比':>8}")
        print("-" * 70)
        
        for name, stats in report["operations"].items():
            print(
                f"{name:<30} {stats['count']:>6} "
                f"{stats['mean_ms']:>10.1f} "
                f"{stats['p95_ms']:>10.1f} "
                f"{stats['pct_of_total']:>7.1f}%"
            )
        print(f"{'='*60}\n")


# 使用示例
profiler = SkillProfiler("research-skill")

class ResearchSkill:
    
    @profiler.trace("llm_inference")
    async def _call_llm(self, prompt: str) -> str:
        # LLM 推理
        pass
    
    async def execute(self, query: str) -> Dict:
        async with profiler.span("web_search", query=query):
            search_results = await self._search_web(query)
        
        async with profiler.span("result_parsing"):
            parsed = self._parse_results(search_results)
        
        async with profiler.span("llm_summarize"):
            summary = await self._call_llm(f"总结:{parsed}")
        
        profiler.print_report()
        return {"summary": summary}

35.3 工具调用延迟优化

工具调用(Tool Call)通常是 Skill 中延迟最大的部分,主要原因是网络 I/O。

优化策略一:HTTP 连接复用

# hermes/tools/http_client.py
import aiohttp
from typing import Optional

class OptimizedHttpClient:
    """优化的 HTTP 客户端,复用连接,减少握手开销"""
    
    _session: Optional[aiohttp.ClientSession] = None
    
    @classmethod
    async def get_session(cls) -> aiohttp.ClientSession:
        if cls._session is None or cls._session.closed:
            # TCP 连接池配置
            connector = aiohttp.TCPConnector(
                limit=100,              # 最大并发连接数
                limit_per_host=30,      # 每个 host 最大连接数
                ttl_dns_cache=300,      # DNS 缓存 5 分钟
                use_dns_cache=True,
                keepalive_timeout=60,   # 保持连接 60 秒
                enable_cleanup_closed=True
            )
            cls._session = aiohttp.ClientSession(
                connector=connector,
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    connect=5,
                    sock_read=25
                ),
                headers={
                    "User-Agent": "Hermes-Agent/1.0",
                    "Accept-Encoding": "gzip, deflate"  # 启用压缩
                }
            )
        return cls._session
    
    @classmethod
    async def get(cls, url: str, **kwargs) -> dict:
        session = await cls.get_session()
        async with session.get(url, **kwargs) as resp:
            resp.raise_for_status()
            return await resp.json()
    
    @classmethod
    async def close(cls):
        if cls._session and not cls._session.closed:
            await cls._session.close()

优化策略二:并发工具调用

# hermes/tools/concurrent_caller.py
import asyncio
from typing import List, Dict, Any, Callable, Coroutine

class ConcurrentToolCaller:
    """
    并发工具调用器
    
    将原本串行的工具调用改为并行,
    在工具间无依赖时可将总时间从 N*T 降低至 max(T)
    """
    
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def call_all(
        self,
        tool_calls: List[Dict[str, Any]]
    ) -> List[Any]:
        """
        并发执行所有工具调用
        
        tool_calls 格式:
        [
          {"tool": search_func, "args": {"query": "..."}, "id": "search_1"},
          {"tool": fetch_func,  "args": {"url": "..."},   "id": "fetch_1"},
        ]
        """
        tasks = [
            self._call_with_limit(call["tool"], call["args"], call.get("id", str(i)))
            for i, call in enumerate(tool_calls)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        output = []
        for result, call in zip(results, tool_calls):
            if isinstance(result, Exception):
                output.append({
                    "id": call.get("id"),
                    "success": False,
                    "error": str(result),
                    "result": None
                })
            else:
                output.append({
                    "id": call.get("id"),
                    "success": True,
                    "error": None,
                    "result": result
                })
        return output
    
    async def _call_with_limit(self, tool_fn: Callable, args: Dict, call_id: str):
        async with self.semaphore:
            return await tool_fn(**args)


# 示例:将串行搜索改为并发搜索
async def search_parallel(queries: List[str]) -> List[Dict]:
    caller = ConcurrentToolCaller(max_concurrent=5)
    
    tool_calls = [
        {"tool": web_search, "args": {"query": q}, "id": f"search_{i}"}
        for i, q in enumerate(queries)
    ]
    
    # 串行版本:总时间 = N × 单次搜索时间(~500ms)
    # 并发版本:总时间 ≈ 单次搜索时间(~500ms)
    return await caller.call_all(tool_calls)

35.4 缓存中间结果

缓存是性能优化中投入产出比最高的手段,尤其适合重复查询和昂贵计算。

多层缓存架构

# hermes/cache/multi_level_cache.py
import hashlib
import json
import time
import asyncio
from typing import Any, Optional, Callable
from functools import wraps

class MultiLevelCache:
    """
    多层缓存:L1(内存)→ L2(Redis)→ L3(磁盘)
    
    命中率优先级:L1 > L2 > L3 > 真实请求
    """
    
    def __init__(
        self,
        l1_max_size: int = 1000,    # 内存缓存条目数
        l1_ttl: int = 300,           # 内存缓存 5 分钟
        l2_ttl: int = 3600,          # Redis 缓存 1 小时
        l3_ttl: int = 86400,         # 磁盘缓存 24 小时
    ):
        self.l1 = {}          # 简单内存字典(生产用 LRU)
        self.l1_ttl = l1_ttl
        self.l2_ttl = l2_ttl
        self.l3_ttl = l3_ttl
        self._redis = None    # 懒加载
        self._stats = {"l1_hits": 0, "l2_hits": 0, "l3_hits": 0, "misses": 0}
    
    def _make_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """生成缓存键(函数名+参数的哈希)"""
        payload = json.dumps({
            "fn": func_name,
            "args": args,
            "kwargs": kwargs
        }, sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()[:16]
    
    async def get(self, key: str) -> Optional[Any]:
        # L1: 内存
        if key in self.l1:
            entry = self.l1[key]
            if time.time() < entry["expires_at"]:
                self._stats["l1_hits"] += 1
                return entry["value"]
            del self.l1[key]
        
        # L2: Redis
        redis = await self._get_redis()
        if redis:
            raw = await redis.get(key)
            if raw:
                value = json.loads(raw)
                # 回填 L1
                self.l1[key] = {"value": value, "expires_at": time.time() + self.l1_ttl}
                self._stats["l2_hits"] += 1
                return value
        
        self._stats["misses"] += 1
        return None
    
    async def set(self, key: str, value: Any):
        # 写入 L1
        self.l1[key] = {"value": value, "expires_at": time.time() + self.l1_ttl}
        
        # 写入 L2
        redis = await self._get_redis()
        if redis:
            await redis.setex(key, self.l2_ttl, json.dumps(value, default=str))
    
    def cached(self, ttl: Optional[int] = None):
        """函数级缓存装饰器"""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                key = self._make_key(func.__name__, args, kwargs)
                cached_result = await self.get(key)
                if cached_result is not None:
                    return cached_result
                result = await func(*args, **kwargs)
                await self.set(key, result)
                return result
            return wrapper
        return decorator
    
    def print_stats(self):
        total = sum(self._stats.values())
        if total == 0:
            return
        print(f"缓存命中率统计:")
        print(f"  L1 (内存): {self._stats['l1_hits']/total*100:.1f}%")
        print(f"  L2 (Redis): {self._stats['l2_hits']/total*100:.1f}%")
        print(f"  未命中: {self._stats['misses']/total*100:.1f}%")
    
    async def _get_redis(self):
        if self._redis is None:
            try:
                import aioredis
                self._redis = await aioredis.create_redis_pool("redis://localhost")
            except Exception:
                pass  # Redis 不可用时降级为仅 L1
        return self._redis


# 全局缓存实例
cache = MultiLevelCache()

class WebSearchSkill:
    
    @cache.cached(ttl=3600)  # 搜索结果缓存 1 小时
    async def search(self, query: str, engine: str = "google") -> List[Dict]:
        # 真实搜索请求(昂贵)
        return await self._do_real_search(query, engine)

35.5 内存占用优化

内存问题检测

# hermes/profiling/memory_profiler.py
import tracemalloc
import gc
import sys
from typing import List, Tuple

class SkillMemoryProfiler:
    """Skill 内存占用分析器"""
    
    def __init__(self):
        self.snapshots: List = []
    
    def start(self):
        tracemalloc.start(25)  # 记录 25 层调用栈
        gc.collect()
    
    def snapshot(self, label: str = ""):
        snap = tracemalloc.take_snapshot()
        self.snapshots.append((label, snap))
        return snap
    
    def compare(self, from_label: str, to_label: str) -> List[str]:
        """比较两个快照之间的内存变化"""
        from_snap = next(s for l, s in self.snapshots if l == from_label)
        to_snap = next(s for l, s in self.snapshots if l == to_label)
        
        top_stats = to_snap.compare_to(from_snap, "lineno")
        
        lines = [f"内存变化 ({from_label} → {to_label}):"]
        for stat in top_stats[:10]:
            lines.append(f"  {stat}")
        return lines
    
    def get_top_allocators(self, top_n: int = 10) -> str:
        """获取当前内存分配最多的代码位置"""
        snap = tracemalloc.take_snapshot()
        top_stats = snap.statistics("lineno")
        
        lines = [f"内存分配 Top {top_n}:"]
        for stat in top_stats[:top_n]:
            lines.append(f"  {stat.size/1024:.1f} KB - {stat.traceback.format()[0]}")
        return "\n".join(lines)
    
    def stop(self):
        tracemalloc.stop()


# 内存优化技巧
class MemoryOptimizedSkill:
    """展示内存优化技术的 Skill"""
    
    # 使用 __slots__ 减少实例内存(约 40-60% 节省)
    __slots__ = ["name", "config", "_cache"]
    
    def __init__(self, name: str, config: dict):
        self.name = name
        self.config = config
        self._cache = {}
    
    async def process_large_dataset(self, data_source: str):
        """使用生成器处理大数据,避免全量加载到内存"""
        async for chunk in self._stream_data(data_source):
            result = await self._process_chunk(chunk)
            yield result  # 生成器,不积累结果
            del chunk, result  # 显式释放
            gc.collect()  # 大数据处理时定期 GC
    
    async def _stream_data(self, source: str):
        """流式读取,避免一次性加载全部数据"""
        async with aiofiles.open(source) as f:
            buffer = []
            async for line in f:
                buffer.append(line)
                if len(buffer) >= 1000:
                    yield buffer
                    buffer = []
            if buffer:
                yield buffer

35.6 性能基准测试工具搭建

基准测试框架

# hermes/benchmarks/skill_benchmark.py
import asyncio
import statistics
import time
from typing import Callable, Dict, Any, List
from dataclasses import dataclass

@dataclass
class BenchmarkResult:
    """基准测试结果"""
    name: str
    iterations: int
    mean_ms: float
    median_ms: float
    p95_ms: float
    p99_ms: float
    min_ms: float
    max_ms: float
    throughput_rps: float  # 每秒请求数
    errors: int

class SkillBenchmark:
    """Skill 性能基准测试工具"""
    
    async def run(
        self,
        name: str,
        target_func: Callable,
        args: Dict[str, Any] = None,
        iterations: int = 100,
        warmup: int = 10,
        concurrency: int = 1
    ) -> BenchmarkResult:
        """
        运行基准测试
        
        Args:
            name: 测试名称
            target_func: 被测试的异步函数
            args: 函数参数
            iterations: 测试迭代次数
            warmup: 预热次数(不计入统计)
            concurrency: 并发数
        """
        args = args or {}
        
        # 预热
        print(f"预热中 ({warmup} 次)...")
        for _ in range(warmup):
            try:
                await target_func(**args)
            except Exception:
                pass
        
        # 正式测试
        print(f"基准测试: {name} ({iterations} 次, 并发={concurrency})...")
        durations = []
        errors = 0
        start_total = time.time()
        
        semaphore = asyncio.Semaphore(concurrency)
        
        async def single_run():
            nonlocal errors
            async with semaphore:
                t_start = time.perf_counter()
                try:
                    await target_func(**args)
                    return (time.perf_counter() - t_start) * 1000
                except Exception as e:
                    errors += 1
                    return None
        
        tasks = [single_run() for _ in range(iterations)]
        results = await asyncio.gather(*tasks)
        
        durations = [r for r in results if r is not None]
        total_time = time.time() - start_total
        
        if not durations:
            raise RuntimeError("所有测试均失败")
        
        durations.sort()
        
        result = BenchmarkResult(
            name=name,
            iterations=iterations,
            mean_ms=statistics.mean(durations),
            median_ms=statistics.median(durations),
            p95_ms=durations[int(len(durations) * 0.95)],
            p99_ms=durations[int(len(durations) * 0.99)],
            min_ms=min(durations),
            max_ms=max(durations),
            throughput_rps=iterations / total_time,
            errors=errors
        )
        
        self._print_result(result)
        return result
    
    def _print_result(self, r: BenchmarkResult):
        print(f"\n基准测试结果: {r.name}")
        print(f"  迭代次数: {r.iterations} (错误: {r.errors})")
        print(f"  均值:   {r.mean_ms:.2f}ms")
        print(f"  中位数: {r.median_ms:.2f}ms")
        print(f"  P95:    {r.p95_ms:.2f}ms")
        print(f"  P99:    {r.p99_ms:.2f}ms")
        print(f"  最小值: {r.min_ms:.2f}ms")
        print(f"  最大值: {r.max_ms:.2f}ms")
        print(f"  吞吐量: {r.throughput_rps:.1f} req/s")
    
    async def compare(
        self,
        baseline: Callable,
        optimized: Callable,
        args: Dict = None,
        iterations: int = 100
    ) -> Dict:
        """对比基准版本和优化版本的性能"""
        print("对比测试:基准版本 vs 优化版本")
        
        base_result = await self.run("基准版本", baseline, args, iterations)
        opt_result  = await self.run("优化版本", optimized, args, iterations)
        
        improvement = (base_result.mean_ms - opt_result.mean_ms) / base_result.mean_ms * 100
        
        print(f"\n性能提升对比:")
        print(f"  均值改进: {improvement:.1f}%")
        print(f"  {'提升' if improvement > 0 else '退化'}: "
              f"{base_result.mean_ms:.1f}ms → {opt_result.mean_ms:.1f}ms")
        
        return {
            "baseline": base_result,
            "optimized": opt_result,
            "improvement_pct": improvement
        }


# 使用示例
async def run_benchmarks():
    benchmark = SkillBenchmark()
    
    # 单项测试
    await benchmark.run(
        name="web_search_cached",
        target_func=web_search_skill.search,
        args={"query": "Python asyncio best practices"},
        iterations=100,
        warmup=5,
        concurrency=10
    )
    
    # 对比测试
    await benchmark.compare(
        baseline=web_search_serial,
        optimized=web_search_parallel,
        args={"queries": ["python", "asyncio", "performance"]},
        iterations=50
    )

性能优化检查清单

性能优化 Checklist

□ 已使用 SkillProfiler 定位实际瓶颈(而非凭感觉)
□ HTTP 客户端使用连接池(aiohttp.TCPConnector)
□ 重复的工具调用已添加缓存(@cache.cached)
□ 独立的工具调用已改为并发(asyncio.gather)
□ 没有在 async 函数中调用同步阻塞操作
□ 大数据处理使用生成器/流式处理
□ 高频创建的对象使用 __slots__
□ 生产环境已运行基准测试并记录基线
□ P95/P99 延迟符合 SLA 要求
□ 内存使用已通过 tracemalloc 验证无泄漏

本章小结

性能优化的核心原则:先测量,后优化;优化最慢的那部分,而不是最容易优化的部分

  1. 执行时间分布分析SkillProfiler 提供 span 级别的精确测量,让你用数据找到真正的瓶颈
  2. 工具调用延迟优化:HTTP 连接复用、DNS 缓存、并发调用,可将工具调用时间降低 50-90%
  3. 缓存中间结果:多层缓存(L1/L2/L3)投入产出比最高,避免重复的昂贵调用
  4. 并发工具调用:将串行 N×T 改为并发 max(T),是最直接的提速手段
  5. 内存优化tracemalloc__slots__、流式处理三板斧
  6. 基准测试:用数据说话,记录基线,量化优化效果

思考题

  1. 如果 LLM 推理是最大瓶颈(占总时间 80%),缓存在这里能帮多大忙?有哪些不适合缓存的场景?
  2. 并发工具调用时,如果其中一个工具的错误率很高,会对整体延迟产生什么影响?如何设计"快速失败"机制?
  3. 在多租户 Hermes 部署中,一个 Skill 的性能问题是否会影响其他租户?如何实现性能隔离?
  4. 设计一个持续性能监控方案:如何在生产环境中实时检测性能退化(regression),并自动告警?
本章评分
4.8  / 5  (3 评分)

💬 留言讨论