第 16 章

Adaptive Thinking:Opus 4.7 的自适应推理与 Interleaved Thinking 实战

第十六章:Adaptive Thinking:动态思维深度与任务自适应策略

16.1 什么是 Adaptive Thinking

Adaptive Thinking(自适应思考)是 Extended Thinking 的进阶使用模式——它不是简单地"开启"或"关闭"思考,而是根据任务的实际复杂度、成本约束和质量要求,动态调整推理深度

从工程角度来看,Adaptive Thinking 是一套系统设计模式,涉及:

  1. 任务复杂度评估:在正式推理前快速判断问题难度
  2. 预算动态分配:根据评估结果分配不同的 budget_tokens
  3. 模型梯级选择:将不同复杂度的任务路由到合适的模型
  4. 质量验证循环:检测输出质量,必要时用更多思考预算重试

这一模式在大规模生产系统中尤为重要——对所有请求都使用最大预算既浪费又低效,而对复杂任务使用零思考又会显著降低质量。

16.2 复杂度分类器的设计

轻量级复杂度探针

import anthropic
from enum import Enum

client = anthropic.Anthropic()

class TaskComplexity(Enum):
    TRIVIAL = "trivial"       # 直接回答,无需推理
    SIMPLE = "simple"         # 简单单步推理
    MEDIUM = "medium"         # 多步推理,中等复杂
    COMPLEX = "complex"       # 深度分析,多角度权衡
    EXPERT = "expert"         # 专家级推理,需要全力思考

def classify_complexity(task: str) -> TaskComplexity:
    """使用 Haiku 快速评估任务复杂度,成本极低"""
    
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",  # 最快最便宜的模型
        max_tokens=20,
        system="""评估任务复杂度,只输出一个词:
trivial(简单事实/定义)
simple(一步推理)
medium(多步分析)
complex(深度权衡)
expert(专家级难题)""",
        messages=[{"role": "user", "content": task}]
    )
    
    raw = response.content[0].text.strip().lower()
    
    # 映射到枚举值
    mapping = {
        "trivial": TaskComplexity.TRIVIAL,
        "simple": TaskComplexity.SIMPLE,
        "medium": TaskComplexity.MEDIUM,
        "complex": TaskComplexity.COMPLEX,
        "expert": TaskComplexity.EXPERT
    }
    
    return mapping.get(raw, TaskComplexity.MEDIUM)

# 示例
tasks = [
    "Python 是什么年份创建的?",
    "解释二叉搜索树的查找时间复杂度",
    "比较 PostgreSQL 和 MongoDB 的适用场景",
    "设计一个处理每秒百万请求的分布式缓存系统",
    "证明 P≠NP"
]

for task in tasks:
    complexity = classify_complexity(task)
    print(f"[{complexity.value}] {task[:50]}")

基于规则的复杂度快速判断

对于延迟极敏感的场景,可以用启发式规则替代 LLM 分类:

import re
from typing import Callable

class ComplexityHeuristics:
    """基于规则的任务复杂度快速判断,无 LLM 调用开销"""
    
    # 复杂度信号词
    EXPERT_SIGNALS = [
        "证明", "推导", "优化算法", "时间复杂度分析",
        "架构设计", "分布式", "并发", "CAP 定理",
        "prove", "derive", "optimize", "complexity analysis",
        "architecture", "distributed", "concurrent"
    ]
    
    COMPLEX_SIGNALS = [
        "比较", "权衡", "评估", "分析优缺点", "设计方案",
        "compare", "trade-off", "evaluate", "pros and cons", "design"
    ]
    
    SIMPLE_SIGNALS = [
        "是什么", "定义", "解释", "举例",
        "what is", "define", "explain", "example"
    ]
    
    @classmethod
    def classify(cls, task: str) -> TaskComplexity:
        task_lower = task.lower()
        
        # 超短任务通常是简单问题
        if len(task) < 30:
            return TaskComplexity.TRIVIAL
        
        # 检查复杂信号
        if any(signal in task_lower for signal in cls.EXPERT_SIGNALS):
            return TaskComplexity.EXPERT if len(task) > 100 else TaskComplexity.COMPLEX
        
        if any(signal in task_lower for signal in cls.COMPLEX_SIGNALS):
            return TaskComplexity.COMPLEX
        
        if any(signal in task_lower for signal in cls.SIMPLE_SIGNALS):
            return TaskComplexity.SIMPLE
        
        # 包含代码或数学公式往往更复杂
        if "```" in task or re.search(r'[∑∏∫∂√]|O\(n', task):
            return TaskComplexity.MEDIUM
        
        return TaskComplexity.MEDIUM  # 默认中等

16.3 自适应推理路由器

from dataclasses import dataclass
from typing import Optional

@dataclass
class ThinkingConfig:
    model: str
    budget_tokens: Optional[int]
    max_tokens: int
    description: str

# 推理配置梯级
THINKING_TIERS = {
    TaskComplexity.TRIVIAL: ThinkingConfig(
        model="claude-haiku-4-5-20251001",
        budget_tokens=None,
        max_tokens=256,
        description="直接回答,无推理"
    ),
    TaskComplexity.SIMPLE: ThinkingConfig(
        model="claude-haiku-4-5-20251001",
        budget_tokens=None,
        max_tokens=512,
        description="快速推理,无需扩展思考"
    ),
    TaskComplexity.MEDIUM: ThinkingConfig(
        model="claude-sonnet-4-6",
        budget_tokens=3000,
        max_tokens=6000,
        description="中等思考深度"
    ),
    TaskComplexity.COMPLEX: ThinkingConfig(
        model="claude-sonnet-4-6",
        budget_tokens=8000,
        max_tokens=12000,
        description="深度思考"
    ),
    TaskComplexity.EXPERT: ThinkingConfig(
        model="claude-opus-4-6",
        budget_tokens=20000,
        max_tokens=28000,
        description="全力推理"
    )
}

class AdaptiveThinkingRouter:
    """自适应思维深度路由器"""
    
    def __init__(
        self,
        client: anthropic.Anthropic,
        classifier: str = "llm",  # "llm" 或 "heuristic"
        cost_limit_per_request: float = 0.10  # USD
    ):
        self.client = client
        self.classifier = classifier
        self.cost_limit = cost_limit_per_request
    
    def classify(self, task: str) -> TaskComplexity:
        if self.classifier == "heuristic":
            return ComplexityHeuristics.classify(task)
        else:
            return classify_complexity(task)
    
    def get_config(self, complexity: TaskComplexity) -> ThinkingConfig:
        cfg = THINKING_TIERS[complexity]
        
        # 成本检查:如果预计超出预算,降级
        if cfg.budget_tokens:
            estimated_cost = cfg.budget_tokens * 75 / 1_000_000  # 按输出价格估算
            if estimated_cost > self.cost_limit:
                # 降一级
                lower_complexity = TaskComplexity(
                    list(TaskComplexity)[
                        list(TaskComplexity).index(complexity) - 1
                    ].value
                )
                return THINKING_TIERS.get(lower_complexity, cfg)
        
        return cfg
    
    def solve(
        self,
        task: str,
        system: str = "",
        force_complexity: Optional[TaskComplexity] = None
    ) -> dict:
        """自适应解决任务,返回答案和元数据"""
        
        complexity = force_complexity or self.classify(task)
        cfg = self.get_config(complexity)
        
        kwargs = {
            "model": cfg.model,
            "max_tokens": cfg.max_tokens,
            "messages": [{"role": "user", "content": task}]
        }
        
        if system:
            kwargs["system"] = system
        
        if cfg.budget_tokens:
            kwargs["thinking"] = {
                "type": "enabled",
                "budget_tokens": cfg.budget_tokens
            }
        
        response = self.client.messages.create(**kwargs)
        
        thinking_content = ""
        answer = ""
        
        for block in response.content:
            if block.type == "thinking":
                thinking_content = block.thinking
            elif block.type == "text":
                answer = block.text
        
        return {
            "answer": answer,
            "complexity": complexity.value,
            "model": cfg.model,
            "description": cfg.description,
            "thinking_used": bool(thinking_content),
            "thinking_chars": len(thinking_content),
            "usage": {
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens
            }
        }

# 使用示例
router = AdaptiveThinkingRouter(client, classifier="heuristic")

test_questions = [
    "什么是 HTTP?",
    "解释 REST API 的设计原则",
    "比较 GraphQL 和 REST,什么时候该用哪个?",
    "设计一个支持亿级用户的消息系统架构"
]

for q in test_questions:
    result = router.solve(q)
    print(f"\n问题: {q[:50]}")
    print(f"复杂度: {result['complexity']} | 模型: {result['model']}")
    print(f"使用思考: {result['thinking_used']} | 输出: {result['usage']['output_tokens']} tokens")

16.4 质量验证与重试机制

对于关键任务,可以实现质量检测后动态提升思考深度:

from typing import Callable

def solve_with_quality_gate(
    client: anthropic.Anthropic,
    task: str,
    quality_checker: Callable[[str], float],  # 返回 0-1 的质量分
    min_quality: float = 0.8,
    max_attempts: int = 3
) -> dict:
    """
    带质量门控的自适应推理:
    - 先用低成本模式尝试
    - 如果质量不够,自动提升推理深度重试
    """
    
    budgets = [0, 3000, 10000]  # 递增的思考预算
    models = [
        "claude-haiku-4-5-20251001",
        "claude-sonnet-4-6",
        "claude-opus-4-6"
    ]
    
    for attempt in range(min(max_attempts, len(budgets))):
        budget = budgets[attempt]
        model = models[attempt]
        
        kwargs = {
            "model": model,
            "max_tokens": budget + 2048 if budget > 0 else 2048,
            "messages": [{"role": "user", "content": task}]
        }
        
        if budget > 0:
            kwargs["thinking"] = {"type": "enabled", "budget_tokens": budget}
        
        response = client.messages.create(**kwargs)
        answer = next(
            (b.text for b in response.content if b.type == "text"), ""
        )
        
        # 评估质量
        quality = quality_checker(answer)
        
        if quality >= min_quality:
            return {
                "answer": answer,
                "attempts": attempt + 1,
                "final_model": model,
                "budget_used": budget,
                "quality_score": quality
            }
        
        print(f"第{attempt+1}次尝试质量不足({quality:.2f} < {min_quality}),提升推理深度...")
    
    # 返回最后一次的结果(即使质量不满足)
    return {
        "answer": answer,
        "attempts": max_attempts,
        "final_model": models[max_attempts - 1],
        "budget_used": budgets[max_attempts - 1],
        "quality_score": quality,
        "warning": "未能达到质量要求"
    }

# 示例:用 LLM 评估答案完整性作为质量检测器
def llm_quality_checker(answer: str) -> float:
    """用 Haiku 快速评分答案质量"""
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=10,
        messages=[{
            "role": "user",
            "content": f"这个技术回答的质量评分(0.0-1.0,只输出数字):\n\n{answer[:500]}"
        }]
    )
    try:
        return float(response.content[0].text.strip())
    except (ValueError, IndexError):
        return 0.5

16.5 多阶段推理管道

对于极其复杂的问题,可以设计多阶段推理管道:

def multi_stage_reasoning(
    client: anthropic.Anthropic,
    complex_problem: str
) -> dict:
    """
    多阶段推理管道:
    1. 分解:将复杂问题分解为子问题
    2. 分析:对每个子问题进行独立推理
    3. 综合:整合各子问题答案得出最终结论
    """
    
    # 阶段 1:问题分解
    decompose_response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2000,
        thinking={"type": "enabled", "budget_tokens": 2000},
        messages=[{
            "role": "user",
            "content": f"""将以下复杂问题分解为3-5个独立的子问题,每个子问题可以单独分析。
            
问题:{complex_problem}

以 JSON 格式输出:{{"subproblems": ["子问题1", "子问题2", ...]}}"""
        }, {"role": "assistant", "content": "{"}]
    )
    
    import json
    try:
        decomposition = json.loads("{" + decompose_response.content[-1].text)
        subproblems = decomposition.get("subproblems", [complex_problem])
    except:
        subproblems = [complex_problem]
    
    # 阶段 2:并行分析子问题
    sub_answers = []
    
    for i, subproblem in enumerate(subproblems):
        print(f"分析子问题 {i+1}/{len(subproblems)}: {subproblem[:50]}...")
        
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=8000,
            thinking={"type": "enabled", "budget_tokens": 5000},
            messages=[{"role": "user", "content": subproblem}]
        )
        
        answer = next(
            (b.text for b in response.content if b.type == "text"), ""
        )
        sub_answers.append({
            "subproblem": subproblem,
            "analysis": answer
        })
    
    # 阶段 3:综合最终答案
    synthesis_input = f"原始问题:{complex_problem}\n\n"
    for item in sub_answers:
        synthesis_input += f"子问题:{item['subproblem']}\n分析:{item['analysis']}\n\n"
    synthesis_input += "请综合以上分析,给出完整的最终答案。"
    
    final_response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=16000,
        thinking={"type": "enabled", "budget_tokens": 10000},
        messages=[{"role": "user", "content": synthesis_input}]
    )
    
    final_answer = next(
        (b.text for b in final_response.content if b.type == "text"), ""
    )
    
    return {
        "subproblems": subproblems,
        "sub_analyses": sub_answers,
        "final_answer": final_answer,
        "stages": 3
    }

16.6 实时自适应:在对话中动态调整

class AdaptiveConversationManager:
    """
    在多轮对话中动态调整思考深度
    - 跟踪问题难度趋势
    - 自动升降级
    - 成本预算控制
    """
    
    def __init__(
        self,
        client: anthropic.Anthropic,
        total_budget_usd: float = 1.0
    ):
        self.client = client
        self.total_budget = total_budget_usd
        self.spent_budget = 0.0
        self.conversation_history = []
        self.turn_complexities = []
    
    def add_turn(self, user_message: str) -> str:
        """处理一轮对话,自动选择推理深度"""
        
        # 检查剩余预算
        remaining = self.total_budget - self.spent_budget
        
        if remaining < 0.001:
            # 预算耗尽,切换到最便宜模式
            complexity = TaskComplexity.TRIVIAL
        else:
            # 评估复杂度
            complexity = ComplexityHeuristics.classify(user_message)
            
            # 趋势调整:如果最近几轮都很复杂,提升默认级别
            if len(self.turn_complexities) >= 3:
                recent = self.turn_complexities[-3:]
                complex_count = sum(
                    1 for c in recent
                    if c in (TaskComplexity.COMPLEX, TaskComplexity.EXPERT)
                )
                if complex_count >= 2 and complexity == TaskComplexity.MEDIUM:
                    complexity = TaskComplexity.COMPLEX  # 升级
        
        self.turn_complexities.append(complexity)
        cfg = THINKING_TIERS[complexity]
        
        # 构建请求
        self.conversation_history.append({
            "role": "user", "content": user_message
        })
        
        kwargs = {
            "model": cfg.model,
            "max_tokens": cfg.max_tokens,
            "messages": self.conversation_history
        }
        
        if cfg.budget_tokens:
            kwargs["thinking"] = {
                "type": "enabled",
                "budget_tokens": cfg.budget_tokens
            }
        
        response = self.client.messages.create(**kwargs)
        
        # 更新成本跟踪(opus 价格近似)
        cost = (
            response.usage.input_tokens * 15 +
            response.usage.output_tokens * 75
        ) / 1_000_000
        self.spent_budget += cost
        
        # 提取并保存答案
        answer = next(
            (b.text for b in response.content if b.type == "text"), ""
        )
        
        # 保存助手回复(仅保留文本,不保留 thinking 以控制历史长度)
        self.conversation_history.append({
            "role": "assistant", "content": answer
        })
        
        return answer
    
    def get_stats(self) -> dict:
        return {
            "turns": len(self.turn_complexities),
            "complexity_distribution": {
                c.value: self.turn_complexities.count(c)
                for c in TaskComplexity
                if self.turn_complexities.count(c) > 0
            },
            "spent_usd": round(self.spent_budget, 5),
            "remaining_usd": round(self.total_budget - self.spent_budget, 5)
        }

16.7 Adaptive Thinking 与 Streaming 结合

import asyncio
import anthropic

async def adaptive_stream(
    client: anthropic.AsyncAnthropic,
    task: str,
    show_thinking: bool = False
) -> str:
    """异步流式自适应推理"""
    
    # 同步评估复杂度(可以改为异步)
    complexity = ComplexityHeuristics.classify(task)
    cfg = THINKING_TIERS[complexity]
    
    kwargs = {
        "model": cfg.model,
        "max_tokens": cfg.max_tokens,
        "messages": [{"role": "user", "content": task}]
    }
    
    if cfg.budget_tokens:
        kwargs["thinking"] = {"type": "enabled", "budget_tokens": cfg.budget_tokens}
    
    print(f"[复杂度: {complexity.value}, 模型: {cfg.model}]")
    
    answer_parts = []
    
    async with client.messages.stream(**kwargs) as stream:
        async for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "thinking_delta" and show_thinking:
                    print(event.delta.thinking, end="", flush=True)
                elif event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                    answer_parts.append(event.delta.text)
    
    return "".join(answer_parts)

# 并行处理多个任务,每个任务独立自适应
async def batch_adaptive(tasks: list[str]) -> list[str]:
    client = anthropic.AsyncAnthropic()
    return await asyncio.gather(*[adaptive_stream(client, t) for t in tasks])

16.8 生产部署的监控与调优

指标追踪

import time
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class ThinkingMetrics:
    """追踪自适应思考系统的运行指标"""
    
    requests_by_complexity: dict = field(default_factory=lambda: defaultdict(int))
    total_cost_by_complexity: dict = field(default_factory=lambda: defaultdict(float))
    avg_latency_by_complexity: dict = field(default_factory=lambda: defaultdict(list))
    quality_scores: list = field(default_factory=list)
    
    def record(
        self,
        complexity: str,
        cost_usd: float,
        latency_s: float,
        quality: float = None
    ):
        self.requests_by_complexity[complexity] += 1
        self.total_cost_by_complexity[complexity] += cost_usd
        self.avg_latency_by_complexity[complexity].append(latency_s)
        if quality is not None:
            self.quality_scores.append(quality)
    
    def summary(self) -> dict:
        return {
            "total_requests": sum(self.requests_by_complexity.values()),
            "total_cost_usd": round(sum(self.total_cost_by_complexity.values()), 4),
            "by_complexity": {
                c: {
                    "count": self.requests_by_complexity[c],
                    "total_cost": round(self.total_cost_by_complexity[c], 5),
                    "avg_latency_s": round(
                        sum(self.avg_latency_by_complexity[c]) /
                        len(self.avg_latency_by_complexity[c]), 2
                    ) if self.avg_latency_by_complexity[c] else 0
                }
                for c in self.requests_by_complexity
            },
            "avg_quality": round(
                sum(self.quality_scores) / len(self.quality_scores), 3
            ) if self.quality_scores else None
        }

调优建议

根据生产指标,持续优化分类器的准确性:

def tune_classifier_thresholds(
    historical_data: list[dict],  # [{"task": str, "complexity": str, "quality": float}]
    target_quality: float = 0.85
) -> dict:
    """
    基于历史数据分析哪些任务分类错误,
    返回调整建议
    """
    misclassified = []
    
    for item in historical_data:
        if item["quality"] < target_quality:
            misclassified.append(item)
    
    # 分析哪些类别的任务质量最差
    quality_by_complexity = defaultdict(list)
    for item in historical_data:
        quality_by_complexity[item["complexity"]].append(item["quality"])
    
    avg_quality = {
        c: sum(qs) / len(qs)
        for c, qs in quality_by_complexity.items()
    }
    
    recommendations = []
    for complexity, avg_q in avg_quality.items():
        if avg_q < target_quality:
            recommendations.append(
                f"复杂度 '{complexity}' 平均质量 {avg_q:.2f},建议提升到更高思考预算层级"
            )
    
    return {
        "avg_quality_by_complexity": avg_quality,
        "misclassification_rate": len(misclassified) / len(historical_data),
        "recommendations": recommendations
    }

小结

Adaptive Thinking 将 Extended Thinking 从单一的开/关功能升华为精细化的成本-质量管理系统:

  1. 复杂度分级路由:轻量级任务用 Haiku 零思考,复杂任务用 Opus 全力推理
  2. 两种分类器:LLM 分类器(精确但有额外延迟)vs 启发式规则(零延迟但需要人工维护)
  3. 质量门控重试:先低成本尝试,质量不达标自动升级
  4. 多阶段管道:分解→并行分析→综合,适用于极复杂问题
  5. 实时预算控制:在对话中跟踪总成本,防止超支
  6. 生产监控:追踪各复杂度层级的成本、延迟和质量,持续调优分类阈值

Adaptive Thinking 的终极目标是:每一分推理算力都花在刀刃上

本章评分
4.5  / 5  (23 评分)

💬 留言讨论