第 35 章
Skill 性能剖析与优化
第35章:Skill 性能剖析与优化
一个 Skill 在开发环境跑得很好,部署到生产后却响应缓慢——这是 Agent 系统中最常见的性能陷阱。性能问题往往不在你预想的地方:你以为是 LLM 推理慢,结果是某个工具调用在等待网络;你以为并发够了,结果是内存碎片导致 GC 频繁触发。本章提供系统化的性能剖析方法论,从测量到优化,每一步都有具体工具和代码。
35.1 性能问题的根本原因分类
在开始优化之前,必须先测量。不测量就优化是最大的浪费。
性能问题分类树
Skill 性能问题
├── 延迟(Latency)问题
│ ├── LLM 推理延迟(首个 token 到达时间)
│ ├── 工具调用延迟(网络 I/O 为主)
│ ├── Skill 加载延迟(冷启动问题)
│ └── 串行化延迟(可并行但未并行)
│
├── 吞吐量(Throughput)问题
│ ├── 并发数不足(semaphore 过小)
│ ├── 事件循环阻塞(同步代码混入 async)
│ └── 连接池耗尽(HTTP/DB 连接不够)
│
└── 资源占用问题
  ├── 内存泄漏(长生命周期对象积累)
  ├── CPU 密集型操作阻塞事件循环
  └── 磁盘 I/O(日志过度写入)
典型执行时间分布(基准数据)
| 操作类型 | 典型延迟 | 优化上限 |
|---|---|---|
| LLM 推理(本地 7B 模型) | 2000-8000ms | ~1500ms(量化+批处理) |
| LLM API 调用(云端) | 500-3000ms | ~300ms(流式+缓存) |
| HTTP 工具调用 | 100-2000ms | ~50ms(连接复用+并发) |
| 本地数据库查询 | 1-100ms | <1ms(索引+连接池) |
| 文件读写 | 0.1-50ms | <0.1ms(内存映射) |
| Skill 序列化/反序列化 | 1-20ms | <1ms(MessagePack) |
35.2 Skill 执行时间分布分析
内置分析器:SkillProfiler
# hermes/profiling/skill_profiler.py
import asyncio
import functools
import json
import statistics
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
@dataclass
class SpanRecord:
    """Timing record for a single profiled operation.

    ``start_time`` and ``end_time`` are timestamps in seconds; this class
    only uses their difference.
    """

    name: str
    start_time: float
    end_time: float
    # Free-form key/value context supplied by whoever created the span.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Failure description; ``None`` means the operation succeeded.
    error: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        """Return the elapsed time of the span in milliseconds."""
        return (self.end_time - self.start_time) * 1000

    @property
    def success(self) -> bool:
        """Return ``True`` when no error was recorded for the span."""
        return self.error is None
class SkillProfiler:
    """Collect and report timing spans for one Skill.

    Usage:
        1. Decorator:        ``@profiler.trace("my_operation")``
        2. Context manager:  ``async with profiler.span("my_operation"):``
        3. Manual recording: ``profiler.record(name, start, end)``
    """

    def __init__(self, skill_name: str):
        self.skill_name = skill_name
        self.spans: List[SpanRecord] = []
        # Wall-clock start of the session (absolute timestamp).
        self._session_start = time.time()
        # Monotonic start; used to measure the session length so that
        # system clock adjustments cannot distort the report.
        self._session_clock = time.perf_counter()

    def record(
        self,
        name: str,
        start: float,
        end: float,
        error: Optional[str] = None,
        metadata: Optional[Dict] = None,
    ) -> "SpanRecord":
        """Append a span from explicit timestamps and return it.

        Args:
            name: Operation name used for aggregation in the report.
            start: Start timestamp in seconds.
            end: End timestamp in seconds.
            error: Failure description, or ``None`` for a successful span.
            metadata: Optional context dict stored on the span.
        """
        entry = SpanRecord(
            name=name,
            start_time=start,
            end_time=end,
            metadata={} if metadata is None else metadata,
            error=error,
        )
        self.spans.append(entry)
        return entry

    @asynccontextmanager
    async def span(self, name: str, **metadata):
        """Async context manager that records the duration of its body.

        The span is recorded whether the body succeeds or raises; the
        exception is always re-raised.
        """
        wall_start = time.time()
        clock_start = time.perf_counter()
        error = None
        try:
            yield
        except (Exception, asyncio.CancelledError) as exc:
            # CancelledError is listed explicitly because it is not an
            # ``Exception`` subclass on Python 3.8+; a cancelled body must
            # not show up as a successful span.
            error = str(exc) or type(exc).__name__
            raise
        finally:
            # Duration comes from the monotonic clock; it is re-based onto
            # the wall-clock start so start_time stays an absolute timestamp.
            elapsed = time.perf_counter() - clock_start
            self.record(
                name,
                wall_start,
                wall_start + elapsed,
                error=error,
                metadata=metadata,
            )

    def trace(self, span_name: str):
        """Decorator that wraps an async function in ``span(span_name)``."""
        def decorator(func: Callable):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                async with self.span(span_name):
                    return await func(*args, **kwargs)
            return wrapper
        return decorator

    @staticmethod
    def _percentile(sorted_values, fraction):
        """Return the nearest-rank percentile of an ascending, non-empty list."""
        import math

        # round() absorbs float noise such as 0.95 * 20 -> 19.000000000000004.
        rank = math.ceil(round(fraction * len(sorted_values), 9))
        index = min(max(rank, 1), len(sorted_values)) - 1
        return sorted_values[index]

    def get_report(self) -> Dict:
        """Aggregate the recorded spans into a report dict.

        Returns:
            A dict with keys ``skill``, ``session_total_ms``, ``operations``
            (per-name statistics, ordered by total time descending),
            ``bottleneck`` (name with the largest total time, or ``None``)
            and ``errors`` (the failed ``SpanRecord`` objects).
        """
        total_session = (time.perf_counter() - self._session_clock) * 1000

        # Group durations by operation name.
        by_name: Dict[str, List[float]] = {}
        for span in self.spans:
            by_name.setdefault(span.name, []).append(span.duration_ms)

        operations = {}
        for name, durations in by_name.items():
            ordered = sorted(durations)
            total_ms = sum(durations)
            operations[name] = {
                "count": len(durations),
                "total_ms": total_ms,
                "mean_ms": statistics.mean(durations),
                "median_ms": statistics.median(durations),
                "p95_ms": self._percentile(ordered, 0.95),
                "max_ms": ordered[-1],
                "min_ms": ordered[0],
                # Guard against a zero-length session on coarse clocks.
                "pct_of_total": (
                    total_ms / total_session * 100 if total_session > 0 else 0.0
                ),
            }

        # Largest total time first, so the bottleneck is the first entry.
        sorted_ops = sorted(
            operations.items(),
            key=lambda item: item[1]["total_ms"],
            reverse=True,
        )
        return {
            "skill": self.skill_name,
            "session_total_ms": total_session,
            "operations": dict(sorted_ops),
            "bottleneck": sorted_ops[0][0] if sorted_ops else None,
            "errors": [s for s in self.spans if not s.success],
        }

    def print_report(self):
        """Print the report produced by ``get_report`` as a text table."""
        report = self.get_report()
        print(f"\n{'='*60}")
        print(f"Skill 性能报告: {report['skill']}")
        print(f"总会话时间: {report['session_total_ms']:.1f}ms")
        print(f"主要瓶颈: {report['bottleneck']}")
        print(f"\n{'操作名称':<30} {'调用次数':>6} {'均值(ms)':>10} {'P95(ms)':>10} {'占比':>8}")
        print("-" * 70)
        for name, stats in report["operations"].items():
            print(
                f"{name:<30} {stats['count']:>6} "
                f"{stats['mean_ms']:>10.1f} "
                f"{stats['p95_ms']:>10.1f} "
                f"{stats['pct_of_total']:>7.1f}%"
            )
        print(f"{'='*60}\n")
# 使用示例
profiler = SkillProfiler("research-skill")


class ResearchSkill:
    """Example Skill instrumented with the module-level ``profiler``."""

    @profiler.trace("llm_inference")
    async def _call_llm(self, prompt: str) -> str:
        """Placeholder for the LLM inference call (no implementation here)."""
        return None

    async def execute(self, query: str) -> Dict:
        """Search, parse and summarize ``query``, timing each stage.

        Prints the profiler report before returning ``{"summary": ...}``.
        """
        # Stage 1: web search, with the query attached as span metadata.
        async with profiler.span("web_search", query=query):
            hits = await self._search_web(query)

        # Stage 2: synchronous parsing of the search results.
        async with profiler.span("result_parsing"):
            digest = self._parse_results(hits)

        # Stage 3: LLM summary of the parsed results.
        async with profiler.span("llm_summarize"):
            text = await self._call_llm(f"总结:{digest}")

        profiler.print_report()
        return {"summary": text}
35.3 工具调用延迟优化
工具调用(Tool Call)通常是 Skill 中延迟最大的部分,主要原因是网络 I/O。
优化策略一:HTTP 连接复用
# hermes/tools/http_client.py
import aiohttp
from typing import Optional
class OptimizedHttpClient:
    """HTTP client that reuses one pooled session to cut handshake overhead."""

    _session: Optional[aiohttp.ClientSession] = None

    @classmethod
    async def get_session(cls) -> aiohttp.ClientSession:
        """Return the shared session, creating it if missing or closed."""
        existing = cls._session
        if existing is not None and not existing.closed:
            return existing

        # TCP connection pool configuration.
        pool = aiohttp.TCPConnector(
            limit=100,            # max concurrent connections overall
            limit_per_host=30,    # max connections per host
            ttl_dns_cache=300,    # DNS cache lifetime: 5 minutes
            use_dns_cache=True,
            keepalive_timeout=60,  # keep idle connections for 60 seconds
            enable_cleanup_closed=True,
        )
        limits = aiohttp.ClientTimeout(total=30, connect=5, sock_read=25)
        default_headers = {
            "User-Agent": "Hermes-Agent/1.0",
            "Accept-Encoding": "gzip, deflate",  # enable compression
        }
        cls._session = aiohttp.ClientSession(
            connector=pool,
            timeout=limits,
            headers=default_headers,
        )
        return cls._session

    @classmethod
    async def get(cls, url: str, **kwargs) -> dict:
        """GET ``url`` and return the decoded JSON body.

        Extra keyword arguments are forwarded to ``session.get``. A non-2xx
        status raises through ``raise_for_status()``.
        """
        http = await cls.get_session()
        async with http.get(url, **kwargs) as response:
            response.raise_for_status()
            payload = await response.json()
        return payload

    @classmethod
    async def close(cls):
        """Close the shared session if one exists and is still open."""
        active = cls._session
        if not active or active.closed:
            return
        await active.close()
优化策略二:并发工具调用
# hermes/tools/concurrent_caller.py
import asyncio
from typing import List, Dict, Any, Callable, Coroutine
class ConcurrentToolCaller:
    """Run independent tool calls concurrently under a concurrency limit.

    When the tools do not depend on each other this turns a serial total of
    N*T into roughly max(T), subject to ``max_concurrent``.
    """

    def __init__(self, max_concurrent: int = 5):
        """Create a caller that runs at most ``max_concurrent`` tools at once.

        Raises:
            ValueError: If ``max_concurrent`` is less than 1 (a zero-sized
                semaphore would block every call forever).
        """
        if max_concurrent < 1:
            raise ValueError("max_concurrent must be >= 1")
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def call_all(
        self,
        tool_calls: List[Dict[str, Any]]
    ) -> List[Any]:
        """Execute every tool call concurrently and report each outcome.

        ``tool_calls`` format::

            [
                {"tool": search_func, "args": {"query": "..."}, "id": "search_1"},
                {"tool": fetch_func, "args": {"url": "..."}, "id": "fetch_1"},
            ]

        ``tool`` must be an async callable. ``args`` defaults to no
        arguments and ``id`` defaults to the call's position as a string.

        Returns:
            One dict per call, in input order, with keys ``id``, ``success``,
            ``error`` and ``result``. A failing call never raises out of this
            method; it is reported with ``success=False``.
        """
        # Resolve ids once so the dispatched id and the reported id agree.
        call_ids = [call.get("id", str(i)) for i, call in enumerate(tool_calls)]
        tasks = [
            self._call_with_limit(call["tool"], call.get("args") or {}, call_id)
            for call, call_id in zip(tool_calls, call_ids)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        output = []
        for call_id, result in zip(call_ids, results):
            # BaseException, not Exception: gather() hands back
            # CancelledError instances for cancelled calls, and those are
            # not Exception subclasses on Python 3.8+.
            if isinstance(result, BaseException):
                output.append({
                    "id": call_id,
                    "success": False,
                    # Some exceptions (e.g. timeouts) carry no message.
                    "error": str(result) or type(result).__name__,
                    "result": None
                })
            else:
                output.append({
                    "id": call_id,
                    "success": True,
                    "error": None,
                    "result": result
                })
        return output

    async def _call_with_limit(self, tool_fn: Callable, args: Dict, call_id: str):
        """Await ``tool_fn(**args)`` while holding one semaphore slot."""
        async with self.semaphore:
            return await tool_fn(**args)
# 示例:将串行搜索改为并发搜索
async def search_parallel(queries: List[str]) -> List[Dict]:
    """Issue one ``web_search`` call per query through a concurrent caller.

    Returns the caller's per-call result dicts, in the order of ``queries``.
    """
    dispatcher = ConcurrentToolCaller(max_concurrent=5)

    requests = []
    for idx, text in enumerate(queries):
        requests.append(
            {"tool": web_search, "args": {"query": text}, "id": f"search_{idx}"}
        )

    # Per the original note: serial total = N x one search (~500 ms);
    # concurrent total is roughly one search (~500 ms).
    outcome = await dispatcher.call_all(requests)
    return outcome
35.4 缓存中间结果
缓存是性能优化中投入产出比最高的手段,尤其适合重复查询和昂贵计算。
多层缓存架构
# hermes/cache/multi_level_cache.py
import hashlib
import json
import time
import asyncio
from typing import Any, Optional, Callable
from functools import wraps
class MultiLevelCache:
    """Async cache with an in-memory tier (L1) backed by Redis (L2).

    Lookup order is L1 -> L2 -> miss. ``l3_ttl`` and the ``l3_hits`` counter
    are kept for interface compatibility, but this class does not implement
    a disk (L3) tier.
    """

    # Seconds to wait before trying to reach Redis again after a failure.
    _REDIS_RETRY_SECONDS = 30

    def __init__(
        self,
        l1_max_size: int = 1000,  # max number of in-memory entries
        l1_ttl: int = 300,        # in-memory TTL: 5 minutes
        l2_ttl: int = 3600,       # Redis TTL: 1 hour
        l3_ttl: int = 86400,      # reserved for a disk tier: 24 hours
    ):
        from collections import OrderedDict

        # Ordered so the least recently used entry can be evicted first.
        self.l1 = OrderedDict()
        self.l1_max_size = l1_max_size
        self.l1_ttl = l1_ttl
        self.l2_ttl = l2_ttl
        self.l3_ttl = l3_ttl
        self._redis = None  # connected lazily on first use
        self._redis_retry_at = 0.0
        self._stats = {"l1_hits": 0, "l2_hits": 0, "l3_hits": 0, "misses": 0}

    def _make_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Build a cache key by hashing the function name and arguments.

        Arguments that are not JSON-serializable are stringified with
        ``str`` before hashing.
        """
        payload = json.dumps({
            "fn": func_name,
            "args": args,
            "kwargs": kwargs
        }, sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()[:16]

    def _store_l1(self, key: str, value: Any, ttl: int) -> None:
        """Insert into L1 and evict least recently used entries over the cap."""
        self.l1[key] = {"value": value, "expires_at": time.time() + ttl}
        self.l1.move_to_end(key)
        limit = max(self.l1_max_size, 0)
        while len(self.l1) > limit:
            self.l1.popitem(last=False)

    async def _lookup(self, key: str):
        """Return ``(hit, value)`` so a cached ``None`` differs from a miss."""
        # L1: memory
        entry = self.l1.get(key)
        if entry is not None:
            if time.time() < entry["expires_at"]:
                self.l1.move_to_end(key)
                self._stats["l1_hits"] += 1
                return True, entry["value"]
            del self.l1[key]

        # L2: Redis
        redis = await self._get_redis()
        if redis is not None:
            raw = await redis.get(key)
            if raw:
                value = json.loads(raw)
                # Back-fill L1 so the next read is served from memory.
                self._store_l1(key, value, self.l1_ttl)
                self._stats["l2_hits"] += 1
                return True, value

        self._stats["misses"] += 1
        return False, None

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or ``None`` on a miss."""
        _hit, value = await self._lookup(key)
        return value

    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store ``value`` in L1 and, when available, in Redis.

        Args:
            key: Cache key.
            value: Value to store; the Redis copy is JSON-encoded.
            ttl: Optional lifetime in seconds. When given it replaces
                ``l2_ttl`` for the Redis copy and caps the L1 lifetime at
                ``min(l1_ttl, ttl)``. ``None`` uses the configured defaults.
        """
        l1_ttl = self.l1_ttl if ttl is None else min(self.l1_ttl, ttl)
        l2_ttl = self.l2_ttl if ttl is None else ttl
        self._store_l1(key, value, l1_ttl)

        redis = await self._get_redis()
        if redis is not None:
            await redis.setex(key, l2_ttl, json.dumps(value, default=str))

    def cached(self, ttl: Optional[int] = None):
        """Decorator that caches the result of an async function.

        Args:
            ttl: Lifetime in seconds applied to entries written by the
                decorated function (see ``set``).
        """
        def decorator(func: Callable):
            # Module + qualified name keeps same-named functions of
            # different classes or modules from sharing cache entries.
            fn_id = f"{func.__module__}.{func.__qualname__}"

            @wraps(func)
            async def wrapper(*args, **kwargs):
                # NOTE: positional args are part of the key; for a method
                # that includes ``self``, which is stringified if it is not
                # JSON-serializable.
                key = self._make_key(fn_id, args, kwargs)
                hit, value = await self._lookup(key)
                if hit:
                    return value
                result = await func(*args, **kwargs)
                await self.set(key, result, ttl=ttl)
                return result
            return wrapper
        return decorator

    def print_stats(self):
        """Print hit/miss percentages; prints nothing before any lookup."""
        total = sum(self._stats.values())
        if total == 0:
            return
        print("缓存命中率统计:")
        print(f" L1 (内存): {self._stats['l1_hits']/total*100:.1f}%")
        print(f" L2 (Redis): {self._stats['l2_hits']/total*100:.1f}%")
        print(f" 未命中: {self._stats['misses']/total*100:.1f}%")

    async def _get_redis(self):
        """Return the Redis pool, or ``None`` when Redis is unavailable."""
        if self._redis is None and time.time() >= self._redis_retry_at:
            try:
                # NOTE(review): create_redis_pool looks like the aioredis
                # 1.x entry point - confirm against the installed version.
                import aioredis
                self._redis = await aioredis.create_redis_pool("redis://localhost")
            except Exception:
                # Degrade to L1 only, and back off so a missing or down
                # Redis does not cost a connection attempt on every access.
                self._redis_retry_at = time.time() + self._REDIS_RETRY_SECONDS
        return self._redis
# 全局缓存实例
cache = MultiLevelCache()


class WebSearchSkill:
    """Search Skill whose ``search`` method goes through the shared cache."""

    @cache.cached(ttl=3600)  # requests a one-hour TTL for search results
    async def search(self, query: str, engine: str = "google") -> List[Dict]:
        """Run the real (expensive) search request for ``query``."""
        results = await self._do_real_search(query, engine)
        return results
35.5 内存占用优化
内存问题检测
# hermes/profiling/memory_profiler.py
import tracemalloc
import gc
import sys
from typing import List, Tuple
class SkillMemoryProfiler:
    """Memory usage analyzer for a Skill, built on ``tracemalloc``."""

    def __init__(self):
        # (label, snapshot) pairs in the order they were taken.
        self.snapshots: List = []

    def start(self):
        """Start tracing allocations and run one garbage collection."""
        tracemalloc.start(25)  # keep up to 25 stack frames per allocation
        gc.collect()

    def snapshot(self, label: str = ""):
        """Take a snapshot, store it under ``label`` and return it."""
        snap = tracemalloc.take_snapshot()
        self.snapshots.append((label, snap))
        return snap

    def _find_snapshot(self, label: str):
        """Return the first stored snapshot with ``label``.

        Raises:
            ValueError: If no snapshot was stored under ``label``.
        """
        for name, snap in self.snapshots:
            if name == label:
                return snap
        raise ValueError(f"no snapshot recorded with label {label!r}")

    def compare(self, from_label: str, to_label: str) -> List[str]:
        """Describe the memory change between two labelled snapshots.

        Returns:
            A header line followed by up to 10 per-line difference entries.

        Raises:
            ValueError: If either label has no stored snapshot.
        """
        from_snap = self._find_snapshot(from_label)
        to_snap = self._find_snapshot(to_label)
        top_stats = to_snap.compare_to(from_snap, "lineno")
        lines = [f"内存变化 ({from_label} → {to_label}):"]
        lines.extend(f" {stat}" for stat in top_stats[:10])
        return lines

    def get_top_allocators(self, top_n: int = 10) -> str:
        """Return the ``top_n`` largest allocation sites as one string.

        Takes a fresh snapshot, so tracing must be active.
        """
        snap = tracemalloc.take_snapshot()
        top_stats = snap.statistics("lineno")
        lines = [f"内存分配 Top {top_n}:"]
        for stat in top_stats[:top_n]:
            lines.append(f" {stat.size/1024:.1f} KB - {stat.traceback.format()[0]}")
        return "\n".join(lines)

    def stop(self):
        """Stop tracing allocations."""
        tracemalloc.stop()
# 内存优化技巧
class MemoryOptimizedSkill:
    """Skill demonstrating memory optimization techniques."""

    # __slots__ removes the per-instance __dict__ to shrink each instance.
    __slots__ = ["name", "config", "_cache"]

    def __init__(self, name: str, config: dict):
        self.name = name
        self.config = config
        self._cache = {}

    async def process_large_dataset(self, data_source: str):
        """Yield one processed result per chunk of ``data_source``.

        Results are yielded as they are produced instead of being collected,
        so only one chunk is held by this generator at a time.
        """
        async for chunk in self._stream_data(data_source):
            result = await self._process_chunk(chunk)
            yield result
            # Drop our references before the next chunk is read, then
            # collect; note this runs a full collection after every chunk.
            del chunk, result
            gc.collect()

    async def _stream_data(self, source: str, batch_size: int = 1000):
        """Yield lists of up to ``batch_size`` lines read from ``source``.

        Each batch is read in the event loop's default executor so the
        blocking file read does not stall the loop. Only the standard
        library is used.

        Raises:
            ValueError: If ``batch_size`` is less than 1.
        """
        import asyncio
        import itertools

        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")

        loop = asyncio.get_running_loop()
        with open(source) as handle:
            while True:
                batch = await loop.run_in_executor(
                    None, lambda: list(itertools.islice(handle, batch_size))
                )
                if not batch:
                    break
                yield batch
35.6 性能基准测试工具搭建
基准测试框架
# hermes/benchmarks/skill_benchmark.py
import asyncio
import statistics
import time
from typing import Callable, Dict, Any, List
from dataclasses import dataclass
@dataclass
class BenchmarkResult:
    """Summary statistics produced by one benchmark run."""

    # Identification
    name: str
    iterations: int

    # Latency statistics, in milliseconds
    mean_ms: float
    median_ms: float
    p95_ms: float
    p99_ms: float
    min_ms: float
    max_ms: float

    # Requests per second
    throughput_rps: float

    # Error count for the run
    errors: int
class SkillBenchmark:
    """Benchmark runner for async Skill functions."""

    @staticmethod
    def _percentile(sorted_values, fraction):
        """Return the nearest-rank percentile of an ascending, non-empty list."""
        import math

        # round() absorbs float noise such as 0.99 * 100 -> 99.00000000000001.
        rank = math.ceil(round(fraction * len(sorted_values), 9))
        index = min(max(rank, 1), len(sorted_values)) - 1
        return sorted_values[index]

    async def run(
        self,
        name: str,
        target_func: Callable,
        args: Optional[Dict[str, Any]] = None,
        iterations: int = 100,
        warmup: int = 10,
        concurrency: int = 1
    ) -> "BenchmarkResult":
        """Run a benchmark and return its statistics.

        Args:
            name: Benchmark name used in the printed output.
            target_func: Async function under test, called as
                ``target_func(**args)``.
            args: Keyword arguments for ``target_func``.
            iterations: Number of measured calls.
            warmup: Number of unmeasured warm-up calls.
            concurrency: Maximum number of calls in flight at once.

        Raises:
            ValueError: If ``concurrency`` is less than 1 (a zero-sized
                semaphore would block every call forever).
            RuntimeError: If no measured call succeeded.
        """
        if concurrency < 1:
            raise ValueError("concurrency must be >= 1")
        args = args or {}

        # Warm-up: failures are ignored on purpose, since these calls are
        # not part of the measurement.
        print(f"预热中 ({warmup} 次)...")
        for _ in range(warmup):
            try:
                await target_func(**args)
            except Exception:
                pass

        # Measured run.
        print(f"基准测试: {name} ({iterations} 次, 并发={concurrency})...")
        errors = 0
        semaphore = asyncio.Semaphore(concurrency)

        async def single_run():
            nonlocal errors
            async with semaphore:
                t_start = time.perf_counter()
                try:
                    await target_func(**args)
                except Exception:
                    errors += 1
                    return None
                return (time.perf_counter() - t_start) * 1000

        # Same monotonic clock as the per-call samples.
        start_total = time.perf_counter()
        results = await asyncio.gather(*(single_run() for _ in range(iterations)))
        total_time = time.perf_counter() - start_total

        durations = sorted(r for r in results if r is not None)
        if not durations:
            raise RuntimeError("所有测试均失败")

        result = BenchmarkResult(
            name=name,
            iterations=iterations,
            mean_ms=statistics.mean(durations),
            median_ms=statistics.median(durations),
            p95_ms=self._percentile(durations, 0.95),
            p99_ms=self._percentile(durations, 0.99),
            min_ms=durations[0],
            max_ms=durations[-1],
            throughput_rps=(
                iterations / total_time if total_time > 0 else float("inf")
            ),
            errors=errors
        )
        self._print_result(result)
        return result

    def _print_result(self, r: "BenchmarkResult"):
        """Print one benchmark result as a text block."""
        print(f"\n基准测试结果: {r.name}")
        print(f" 迭代次数: {r.iterations} (错误: {r.errors})")
        print(f" 均值: {r.mean_ms:.2f}ms")
        print(f" 中位数: {r.median_ms:.2f}ms")
        print(f" P95: {r.p95_ms:.2f}ms")
        print(f" P99: {r.p99_ms:.2f}ms")
        print(f" 最小值: {r.min_ms:.2f}ms")
        print(f" 最大值: {r.max_ms:.2f}ms")
        print(f" 吞吐量: {r.throughput_rps:.1f} req/s")

    async def compare(
        self,
        baseline: Callable,
        optimized: Callable,
        args: Optional[Dict] = None,
        iterations: int = 100
    ) -> Dict:
        """Benchmark ``baseline`` and ``optimized`` and report the difference.

        Returns:
            A dict with keys ``baseline`` and ``optimized`` (the two
            results) and ``improvement_pct`` (mean latency reduction as a
            percentage of the baseline mean; positive means faster).
        """
        print("对比测试:基准版本 vs 优化版本")
        base_result = await self.run("基准版本", baseline, args, iterations)
        opt_result = await self.run("优化版本", optimized, args, iterations)

        # A zero baseline mean has no meaningful relative change.
        if base_result.mean_ms > 0:
            improvement = (
                (base_result.mean_ms - opt_result.mean_ms)
                / base_result.mean_ms * 100
            )
        else:
            improvement = 0.0

        print(f"\n性能提升对比:")
        print(f" 均值改进: {improvement:.1f}%")
        print(f" {'提升' if improvement > 0 else '退化'}: "
              f"{base_result.mean_ms:.1f}ms → {opt_result.mean_ms:.1f}ms")
        return {
            "baseline": base_result,
            "optimized": opt_result,
            "improvement_pct": improvement
        }
# 使用示例
async def run_benchmarks():
    """Example driver: one standalone benchmark, then a two-way comparison."""
    runner = SkillBenchmark()

    # Standalone benchmark of the cached search.
    single_case = {"query": "Python asyncio best practices"}
    await runner.run(
        name="web_search_cached",
        target_func=web_search_skill.search,
        args=single_case,
        iterations=100,
        warmup=5,
        concurrency=10,
    )

    # Serial versus parallel search on the same set of queries.
    comparison_case = {"queries": ["python", "asyncio", "performance"]}
    await runner.compare(
        baseline=web_search_serial,
        optimized=web_search_parallel,
        args=comparison_case,
        iterations=50,
    )
性能优化检查清单
性能优化 Checklist
□ 已使用 SkillProfiler 定位实际瓶颈(而非凭感觉)
□ HTTP 客户端使用连接池(aiohttp.TCPConnector)
□ 重复的工具调用已添加缓存(@cache.cached)
□ 独立的工具调用已改为并发(asyncio.gather)
□ 没有在 async 函数中调用同步阻塞操作
□ 大数据处理使用生成器/流式处理
□ 高频创建的对象使用 __slots__
□ 生产环境已运行基准测试并记录基线
□ P95/P99 延迟符合 SLA 要求
□ 内存使用已通过 tracemalloc 验证无泄漏
本章小结
性能优化的核心原则:先测量,后优化;优化最慢的那部分,而不是最容易优化的部分。
- 执行时间分布分析:SkillProfiler 提供 span 级别的精确测量,让你用数据找到真正的瓶颈
- 工具调用延迟优化:HTTP 连接复用、DNS 缓存、并发调用,可将工具调用时间降低 50-90%
- 缓存中间结果:多层缓存(L1/L2/L3)投入产出比最高,避免重复的昂贵调用
- 并发工具调用:将串行 N×T 改为并发 max(T),是最直接的提速手段
- 内存优化:tracemalloc、__slots__、流式处理三板斧
- 基准测试:用数据说话,记录基线,量化优化效果
思考题
- 如果 LLM 推理是最大瓶颈(占总时间 80%),缓存在这里能帮多大忙?有哪些不适合缓存的场景?
- 并发工具调用时,如果其中一个工具的错误率很高,会对整体延迟产生什么影响?如何设计"快速失败"机制?
- 在多租户 Hermes 部署中,一个 Skill 的性能问题是否会影响其他租户?如何实现性能隔离?
- 设计一个持续性能监控方案:如何在生产环境中实时检测性能退化(regression),并自动告警?