第 8 章

多轮对话设计:上下文裁剪、状态管理与 200K Window 最优利用

第八章:多轮对话管理:历史压缩、上下文剪枝与会话持久化

8.1 多轮对话的核心挑战

Claude API 本身是无状态的——每次请求都是独立的 HTTP 调用。"对话"的存在是客户端的幻觉:你把对话历史作为 messages 参数传给 API,模型才能"记住"之前说过的话。

这个设计有深刻的工程含义:

对话的 token 消耗模式:

第 1 轮:
  输入 = system(500) + user_1(100) = 600 tokens
  
第 5 轮:
  输入 = system(500) + user_1(100) + assistant_1(200) + ... + user_5(100) = 2500+ tokens

第 20 轮:
  输入 = system(500) + 历史对话(8000+) + user_20(100) = 8600+ tokens

第 50 轮(如不压缩):
  输入可能超过 20K tokens → 接近 200K 窗口 → 成本和延迟暴增

多轮对话管理需要解决三个问题:

  1. 上下文长度控制:如何在历史无限增长的情况下保持合理的 token 预算
  2. 关键信息保留:压缩历史时不能丢失对当前任务重要的信息
  3. 会话持久化:如何在服务重启、用户跨设备访问等场景下恢复对话

8.2 对话历史的数据模型

在设计多轮对话系统之前,先建立清晰的数据模型:

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any

class MessageRole(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"

@dataclass
class Message:
    role: MessageRole
    content: str | list[dict]  # 字符串或内容块数组
    timestamp: datetime = field(default_factory=datetime.now)
    token_count: int = 0  # 可选:缓存的 token 计数
    metadata: dict = field(default_factory=dict)
    
    def to_api_format(self) -> dict:
        """转换为 API 请求格式"""
        return {
            "role": self.role.value,
            "content": self.content
        }

@dataclass
class Conversation:
    session_id: str
    system_prompt: str
    messages: list[Message] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)
    
    def add_message(self, role: MessageRole, content: str | list[dict], **kwargs):
        msg = Message(role=role, content=content, **kwargs)
        self.messages.append(msg)
        self.last_updated = datetime.now()
        return msg
    
    def to_api_messages(self) -> list[dict]:
        """转换为 API 请求的 messages 数组"""
        return [msg.to_api_format() for msg in self.messages]
    
    def estimated_tokens(self) -> int:
        """粗略估算当前对话的 token 消耗"""
        total_chars = len(self.system_prompt)
        for msg in self.messages:
            if isinstance(msg.content, str):
                total_chars += len(msg.content)
            elif isinstance(msg.content, list):
                total_chars += sum(
                    len(block.get("text", ""))
                    for block in msg.content
                    if isinstance(block, dict)
                )
        return total_chars // 4  # 英文近似值

8.3 上下文窗口管理策略

策略一:滑动窗口(保留最近 N 轮)

最简单的策略:只保留最近 N 轮对话,丢弃更早的消息:

import anthropic
from typing import Optional

client = anthropic.Anthropic()

class SlidingWindowConversation:
    """
    保留最近 N 轮对话的滑动窗口管理器
    """
    
    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_turns: int = 10,  # 保留最近 10 轮(20 条消息)
        max_tokens_per_reply: int = 1024
    ):
        self.system = system_prompt
        self.model = model
        self.max_turns = max_turns
        self.max_tokens = max_tokens_per_reply
        self._history: list[dict] = []  # 完整历史(内存中)
        self._display_history: list[dict] = []  # 发给 API 的历史
    
    def chat(self, user_message: str) -> str:
        # 添加用户消息
        user_turn = {"role": "user", "content": user_message}
        self._history.append(user_turn)
        
        # 构建发给 API 的消息(只取最近 N 轮)
        recent_messages = self._history[-(self.max_turns * 2):]  # 每轮 2 条消息
        
        response = client.messages.create(
            model=self.model,
            max_tokens=self.max_tokens,
            system=self.system,
            messages=recent_messages
        )
        
        assistant_reply = response.content[0].text
        
        # 将助手响应加入历史
        assistant_turn = {"role": "assistant", "content": assistant_reply}
        self._history.append(assistant_turn)
        
        return assistant_reply
    
    def get_context_stats(self) -> dict:
        return {
            "total_turns": len(self._history) // 2,
            "active_turns": self.max_turns,
            "total_messages_stored": len(self._history),
            "messages_sent_to_api": min(len(self._history), self.max_turns * 2)
        }

适用场景:闲聊机器人、日常问答助手(早期信息与当前问题无关)

缺点:如果第 1 轮用户提供了关键背景信息(如"我是一名 Python 开发者,在做 SaaS 项目"),第 12 轮时这个信息会被丢弃。

策略二:摘要压缩(总结早期历史)

在滑动窗口的基础上,对被丢弃的历史进行摘要,注入为上下文:

class SummarizingConversation:
    """
    当历史超出 token 预算时,自动压缩早期历史
    """
    
    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_history_tokens: int = 8000,
        summary_model: str = "claude-haiku-4-5-20251001"
    ):
        self.system = system_prompt
        self.model = model
        self.max_history_tokens = max_history_tokens
        self.summary_model = summary_model
        
        self.messages: list[dict] = []
        self.summary: Optional[str] = None  # 被压缩的历史摘要
        self._summary_token_count: int = 0
    
    def _estimate_tokens(self, messages: list[dict]) -> int:
        """估算消息列表的 token 数"""
        return sum(
            len(m.get("content", "") if isinstance(m.get("content"), str) 
                else str(m.get("content", ""))) // 4
            for m in messages
        )
    
    def _compress_history(self):
        """将最早的 50% 消息压缩为摘要"""
        if len(self.messages) < 4:
            return
        
        # 取前一半消息做摘要
        split_point = len(self.messages) // 2
        old_messages = self.messages[:split_point]
        self.messages = self.messages[split_point:]
        
        # 构建摘要
        existing_summary = f"[之前的对话摘要]\n{self.summary}\n\n" if self.summary else ""
        old_text = "\n".join(
            f"{'用户' if m['role'] == 'user' else 'Claude'}: "
            f"{str(m.get('content', ''))[:300]}"
            for m in old_messages
        )
        
        summary_response = client.messages.create(
            model=self.summary_model,
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": f"""{existing_summary}请用 3-5 句话概括以下对话中的关键信息、
决定事项和用户提供的重要背景:

{old_text}"""
            }]
        )
        
        self.summary = summary_response.content[0].text
        self._summary_token_count = len(self.summary) // 4
    
    def _build_api_messages(self) -> list[dict]:
        """构建发给 API 的消息列表(包含摘要注入)"""
        messages = list(self.messages)
        
        if self.summary:
            # 在最早的消息前注入摘要(作为用户提问)
            summary_injection = [
                {
                    "role": "user",
                    "content": f"[对话上下文摘要]\n{self.summary}\n\n"
                               f"(以上是之前对话的摘要,请在回答时参考这些背景信息。)"
                },
                {
                    "role": "assistant",
                    "content": "好的,我已了解之前的对话背景,请继续。"
                }
            ]
            messages = summary_injection + messages
        
        return messages
    
    def chat(self, user_message: str) -> str:
        # 检查是否需要压缩
        current_tokens = self._estimate_tokens(self.messages)
        if current_tokens > self.max_history_tokens:
            self._compress_history()
        
        # 添加用户消息
        self.messages.append({"role": "user", "content": user_message})
        
        # 构建 API 请求
        api_messages = self._build_api_messages()
        
        response = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=self.system,
            messages=api_messages
        )
        
        assistant_reply = response.content[0].text
        self.messages.append({"role": "assistant", "content": assistant_reply})
        
        return assistant_reply

策略三:关键信息提取(结构化记忆)

对于长期对话,识别并持久化关键信息,而不是压缩自然语言:

import json

class StructuredMemoryConversation:
    """
    使用结构化记忆代替自然语言摘要
    更精确地保留用户偏好、决策和关键背景
    """
    
    def __init__(self, system_prompt: str, model: str = "claude-sonnet-4-6"):
        self.system = system_prompt
        self.model = model
        self.messages: list[dict] = []
        self.memory: dict = {
            "user_profile": {},      # 用户基本信息
            "preferences": {},        # 用户偏好
            "decisions": [],          # 做出的决定
            "context": {},           # 任务上下文
            "unresolved": []         # 未解决的问题
        }
    
    def _extract_memory_updates(self, conversation_turn: tuple[str, str]) -> dict:
        """
        从对话轮次中提取需要记忆的信息
        返回内存更新字典(可能是空的)
        """
        user_msg, assistant_msg = conversation_turn
        
        extract_prompt = f"""从以下对话轮次中提取需要长期记住的关键信息。
只提取真正重要的信息(用户说明了什么、做了什么决定、有什么偏好等)。
如果没有值得记忆的信息,返回空对象 {{}}.

返回 JSON 格式:
{{
  "user_profile": {{...}},    // 用户身份、背景信息(如有)
  "preferences": {{...}},     // 用户偏好(如有)
  "decisions": ["..."],       // 做出的决定(如有)
  "context": {{...}},         // 任务上下文更新(如有)
  "unresolved": ["..."]       // 遗留问题(如有)
}}

用户:{user_msg[:500]}
助手:{assistant_msg[:500]}"""
        
        response = client.messages.create(
            model="claude-haiku-4-5-20251001",  # 便宜的模型做提取
            max_tokens=400,
            messages=[{"role": "user", "content": extract_prompt}]
        )
        
        try:
            import re
            json_text = response.content[0].text.strip()
            # 提取 JSON 对象
            match = re.search(r'\{.*\}', json_text, re.DOTALL)
            if match:
                return json.loads(match.group())
        except (json.JSONDecodeError, Exception):
            pass
        
        return {}
    
    def _update_memory(self, updates: dict):
        """合并内存更新"""
        for key in ["user_profile", "preferences", "context"]:
            if key in updates and updates[key]:
                self.memory[key].update(updates[key])
        
        for key in ["decisions", "unresolved"]:
            if key in updates and updates[key]:
                self.memory[key].extend(updates[key])
                # 去重
                self.memory[key] = list(dict.fromkeys(self.memory[key]))
    
    def _memory_to_context(self) -> str:
        """将结构化记忆转换为上下文文本"""
        if not any(self.memory.values()):
            return ""
        
        parts = ["[对话记忆]"]
        
        if self.memory["user_profile"]:
            parts.append(f"用户信息:{json.dumps(self.memory['user_profile'], ensure_ascii=False)}")
        
        if self.memory["preferences"]:
            parts.append(f"用户偏好:{json.dumps(self.memory['preferences'], ensure_ascii=False)}")
        
        if self.memory["decisions"]:
            decisions_str = ";".join(self.memory["decisions"][-5:])  # 最近 5 个
            parts.append(f"已做决定:{decisions_str}")
        
        if self.memory["context"]:
            parts.append(f"任务上下文:{json.dumps(self.memory['context'], ensure_ascii=False)}")
        
        return "\n".join(parts)
    
    def chat(self, user_message: str) -> str:
        # 构建带记忆上下文的消息
        memory_context = self._memory_to_context()
        
        # 保留最近 5 轮对话
        recent_messages = self.messages[-10:]
        
        if memory_context and not recent_messages:
            # 第一条消息注入记忆上下文
            recent_messages = [
                {"role": "user", "content": f"{memory_context}\n\n{user_message}"}
            ]
        else:
            recent_messages.append({"role": "user", "content": user_message})
        
        response = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=self.system,
            messages=recent_messages
        )
        
        assistant_reply = response.content[0].text
        
        # 更新完整历史
        self.messages.append({"role": "user", "content": user_message})
        self.messages.append({"role": "assistant", "content": assistant_reply})
        
        # 异步更新记忆(实际生产中应在后台执行)
        updates = self._extract_memory_updates((user_message, assistant_reply))
        if updates:
            self._update_memory(updates)
        
        return assistant_reply

8.4 会话持久化设计

数据库设计(PostgreSQL 示例)

-- 会话表
CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id VARCHAR(255) NOT NULL,
    title VARCHAR(500),
    system_prompt TEXT NOT NULL,
    model VARCHAR(100) NOT NULL DEFAULT 'claude-sonnet-4-6',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    metadata JSONB DEFAULT '{}',
    is_archived BOOLEAN DEFAULT FALSE
);

-- 消息表
CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    role VARCHAR(20) NOT NULL CHECK (role IN ('user', 'assistant')),
    content TEXT NOT NULL,          -- 文本内容
    content_blocks JSONB,           -- 多模态内容块(可选)
    input_tokens INT DEFAULT 0,
    output_tokens INT DEFAULT 0,
    model_used VARCHAR(100),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    metadata JSONB DEFAULT '{}'
);

-- 对话摘要表(存储压缩后的历史摘要)
CREATE TABLE conversation_summaries (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    summary TEXT NOT NULL,
    covers_up_to_message_id UUID REFERENCES messages(id),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- 索引
CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_messages_conversation_id ON messages(conversation_id);
CREATE INDEX idx_messages_created_at ON messages(conversation_id, created_at);

Python 持久化层实现

import asyncpg
import asyncio
from uuid import UUID, uuid4

class ConversationStore:
    """
    基于 PostgreSQL 的对话持久化层
    """
    
    def __init__(self, dsn: str):
        self.dsn = dsn
        self._pool = None
    
    async def initialize(self):
        self._pool = await asyncpg.create_pool(self.dsn)
    
    async def create_conversation(
        self,
        user_id: str,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        title: str = None,
        metadata: dict = None
    ) -> str:
        """创建新会话,返回 session_id"""
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                """INSERT INTO conversations 
                   (user_id, system_prompt, model, title, metadata)
                   VALUES ($1, $2, $3, $4, $5)
                   RETURNING id""",
                user_id, system_prompt, model,
                title or "新对话",
                asyncpg.Record({'metadata': metadata or {}})
            )
            return str(row['id'])
    
    async def append_message(
        self,
        conversation_id: str,
        role: str,
        content: str,
        input_tokens: int = 0,
        output_tokens: int = 0,
        model_used: str = None
    ) -> str:
        """追加消息,返回消息 ID"""
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                """INSERT INTO messages 
                   (conversation_id, role, content, input_tokens, output_tokens, model_used)
                   VALUES ($1, $2, $3, $4, $5, $6)
                   RETURNING id""",
                conversation_id, role, content,
                input_tokens, output_tokens, model_used
            )
            
            # 更新会话的最后更新时间
            await conn.execute(
                "UPDATE conversations SET last_updated = NOW() WHERE id = $1",
                conversation_id
            )
            
            return str(row['id'])
    
    async def get_recent_messages(
        self,
        conversation_id: str,
        limit: int = 20
    ) -> list[dict]:
        """获取最近 N 条消息"""
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """SELECT role, content, created_at, input_tokens, output_tokens
                   FROM messages
                   WHERE conversation_id = $1
                   ORDER BY created_at DESC
                   LIMIT $2""",
                conversation_id, limit
            )
            
            # 反转顺序(最旧的在前)
            return [
                {"role": row['role'], "content": row['content']}
                for row in reversed(rows)
            ]
    
    async def save_summary(
        self,
        conversation_id: str,
        summary: str,
        covers_up_to_message_id: str = None
    ):
        """保存对话摘要"""
        async with self._pool.acquire() as conn:
            await conn.execute(
                """INSERT INTO conversation_summaries 
                   (conversation_id, summary, covers_up_to_message_id)
                   VALUES ($1, $2, $3)""",
                conversation_id, summary, 
                UUID(covers_up_to_message_id) if covers_up_to_message_id else None
            )
    
    async def get_latest_summary(self, conversation_id: str) -> str | None:
        """获取最新摘要"""
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                """SELECT summary FROM conversation_summaries
                   WHERE conversation_id = $1
                   ORDER BY created_at DESC
                   LIMIT 1""",
                conversation_id
            )
            return row['summary'] if row else None

8.5 完整的持久化对话助手

将以上所有组件整合为一个完整的对话助手:

import anthropic
from typing import Optional

class PersistentConversationAssistant:
    """
    完整的持久化多轮对话助手,包含:
    - 自动历史压缩
    - 数据库持久化
    - Token 成本追踪
    """
    
    def __init__(
        self,
        store: ConversationStore,
        model: str = "claude-sonnet-4-6",
        max_context_tokens: int = 12000,  # 发给 API 的最大 token 数(历史部分)
        summary_model: str = "claude-haiku-4-5-20251001"
    ):
        self.store = store
        self.client = anthropic.Anthropic()
        self.model = model
        self.max_context_tokens = max_context_tokens
        self.summary_model = summary_model
    
    async def start_conversation(
        self,
        user_id: str,
        system_prompt: str,
        **kwargs
    ) -> str:
        """开始新对话,返回 session_id"""
        return await self.store.create_conversation(
            user_id=user_id,
            system_prompt=system_prompt,
            model=self.model,
            **kwargs
        )
    
    async def chat(
        self,
        session_id: str,
        user_message: str,
        system_prompt: str  # 传入 system prompt(或从数据库读取)
    ) -> dict:
        """
        发送消息并获取回复
        返回:{
            "reply": str,
            "input_tokens": int,
            "output_tokens": int,
            "context_truncated": bool
        }
        """
        # 1. 获取最近历史
        recent_messages = await self.store.get_recent_messages(session_id, limit=30)
        
        # 2. 检查 token 数量,必要时注入摘要
        context_messages = await self._build_context(session_id, recent_messages)
        
        # 3. 添加当前用户消息
        context_messages.append({"role": "user", "content": user_message})
        
        # 4. 调用 API
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=system_prompt,
            messages=context_messages
        )
        
        reply = response.content[0].text
        
        # 5. 持久化用户消息和助手回复
        await self.store.append_message(
            session_id, "user", user_message,
            input_tokens=response.usage.input_tokens
        )
        await self.store.append_message(
            session_id, "assistant", reply,
            output_tokens=response.usage.output_tokens,
            model_used=response.model
        )
        
        # 6. 检查是否需要后台生成摘要
        if len(recent_messages) > 20:
            # 实际生产中应该用后台任务执行
            asyncio.create_task(
                self._generate_summary_if_needed(session_id, recent_messages)
            )
        
        return {
            "reply": reply,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
            "context_truncated": len(context_messages) < len(recent_messages) + 1
        }
    
    async def _build_context(
        self,
        session_id: str,
        recent_messages: list[dict]
    ) -> list[dict]:
        """构建发给 API 的上下文消息,包含摘要注入"""
        # 估算当前历史的 token 数
        estimated = sum(len(m.get("content", "")) // 4 for m in recent_messages)
        
        if estimated <= self.max_context_tokens:
            return list(recent_messages)
        
        # 需要截断:获取摘要 + 最近消息
        summary = await self.store.get_latest_summary(session_id)
        
        # 取最近 10 条消息
        trimmed = recent_messages[-10:]
        
        if summary:
            # 注入摘要
            return [
                {
                    "role": "user",
                    "content": f"[之前对话摘要]\n{summary}\n\n请基于以上背景继续对话。"
                },
                {
                    "role": "assistant",
                    "content": "我已了解之前的对话背景。"
                },
                *trimmed
            ]
        
        return trimmed
    
    async def _generate_summary_if_needed(
        self,
        session_id: str,
        messages: list[dict]
    ):
        """在后台生成对话摘要"""
        if len(messages) < 10:
            return
        
        # 取前 2/3 的历史生成摘要
        to_summarize = messages[:len(messages) * 2 // 3]
        conversation_text = "\n".join(
            f"{'用户' if m['role'] == 'user' else 'Claude'}: {m['content'][:300]}"
            for m in to_summarize
        )
        
        summary_response = self.client.messages.create(
            model=self.summary_model,
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"""请用 5-8 句话概括以下对话的关键内容,
包括:主题、用户提供的背景信息、做出的决定、待解决的问题。

{conversation_text}"""
            }]
        )
        
        await self.store.save_summary(session_id, summary_response.content[0].text)

8.6 Token 成本追踪与预算控制

from collections import defaultdict
from datetime import date

class CostTracker:
    """
    按用户和会话追踪 API 成本
    """
    
    PRICING = {
        "claude-opus-4-6": {"input": 15.0, "output": 75.0},
        "claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
        "claude-haiku-4-5-20251001": {"input": 0.25, "output": 1.25},
    }
    
    def __init__(self):
        self._usage: dict[str, dict] = defaultdict(lambda: {
            "input_tokens": 0,
            "output_tokens": 0,
            "cost_usd": 0.0,
            "requests": 0
        })
    
    def record(
        self,
        user_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int
    ):
        prices = self.PRICING.get(model, {"input": 3.0, "output": 15.0})
        cost = (input_tokens * prices["input"] + output_tokens * prices["output"]) / 1_000_000
        
        key = f"{user_id}:{date.today().isoformat()}"
        self._usage[key]["input_tokens"] += input_tokens
        self._usage[key]["output_tokens"] += output_tokens
        self._usage[key]["cost_usd"] += cost
        self._usage[key]["requests"] += 1
    
    def get_daily_cost(self, user_id: str) -> float:
        key = f"{user_id}:{date.today().isoformat()}"
        return self._usage[key]["cost_usd"]
    
    def check_budget(self, user_id: str, daily_limit: float) -> bool:
        """检查用户是否超出每日预算"""
        return self.get_daily_cost(user_id) < daily_limit

小结

多轮对话管理是构建实际 Claude 应用的核心工程问题,不是可以之后再考虑的细节:

  1. API 无状态设计:每次请求都需要传入完整对话历史;随着对话增长,成本和延迟都会线性增加

  2. 三种上下文管理策略

    • 滑动窗口:最简单,适合无需长期记忆的场景
    • 摘要压缩:保留早期信息的语义,适合大多数生产场景
    • 结构化记忆:最精确,适合需要跟踪用户配置、决策的复杂场景
  3. 持久化设计

    • 将 messages 存入数据库(而不是仅在内存中)
    • 为 conversation 和 message 建立独立表
    • 异步生成和存储摘要,避免阻塞主对话流程
  4. 成本追踪:记录每次请求的 token 消耗,按用户和日期统计,实现预算控制

  5. 实践建议

    • 对于简单应用:滑动窗口(最近 10-15 轮)即可
    • 对于客服/助手:摘要压缩 + PostgreSQL 持久化
    • 对于复杂 Agent:结构化记忆 + 向量数据库长期记忆

多轮对话管理的设计决策会贯穿整个系统架构。第一周就做好这个决策,会节省你后来大量的重构工作。

本章评分
4.8  / 5  (65 评分)

💬 留言讨论