第 58 章

Token 预算管理与工具集优化

第58章:Token 预算管理与工具集优化

Token 是 Agent 的燃料,预算是行程的上限。一个没有 Token 预算意识的 Agent,就像开车不看油表——等到上下文窗口溢出才发现已经无路可走。


58.1 Token 预算框架

为什么需要 Token 预算管理

Hermes Agent 的上下文窗口是有限资源。以 Claude 3.5 Sonnet 为例,上下文窗口为 200K tokens,但这并不意味着可以随意填充。原因如下:

Token 预算分层架构

总 Token 预算(例:100,000 tokens)
├── 系统提示(System Prompt):5,000 tokens(5%)
├── 任务描述与用户输入:2,000 tokens(2%)
├── MEMORY.md 内容:3,000 tokens(3%)
├── 工具定义(Schema):8,000 tokens(8%)
├── 对话历史(压缩后):20,000 tokens(20%)
├── 工具调用结果:30,000 tokens(30%)
├── 当前推理缓冲:22,000 tokens(22%)
└── 安全余量(防止溢出):10,000 tokens(10%)

各任务类型的推荐预算配置

任务类型 系统提示 工具定义 历史记录 工具结果 推理缓冲
简单问答 2K 1K 5K 5K 10K
代码生成 3K 3K 10K 20K 15K
数据分析 3K 5K 8K 40K 15K
文档撰写 4K 2K 15K 10K 20K
多步研究 5K 8K 20K 30K 20K
系统运维 3K 6K 10K 35K 15K

58.2 动态工具加载

为什么要动态加载工具

Hermes Agent 支持数十种工具,但每个工具的 Schema 定义平均占用 200-500 tokens。如果一次性加载所有工具:

工具总数:50个 × 平均 300 tokens = 15,000 tokens
占总预算:15%(以 100K 预算为例)

而实际上,一次任务通常只用到 3-5 个工具。动态加载可以节省 70-80% 的工具定义 Token。

工具分类与按需加载策略

from typing import Dict, List, Optional, Set
import json

# 工具元数据注册表
TOOL_REGISTRY = {
    # 搜索类工具
    "web_search": {
        "category": "search",
        "token_cost": 280,
        "triggers": ["搜索", "查找", "search", "find", "look up", "research"],
        "schema": {...}
    },
    "arxiv_search": {
        "category": "search",
        "token_cost": 320,
        "triggers": ["论文", "paper", "arxiv", "academic", "研究"],
        "schema": {...}
    },
    # 代码类工具
    "code_executor": {
        "category": "code",
        "token_cost": 450,
        "triggers": ["运行", "执行", "代码", "run", "execute", "code", "script"],
        "schema": {...}
    },
    "code_linter": {
        "category": "code",
        "token_cost": 280,
        "triggers": ["检查", "lint", "审查", "review", "analyze code"],
        "schema": {...}
    },
    # 文件类工具
    "file_reader": {
        "category": "file",
        "token_cost": 220,
        "triggers": ["读取", "read", "open file", "load file", "文件内容"],
        "schema": {...}
    },
    "file_writer": {
        "category": "file",
        "token_cost": 240,
        "triggers": ["写入", "保存", "write", "save", "create file"],
        "schema": {...}
    },
    # 数据类工具
    "sql_executor": {
        "category": "data",
        "token_cost": 380,
        "triggers": ["数据库", "SQL", "查询", "database", "query"],
        "schema": {...}
    },
    "csv_analyzer": {
        "category": "data",
        "token_cost": 300,
        "triggers": ["CSV", "表格", "数据分析", "spreadsheet", "data"],
        "schema": {...}
    },
}

class DynamicToolLoader:
    """
    动态工具加载器
    根据任务描述按需加载工具,最小化 Token 消耗
    """
    
    def __init__(self, token_budget: int = 8000):
        self.token_budget = token_budget
        self.loaded_tools: Dict[str, dict] = {}
        self.always_loaded: Set[str] = set()  # 强制预加载的工具
    
    def set_always_loaded(self, tool_names: List[str]):
        """设置始终加载的核心工具(如通用工具)"""
        self.always_loaded = set(tool_names)
    
    def infer_needed_tools(self, task_description: str) -> List[str]:
        """
        根据任务描述推断需要的工具
        使用关键词匹配 + 类别推断
        """
        task_lower = task_description.lower()
        needed = set(self.always_loaded)
        
        for tool_name, meta in TOOL_REGISTRY.items():
            for trigger in meta["triggers"]:
                if trigger.lower() in task_lower:
                    needed.add(tool_name)
                    # 自动加载同类别工具(通常相关)
                    category = meta["category"]
                    for other_tool, other_meta in TOOL_REGISTRY.items():
                        if other_meta["category"] == category:
                            needed.add(other_tool)
                    break
        
        return list(needed)
    
    def load_tools_for_task(
        self, 
        task_description: str,
        explicit_tools: Optional[List[str]] = None
    ) -> Dict[str, dict]:
        """
        为特定任务加载工具集
        
        Args:
            task_description: 任务描述文本
            explicit_tools: 明确指定要加载的工具名称列表
        
        Returns:
            已加载工具的字典 {tool_name: schema}
        """
        if explicit_tools:
            needed_tools = set(explicit_tools) | self.always_loaded
        else:
            needed_tools = set(self.infer_needed_tools(task_description))
        
        # 按 Token 成本排序,优先加载低成本工具
        sorted_tools = sorted(
            needed_tools,
            key=lambda t: TOOL_REGISTRY.get(t, {}).get("token_cost", 999)
        )
        
        loaded = {}
        total_tokens = 0
        
        for tool_name in sorted_tools:
            if tool_name not in TOOL_REGISTRY:
                continue
            
            tool_meta = TOOL_REGISTRY[tool_name]
            cost = tool_meta["token_cost"]
            
            if total_tokens + cost <= self.token_budget:
                loaded[tool_name] = tool_meta["schema"]
                total_tokens += cost
            else:
                import logging
                logging.getLogger('agent').warning(
                    f"Tool {tool_name} skipped: would exceed token budget "
                    f"({total_tokens + cost} > {self.token_budget})"
                )
        
        self.loaded_tools = loaded
        return loaded
    
    def get_token_usage(self) -> dict:
        """获取当前工具集的 Token 使用统计"""
        total = sum(
            TOOL_REGISTRY[name]["token_cost"] 
            for name in self.loaded_tools 
            if name in TOOL_REGISTRY
        )
        return {
            "loaded_tools": list(self.loaded_tools.keys()),
            "tool_count": len(self.loaded_tools),
            "token_cost": total,
            "budget_used_pct": f"{total / self.token_budget * 100:.1f}%"
        }
    
    def unload_tool(self, tool_name: str):
        """卸载不再需要的工具,释放 Token 空间"""
        if tool_name in self.loaded_tools:
            del self.loaded_tools[tool_name]

# 使用示例
loader = DynamicToolLoader(token_budget=8000)
loader.set_always_loaded(["file_reader"])  # 文件读取工具始终加载

task = "帮我搜索最新的 AI 论文,并将结果保存到 CSV 文件"
tools = loader.load_tools_for_task(task)

print("已加载工具:", loader.get_token_usage())
# 输出示例:
# 已加载工具: {
#   'loaded_tools': ['file_reader', 'web_search', 'arxiv_search', 'csv_analyzer', 'file_writer'],
#   'tool_count': 5,
#   'token_cost': 1360,
#   'budget_used_pct': '17.0%'
# }

58.3 MEMORY.md 瘦身策略

MEMORY.md 的 Token 消耗问题

Hermes Agent 的 MEMORY.md 文件随着使用不断膨胀,可能达到 5000-10000 tokens。每次对话都会全量注入,造成大量浪费。

瘦身策略一:分级存储

# MEMORY.md 分级结构

## 热记忆(Hot Memory)- 每次必读(≤500 tokens)
- 用户基本偏好
- 当前进行中的项目(最多3个)
- 最近的关键决策

## 温记忆(Warm Memory)- 按需读取(≤2000 tokens)
- 项目详细背景
- 工具使用历史摘要
- 常用配置参数

## 冷记忆(Cold Memory)- 归档文件(无限制)
- 历史会话摘要
- 已完成项目记录
- 错误分析报告

瘦身策略二:自动摘要压缩

import re
from typing import Tuple

class MemoryOptimizer:
    """
    MEMORY.md 自动优化器
    监控 Token 使用并触发压缩
    """
    
    def __init__(self, memory_path: str, token_limit: int = 3000):
        self.memory_path = memory_path
        self.token_limit = token_limit
    
    def estimate_tokens(self, text: str) -> int:
        """快速估算 Token 数(约 4 字符/token)"""
        return len(text) // 4
    
    def read_memory(self) -> str:
        with open(self.memory_path, 'r', encoding='utf-8') as f:
            return f.read()
    
    def write_memory(self, content: str):
        with open(self.memory_path, 'w', encoding='utf-8') as f:
            f.write(content)
    
    def extract_sections(self, content: str) -> dict:
        """提取各个 markdown 章节"""
        sections = {}
        current_section = "preamble"
        current_content = []
        
        for line in content.split('\n'):
            if line.startswith('## '):
                if current_content:
                    sections[current_section] = '\n'.join(current_content)
                current_section = line[3:].strip()
                current_content = [line]
            else:
                current_content.append(line)
        
        if current_content:
            sections[current_section] = '\n'.join(current_content)
        
        return sections
    
    def compress_old_entries(self, content: str, keep_days: int = 7) -> str:
        """
        压缩旧条目:将超过 keep_days 天的详细记录替换为摘要
        """
        from datetime import datetime, timedelta
        cutoff_date = datetime.now() - timedelta(days=keep_days)
        
        # 匹配日期格式的行(如 2024-01-15)
        date_pattern = re.compile(r'(\d{4}-\d{2}-\d{2})')
        lines = content.split('\n')
        compressed_lines = []
        skip_block = False
        archived_count = 0
        
        for line in lines:
            match = date_pattern.search(line)
            if match:
                try:
                    line_date = datetime.strptime(match.group(1), '%Y-%m-%d')
                    if line_date < cutoff_date:
                        skip_block = True
                        archived_count += 1
                        continue
                    else:
                        skip_block = False
                except ValueError:
                    pass
            
            if not skip_block:
                compressed_lines.append(line)
        
        if archived_count > 0:
            compressed_lines.append(f"\n<!-- {archived_count} old entries archived -->")
        
        return '\n'.join(compressed_lines)
    
    def auto_trim(self, llm_summarizer=None) -> Tuple[int, int]:
        """
        自动裁剪 MEMORY.md 到 Token 限制以内
        
        Returns:
            (before_tokens, after_tokens)
        """
        content = self.read_memory()
        before_tokens = self.estimate_tokens(content)
        
        if before_tokens <= self.token_limit:
            return before_tokens, before_tokens
        
        # 第一步:压缩旧条目
        content = self.compress_old_entries(content)
        
        # 第二步:如果还超出,使用 LLM 摘要(如果提供)
        if self.estimate_tokens(content) > self.token_limit and llm_summarizer:
            content = llm_summarizer(content, max_tokens=self.token_limit)
        
        # 第三步:强制截断(最后手段)
        while self.estimate_tokens(content) > self.token_limit:
            lines = content.split('\n')
            # 删除最旧的非标题行
            for i in range(len(lines) - 1, -1, -1):
                if lines[i] and not lines[i].startswith('#'):
                    lines.pop(i)
                    break
            content = '\n'.join(lines)
        
        self.write_memory(content)
        after_tokens = self.estimate_tokens(content)
        return before_tokens, after_tokens

58.4 系统提示压缩技巧

原始系统提示 vs 压缩版对比

原始版(约 800 tokens):

You are Hermes, an autonomous AI agent developed by NousResearch. 
You are designed to help users accomplish complex tasks by breaking 
them down into smaller steps and using various tools available to you.
You should always be helpful, accurate, and efficient. When you are 
not sure about something, you should ask the user for clarification
rather than making assumptions. You have access to the following tools:
[TOOLS LIST]
When using tools, always verify the output before proceeding...
[更多规则描述...]

压缩版(约 200 tokens):

You: Hermes Agent (NousResearch). Mode: autonomous task execution.
Rules: ①Break tasks into steps ②Verify tool outputs ③Ask when unclear ④Log all actions
Tools: {TOOLS_PLACEHOLDER}
Format: JSON for structured data, markdown for reports.

压缩原则

原则 示例
用列表代替段落 ①②③ 代替 "First... Second... Third..."
删除冗余礼貌语 删除 "You should always be..." 等废话
占位符替代内容 {TOOLS_PLACEHOLDER} 动态注入
用缩写 "ctx" 代替 "context","req" 代替 "requirement"
删除显而易见的规则 LLM 本身就会做的事不需要重复说明
class PromptCompressor:
    """系统提示压缩器"""
    
    REDUNDANT_PHRASES = [
        "You should always",
        "It is important that you",
        "Please make sure to",
        "You must",
        "Always remember to",
        "Be sure to",
    ]
    
    @staticmethod
    def compress(prompt: str) -> str:
        """基础文本压缩"""
        lines = prompt.split('\n')
        compressed = []
        
        for line in lines:
            # 跳过空行(保留一个分隔符)
            if not line.strip():
                if compressed and compressed[-1] != '':
                    compressed.append('')
                continue
            
            # 删除冗余前缀
            cleaned = line
            for phrase in PromptCompressor.REDUNDANT_PHRASES:
                cleaned = cleaned.replace(phrase, "")
            
            # 压缩多个空格
            import re
            cleaned = re.sub(r'  +', ' ', cleaned).strip()
            
            if cleaned:
                compressed.append(cleaned)
        
        return '\n'.join(compressed)
    
    @staticmethod
    def estimate_savings(original: str, compressed: str) -> dict:
        orig_tokens = len(original) // 4
        comp_tokens = len(compressed) // 4
        return {
            "original_tokens": orig_tokens,
            "compressed_tokens": comp_tokens,
            "saved_tokens": orig_tokens - comp_tokens,
            "compression_ratio": f"{(1 - comp_tokens/orig_tokens)*100:.1f}%"
        }

58.5 Token 预算监控代码实现

import time
import logging
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from collections import deque

@dataclass
class TokenUsageRecord:
    timestamp: float
    session_id: str
    component: str  # 'system_prompt', 'tools', 'history', 'tool_results', 'output'
    tokens_used: int
    operation: str  # 'input' or 'output'

class TokenBudgetMonitor:
    """
    Token 预算实时监控器
    跟踪各组件的 Token 消耗,触发预警
    """
    
    def __init__(self, total_budget: int = 100_000):
        self.total_budget = total_budget
        self.current_usage: Dict[str, int] = {
            'system_prompt': 0,
            'task_description': 0,
            'memory': 0,
            'tools': 0,
            'history': 0,
            'tool_results': 0,
            'output_buffer': 0,
        }
        self.history: deque = deque(maxlen=1000)
        self.alert_threshold = 0.85  # 85% 时触发警告
        self.logger = logging.getLogger('agent.token_monitor')
    
    def record_usage(
        self, 
        component: str, 
        tokens: int, 
        session_id: str = "",
        operation: str = "input"
    ):
        """记录一次 Token 使用"""
        self.current_usage[component] = self.current_usage.get(component, 0) + tokens
        
        record = TokenUsageRecord(
            timestamp=time.time(),
            session_id=session_id,
            component=component,
            tokens_used=tokens,
            operation=operation
        )
        self.history.append(record)
        
        # 检查是否超出预警阈值
        self._check_alerts()
    
    def _check_alerts(self):
        total_used = sum(self.current_usage.values())
        usage_ratio = total_used / self.total_budget
        
        if usage_ratio >= 1.0:
            self.logger.error(
                f"TOKEN BUDGET EXCEEDED: {total_used}/{self.total_budget} "
                f"({usage_ratio*100:.1f}%)"
            )
        elif usage_ratio >= self.alert_threshold:
            self.logger.warning(
                f"Token budget at {usage_ratio*100:.1f}%: "
                f"{total_used}/{self.total_budget}"
            )
    
    def get_budget_status(self) -> dict:
        """获取当前预算状态报告"""
        total_used = sum(self.current_usage.values())
        remaining = self.total_budget - total_used
        
        return {
            "total_budget": self.total_budget,
            "total_used": total_used,
            "remaining": remaining,
            "usage_percentage": f"{total_used/self.total_budget*100:.1f}%",
            "breakdown": {
                component: {
                    "tokens": tokens,
                    "percentage": f"{tokens/self.total_budget*100:.1f}%"
                }
                for component, tokens in self.current_usage.items()
            },
            "alert": total_used > self.total_budget * self.alert_threshold
        }
    
    def reset_for_new_session(self):
        """新会话开始时重置计数器"""
        for key in self.current_usage:
            self.current_usage[key] = 0
    
    def suggest_optimizations(self) -> List[str]:
        """根据使用情况提出优化建议"""
        suggestions = []
        total_used = sum(self.current_usage.values())
        
        # 检查各组件比例
        for component, tokens in self.current_usage.items():
            ratio = tokens / total_used if total_used > 0 else 0
            
            if component == 'tools' and ratio > 0.15:
                suggestions.append(
                    f"工具定义占 {ratio*100:.1f}%,建议启用动态工具加载"
                )
            elif component == 'history' and ratio > 0.25:
                suggestions.append(
                    f"对话历史占 {ratio*100:.1f}%,建议启用滚动摘要压缩"
                )
            elif component == 'tool_results' and ratio > 0.35:
                suggestions.append(
                    f"工具结果占 {ratio*100:.1f}%,建议只保留关键信息"
                )
            elif component == 'memory' and ratio > 0.05:
                suggestions.append(
                    f"MEMORY.md 占 {ratio*100:.1f}%,建议执行瘦身优化"
                )
        
        return suggestions


# 实战使用示例
monitor = TokenBudgetMonitor(total_budget=100_000)

# 模拟各组件的 Token 注入
monitor.record_usage('system_prompt', 3200, 'sess_001')
monitor.record_usage('memory', 2800, 'sess_001')
monitor.record_usage('tools', 4500, 'sess_001')
monitor.record_usage('task_description', 450, 'sess_001')
monitor.record_usage('history', 18000, 'sess_001')
monitor.record_usage('tool_results', 32000, 'sess_001')

status = monitor.get_budget_status()
print(f"Token 使用率: {status['usage_percentage']}")
print(f"剩余 Token: {status['remaining']:,}")

suggestions = monitor.suggest_optimizations()
for suggestion in suggestions:
    print(f"优化建议: {suggestion}")

58.6 不同任务类型的推荐预算配置

预算配置最佳实践

# 预设预算配置模板
BUDGET_PROFILES = {
    "simple_qa": {
        "total": 20_000,
        "system_prompt": 1_000,
        "memory": 500,
        "tools": 1_000,
        "history": 5_000,
        "tool_results": 5_000,
        "output_buffer": 5_000,
        "safety_margin": 2_500,
    },
    "code_generation": {
        "total": 60_000,
        "system_prompt": 2_000,
        "memory": 1_000,
        "tools": 4_000,
        "history": 10_000,
        "tool_results": 25_000,
        "output_buffer": 12_000,
        "safety_margin": 6_000,
    },
    "data_analysis": {
        "total": 80_000,
        "system_prompt": 2_500,
        "memory": 1_500,
        "tools": 5_000,
        "history": 8_000,
        "tool_results": 45_000,
        "output_buffer": 12_000,
        "safety_margin": 6_000,
    },
    "long_research": {
        "total": 150_000,
        "system_prompt": 4_000,
        "memory": 3_000,
        "tools": 8_000,
        "history": 30_000,
        "tool_results": 70_000,
        "output_buffer": 25_000,
        "safety_margin": 10_000,
    },
}

def get_budget_profile(task_type: str) -> dict:
    """获取任务类型对应的预算配置"""
    return BUDGET_PROFILES.get(task_type, BUDGET_PROFILES["simple_qa"])

动态预算调整策略

当任务执行过程中发现预算不足时,Agent 应按以下优先级削减:

  1. 压缩历史记录:将早期对话替换为摘要(节省 60-80%)
  2. 卸载非核心工具:动态移除当前步骤不需要的工具定义
  3. 截断工具结果:只保留工具输出的前 N 行关键信息
  4. 压缩 MEMORY.md:临时只加载热记忆部分
  5. 降级推理深度:减少 CoT(Chain of Thought)的详细程度

本章小结

Token 预算管理是 Hermes Agent 生产化的核心能力:

  1. 分层预算框架:将总预算分配给各组件,并设置安全余量,避免上下文溢出
  2. 动态工具加载:根据任务描述按需加载工具,节省 70-80% 的工具定义 Token
  3. MEMORY.md 瘦身:分级存储 + 自动摘要压缩,控制记忆 Token 在 3K 以内
  4. 系统提示压缩:通过规范化语言和占位符技巧,减少 60-75% 的提示 Token
  5. 实时监控:TokenBudgetMonitor 追踪各组件消耗,在超标前触发优化动作

思考题

  1. 如果任务在执行中途突然需要新工具,但 Token 预算已近上限,如何设计"工具换入换出"机制?
  2. MEMORY.md 中什么样的内容应该永远不被压缩?如何标记这些"锚点记忆"?
  3. 当需要分析一个 100KB 的大文件时,如何在 Token 预算限制下设计分块处理策略?
  4. 对于需要长时间运行(超过 10 轮工具调用)的任务,如何设计滚动上下文窗口?
本章评分
4.5  / 5  (3 评分)

💬 留言讨论