第 8 章
多轮对话设计:上下文裁剪、状态管理与 200K Window 最优利用
第八章:多轮对话管理:历史压缩、上下文剪枝与会话持久化
8.1 多轮对话的核心挑战
Claude API 本身是无状态的——每次请求都是独立的 HTTP 调用。"对话"的存在是客户端的幻觉:你把对话历史作为 messages 参数传给 API,模型才能"记住"之前说过的话。
这个设计有深刻的工程含义:
对话的 token 消耗模式:
第 1 轮:
输入 = system(500) + user_1(100) = 600 tokens
第 5 轮:
输入 = system(500) + user_1(100) + assistant_1(200) + ... + user_5(100) = 2500+ tokens
第 20 轮:
输入 = system(500) + 历史对话(8000+) + user_20(100) = 8600+ tokens
第 50 轮(如不压缩):
输入可能超过 20K tokens,并随对话继续向 200K 窗口上限逼近 → 成本和延迟暴增
多轮对话管理需要解决三个问题:
- 上下文长度控制:如何在历史无限增长的情况下保持合理的 token 预算
- 关键信息保留:压缩历史时不能丢失对当前任务重要的信息
- 会话持久化:如何在服务重启、用户跨设备访问等场景下恢复对话
8.2 对话历史的数据模型
在设计多轮对话系统之前,先建立清晰的数据模型:
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
class MessageRole(str, Enum):
    """Speaker of a conversation message.

    The ``str`` mix-in makes every member compare equal to its raw string
    value, so a member can be used wherever the plain role string is expected.
    """

    USER = "user"
    ASSISTANT = "assistant"
@dataclass
class Message:
    """A single conversation message plus local bookkeeping fields."""

    # Who produced the message.
    role: MessageRole
    # Either plain text or a list of content-block dicts.
    content: str | list[dict]
    # Creation time, taken from ``datetime.now`` when not supplied.
    timestamp: datetime = field(default_factory=datetime.now)
    # Optional cached token count; 0 means "not computed".
    token_count: int = 0
    # Free-form extra data; not part of the API payload.
    metadata: dict = field(default_factory=dict)

    def to_api_format(self) -> dict:
        """Return the ``role`` / ``content`` dict used in an API request."""
        payload = {"role": self.role.value}
        payload["content"] = self.content
        return payload
@dataclass
class Conversation:
    """A chat session: system prompt, ordered messages and timestamps."""

    session_id: str
    system_prompt: str
    messages: list[Message] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)

    def add_message(self, role: MessageRole, content: str | list[dict], **kwargs):
        """Append a new message, refresh ``last_updated`` and return the message.

        Extra keyword arguments are forwarded to the ``Message`` constructor.
        """
        new_message = Message(role=role, content=content, **kwargs)
        self.messages.append(new_message)
        self.last_updated = datetime.now()
        return new_message

    def to_api_messages(self) -> list[dict]:
        """Return every stored message in API request format, oldest first."""
        payload = []
        for entry in self.messages:
            payload.append(entry.to_api_format())
        return payload

    def estimated_tokens(self) -> int:
        """Roughly estimate the token cost of the system prompt plus messages.

        Only string contents and the ``"text"`` field of dict content blocks
        are counted; other block kinds contribute nothing.
        """
        char_total = len(self.system_prompt)
        for entry in self.messages:
            body = entry.content
            if isinstance(body, str):
                char_total += len(body)
                continue
            if isinstance(body, list):
                for block in body:
                    if isinstance(block, dict):
                        char_total += len(block.get("text", ""))
        # About 4 characters per token: an approximation suited to English text.
        return char_total // 4
8.3 上下文窗口管理策略
策略一:滑动窗口(保留最近 N 轮)
最简单的策略:只保留最近 N 轮对话,丢弃更早的消息:
import anthropic
from typing import Optional
# Module-level SDK client; the conversation classes below call it as `client`.
client = anthropic.Anthropic()
class SlidingWindowConversation:
    """Chat manager that sends only the most recent turns to the API.

    The complete history stays in memory. Each request carries at most
    ``max_turns * 2`` trailing messages, trimmed so that the first message
    sent always has the ``user`` role.
    """

    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_turns: int = 10,  # keep the latest 10 turns (20 messages)
        max_tokens_per_reply: int = 1024
    ):
        """Store the configuration and start with an empty history.

        Raises:
            ValueError: if ``max_turns`` is smaller than 1.
        """
        if max_turns < 1:
            # With 0 the slice ``history[-0:]`` would return the whole history.
            raise ValueError("max_turns must be >= 1")
        self.system = system_prompt
        self.model = model
        self.max_turns = max_turns
        self.max_tokens = max_tokens_per_reply
        self._history: list[dict] = []          # full history (in memory)
        self._display_history: list[dict] = []  # messages sent on the latest request

    def _recent_window(self) -> list[dict]:
        """Return the trailing part of the history that fits the turn window."""
        window = self._history[-(self.max_turns * 2):]
        # While a request is being built the history ends on a user message,
        # so an even-sized tail slice starts on an assistant reply. Drop
        # leading non-user messages so the request begins with a user turn.
        start = 0
        while start < len(window) and window[start]["role"] != "user":
            start += 1
        return window[start:]

    def chat(self, user_message: str) -> str:
        """Send ``user_message`` with the recent window and return the reply text.

        If the API call raises, the user message is removed from the history
        again before the exception propagates, so the stored history keeps
        alternating user/assistant roles.
        """
        self._history.append({"role": "user", "content": user_message})
        window = self._recent_window()

        try:
            response = client.messages.create(
                model=self.model,
                max_tokens=self.max_tokens,
                system=self.system,
                messages=window
            )
        except Exception:
            # Roll back the unanswered user message, then re-raise unchanged.
            self._history.pop()
            raise

        self._display_history = window
        assistant_reply = response.content[0].text
        self._history.append({"role": "assistant", "content": assistant_reply})
        return assistant_reply

    def get_context_stats(self) -> dict:
        """Return counters describing the stored history and the active window."""
        total_turns = len(self._history) // 2
        return {
            "total_turns": total_turns,
            # Never report more active turns than actually exist.
            "active_turns": min(total_turns, self.max_turns),
            "total_messages_stored": len(self._history),
            "messages_sent_to_api": len(self._recent_window())
        }
适用场景:闲聊机器人、日常问答助手(早期信息与当前问题无关)
缺点:如果第 1 轮用户提供了关键背景信息(如"我是一名 Python 开发者,在做 SaaS 项目"),第 12 轮时这个信息会被丢弃。
策略二:摘要压缩(总结早期历史)
在滑动窗口的基础上,对被丢弃的历史进行摘要,注入为上下文:
class SummarizingConversation:
    """Chat manager that folds older history into a running summary.

    When the estimated size of the stored messages exceeds
    ``max_history_tokens``, the oldest part of the history is summarised by
    ``summary_model`` and replaced by that summary, which is then injected in
    front of the remaining messages on every request.
    """

    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_history_tokens: int = 8000,
        summary_model: str = "claude-haiku-4-5-20251001"
    ):
        self.system = system_prompt
        self.model = model
        self.max_history_tokens = max_history_tokens
        self.summary_model = summary_model
        self.messages: list[dict] = []
        self.summary: Optional[str] = None  # summary of the compressed history
        self._summary_token_count: int = 0

    def _estimate_tokens(self, messages: list[dict]) -> int:
        """Estimate tokens as ``len(text) // 4`` summed over ``messages``.

        Non-string contents are measured through their ``str()`` form.
        """
        total = 0
        for message in messages:
            content = message.get("content", "")
            text = content if isinstance(content, str) else str(content)
            total += len(text) // 4
        return total

    @staticmethod
    def _split_index(message_count: int) -> int:
        """Return how many leading messages to compress (0 means none).

        About half of the history is taken, rounded down to an even number so
        that, for a history of alternating user/assistant messages, the part
        that is kept still starts with a user message.
        """
        if message_count < 4:
            return 0
        half = message_count // 2
        return half - (half % 2)

    def _compress_history(self):
        """Summarise roughly the oldest half of the history and drop it.

        The stored messages are only truncated after the summary request has
        succeeded, so a failing request does not lose any history.
        """
        split_point = self._split_index(len(self.messages))
        if split_point == 0:
            return
        old_messages = self.messages[:split_point]

        existing_summary = f"[之前的对话摘要]\n{self.summary}\n\n" if self.summary else ""
        old_text = "\n".join(
            f"{'用户' if m['role'] == 'user' else 'Claude'}: "
            f"{str(m.get('content', ''))[:300]}"
            for m in old_messages
        )
        prompt = (
            f"{existing_summary}请用 3-5 句话概括以下对话中的关键信息、\n"
            "决定事项和用户提供的重要背景:\n"
            f"{old_text}"
        )
        summary_response = client.messages.create(
            model=self.summary_model,
            max_tokens=400,
            messages=[{"role": "user", "content": prompt}]
        )

        # Commit only now that the summary exists.
        self.summary = summary_response.content[0].text
        self._summary_token_count = len(self.summary) // 4
        self.messages = self.messages[split_point:]

    def _build_api_messages(self) -> list[dict]:
        """Return the messages to send, preceded by the summary exchange if any."""
        messages = list(self.messages)
        if not self.summary:
            return messages
        # The summary is injected as a synthetic user/assistant exchange in
        # front of the oldest kept message.
        summary_injection = [
            {
                "role": "user",
                "content": f"[对话上下文摘要]\n{self.summary}\n\n"
                           f"(以上是之前对话的摘要,请在回答时参考这些背景信息。)"
            },
            {
                "role": "assistant",
                "content": "好的,我已了解之前的对话背景,请继续。"
            }
        ]
        return summary_injection + messages

    def chat(self, user_message: str) -> str:
        """Send ``user_message`` (compressing history first if needed) and return the reply.

        If the chat request raises, the user message is removed from the
        stored history again before the exception propagates.
        """
        if self._estimate_tokens(self.messages) > self.max_history_tokens:
            self._compress_history()

        self.messages.append({"role": "user", "content": user_message})
        try:
            response = client.messages.create(
                model=self.model,
                max_tokens=1024,
                system=self.system,
                messages=self._build_api_messages()
            )
        except Exception:
            # Roll back the unanswered user message, then re-raise unchanged.
            self.messages.pop()
            raise

        assistant_reply = response.content[0].text
        self.messages.append({"role": "assistant", "content": assistant_reply})
        return assistant_reply
策略三:关键信息提取(结构化记忆)
对于长期对话,识别并持久化关键信息,而不是压缩自然语言:
import json
class StructuredMemoryConversation:
    """Chat manager that keeps a structured memory instead of a text summary.

    After every turn a cheaper model extracts profile, preference, decision,
    context and open-question facts into ``self.memory``. The rendered memory
    is appended to the system prompt of every request, together with the last
    five turns of raw history.
    """

    def __init__(self, system_prompt: str, model: str = "claude-sonnet-4-6"):
        self.system = system_prompt
        self.model = model
        self.messages: list[dict] = []
        self.memory: dict = {
            "user_profile": {},   # basic facts about the user
            "preferences": {},    # user preferences
            "decisions": [],      # decisions taken so far
            "context": {},        # task context
            "unresolved": []      # open questions
        }

    def _extract_memory_updates(self, conversation_turn: tuple[str, str]) -> dict:
        """Ask the extraction model which facts of one turn are worth keeping.

        Args:
            conversation_turn: ``(user_message, assistant_reply)``; each part
                is cut to 500 characters before being sent.

        Returns:
            The parsed JSON object, or ``{}`` when the reply contains no
            parseable JSON object.
        """
        import re

        user_msg, assistant_msg = conversation_turn
        extract_prompt = (
            "从以下对话轮次中提取需要长期记住的关键信息。\n"
            "只提取真正重要的信息(用户说明了什么、做了什么决定、有什么偏好等)。\n"
            "如果没有值得记忆的信息,返回空对象 {}.\n"
            "返回 JSON 格式:\n"
            "{\n"
            '"user_profile": {...}, // 用户身份、背景信息(如有)\n'
            '"preferences": {...}, // 用户偏好(如有)\n'
            '"decisions": ["..."], // 做出的决定(如有)\n'
            '"context": {...}, // 任务上下文更新(如有)\n'
            '"unresolved": ["..."] // 遗留问题(如有)\n'
            "}\n"
            f"用户:{user_msg[:500]}\n"
            f"助手:{assistant_msg[:500]}"
        )
        response = client.messages.create(
            model="claude-haiku-4-5-20251001",  # cheaper model for extraction
            max_tokens=400,
            messages=[{"role": "user", "content": extract_prompt}]
        )

        try:
            raw_text = response.content[0].text.strip()
        except (IndexError, AttributeError):
            # No leading text block in the reply: nothing to extract.
            return {}

        # The model may wrap the JSON in prose; take the outermost {...} span.
        match = re.search(r'\{.*\}', raw_text, re.DOTALL)
        if not match:
            return {}
        try:
            parsed = json.loads(match.group())
        except json.JSONDecodeError:
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def _update_memory(self, updates: dict):
        """Merge ``updates`` into ``self.memory``, ignoring wrongly typed values.

        Dict sections are merged key by key. List sections are extended and
        de-duplicated while keeping first-seen order; non-string list items
        are stored as their JSON text so they stay hashable and joinable.
        """
        for key in ("user_profile", "preferences", "context"):
            value = updates.get(key)
            if isinstance(value, dict) and value:
                self.memory[key].update(value)

        for key in ("decisions", "unresolved"):
            value = updates.get(key)
            if not isinstance(value, list) or not value:
                continue
            entries = [
                item if isinstance(item, str) else json.dumps(item, ensure_ascii=False)
                for item in value
            ]
            self.memory[key] = list(dict.fromkeys(self.memory[key] + entries))

    def _memory_to_context(self) -> str:
        """Render the memory as text, or return ``""`` when it is empty."""
        if not any(self.memory.values()):
            return ""

        parts = ["[对话记忆]"]
        if self.memory["user_profile"]:
            parts.append(f"用户信息:{json.dumps(self.memory['user_profile'], ensure_ascii=False)}")
        if self.memory["preferences"]:
            parts.append(f"用户偏好:{json.dumps(self.memory['preferences'], ensure_ascii=False)}")
        if self.memory["decisions"]:
            decisions_str = ";".join(self.memory["decisions"][-5:])  # latest 5
            parts.append(f"已做决定:{decisions_str}")
        if self.memory["context"]:
            parts.append(f"任务上下文:{json.dumps(self.memory['context'], ensure_ascii=False)}")
        if self.memory["unresolved"]:
            # Open questions are collected too, so surface them as well.
            unresolved_str = ";".join(self.memory["unresolved"][-5:])  # latest 5
            parts.append(f"待解决问题:{unresolved_str}")
        return "\n".join(parts)

    def _system_with_memory(self) -> str:
        """Return the system prompt with the rendered memory appended."""
        memory_context = self._memory_to_context()
        if not memory_context:
            return self.system
        if not self.system:
            return memory_context
        return f"{self.system}\n\n{memory_context}"

    def chat(self, user_message: str) -> str:
        """Send ``user_message`` with memory and recent history; return the reply.

        The memory travels in the system prompt on every request. It cannot
        be tied to "no history yet", because it only fills up after the first
        turn has been stored.
        """
        # Keep the last 5 turns; the slice is a copy, so appending is safe.
        recent_messages = self.messages[-10:]
        recent_messages.append({"role": "user", "content": user_message})

        response = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=self._system_with_memory(),
            messages=recent_messages
        )
        assistant_reply = response.content[0].text

        self.messages.append({"role": "user", "content": user_message})
        self.messages.append({"role": "assistant", "content": assistant_reply})

        # Done inline here; in production this belongs in a background job.
        updates = self._extract_memory_updates((user_message, assistant_reply))
        if updates:
            self._update_memory(updates)
        return assistant_reply
8.4 会话持久化设计
数据库设计(PostgreSQL 示例)
-- NOTE: gen_random_uuid() is built in from PostgreSQL 13; older versions
-- need the pgcrypto extension.

-- Conversations
CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id VARCHAR(255) NOT NULL,
    title VARCHAR(500),
    system_prompt TEXT NOT NULL,
    model VARCHAR(100) NOT NULL DEFAULT 'claude-sonnet-4-6',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    metadata JSONB DEFAULT '{}',
    is_archived BOOLEAN DEFAULT FALSE
);

-- Messages
CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    role VARCHAR(20) NOT NULL CHECK (role IN ('user', 'assistant')),
    content TEXT NOT NULL,           -- text content
    content_blocks JSONB,            -- multimodal content blocks (optional)
    input_tokens INT DEFAULT 0,
    output_tokens INT DEFAULT 0,
    model_used VARCHAR(100),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    metadata JSONB DEFAULT '{}'
);

-- Conversation summaries (compressed history)
CREATE TABLE conversation_summaries (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    summary TEXT NOT NULL,
    covers_up_to_message_id UUID REFERENCES messages(id),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Indexes
CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_messages_conversation_id ON messages(conversation_id);
CREATE INDEX idx_messages_created_at ON messages(conversation_id, created_at);
-- Supports "latest summary of a conversation" lookups
-- (WHERE conversation_id = ... ORDER BY created_at DESC LIMIT 1).
CREATE INDEX idx_conversation_summaries_latest
    ON conversation_summaries(conversation_id, created_at DESC);
Python 持久化层实现
import asyncio
import json
from uuid import UUID, uuid4

import asyncpg
class ConversationStore:
"""
基于 PostgreSQL 的对话持久化层
"""
def __init__(self, dsn: str):
self.dsn = dsn
self._pool = None
async def initialize(self):
self._pool = await asyncpg.create_pool(self.dsn)
async def create_conversation(
self,
user_id: str,
system_prompt: str,
model: str = "claude-sonnet-4-6",
title: str = None,
metadata: dict = None
) -> str:
"""创建新会话,返回 session_id"""
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"""INSERT INTO conversations
(user_id, system_prompt, model, title, metadata)
VALUES ($1, $2, $3, $4, $5)
RETURNING id""",
user_id, system_prompt, model,
title or "新对话",
asyncpg.Record({'metadata': metadata or {}})
)
return str(row['id'])
async def append_message(
self,
conversation_id: str,
role: str,
content: str,
input_tokens: int = 0,
output_tokens: int = 0,
model_used: str = None
) -> str:
"""追加消息,返回消息 ID"""
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"""INSERT INTO messages
(conversation_id, role, content, input_tokens, output_tokens, model_used)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id""",
conversation_id, role, content,
input_tokens, output_tokens, model_used
)
# 更新会话的最后更新时间
await conn.execute(
"UPDATE conversations SET last_updated = NOW() WHERE id = $1",
conversation_id
)
return str(row['id'])
async def get_recent_messages(
self,
conversation_id: str,
limit: int = 20
) -> list[dict]:
"""获取最近 N 条消息"""
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT role, content, created_at, input_tokens, output_tokens
FROM messages
WHERE conversation_id = $1
ORDER BY created_at DESC
LIMIT $2""",
conversation_id, limit
)
# 反转顺序(最旧的在前)
return [
{"role": row['role'], "content": row['content']}
for row in reversed(rows)
]
async def save_summary(
self,
conversation_id: str,
summary: str,
covers_up_to_message_id: str = None
):
"""保存对话摘要"""
async with self._pool.acquire() as conn:
await conn.execute(
"""INSERT INTO conversation_summaries
(conversation_id, summary, covers_up_to_message_id)
VALUES ($1, $2, $3)""",
conversation_id, summary,
UUID(covers_up_to_message_id) if covers_up_to_message_id else None
)
async def get_latest_summary(self, conversation_id: str) -> str | None:
"""获取最新摘要"""
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"""SELECT summary FROM conversation_summaries
WHERE conversation_id = $1
ORDER BY created_at DESC
LIMIT 1""",
conversation_id
)
return row['summary'] if row else None
8.5 完整的持久化对话助手
将以上所有组件整合为一个完整的对话助手:
import anthropic
from typing import Optional
class PersistentConversationAssistant:
    """Persistent multi-turn assistant built on a ``ConversationStore``.

    Combines database persistence, summary injection when the recent history
    exceeds the token budget, and per-request token usage reporting.
    """

    def __init__(
        self,
        store: "ConversationStore",
        model: str = "claude-sonnet-4-6",
        max_context_tokens: int = 12000,  # budget for the history part of a request
        summary_model: str = "claude-haiku-4-5-20251001"
    ):
        self.store = store
        # Async client: the coroutines below await it instead of blocking the
        # event loop on a synchronous HTTP call.
        self.client = anthropic.AsyncAnthropic()
        self.model = model
        self.max_context_tokens = max_context_tokens
        self.summary_model = summary_model
        # Strong references to in-flight summary tasks; the event loop itself
        # only keeps weak references to tasks.
        self._background_tasks: set = set()

    async def start_conversation(
        self,
        user_id: str,
        system_prompt: str,
        **kwargs
    ) -> str:
        """Create a new conversation in the store and return its session id."""
        return await self.store.create_conversation(
            user_id=user_id,
            system_prompt=system_prompt,
            model=self.model,
            **kwargs
        )

    @staticmethod
    def _estimate_tokens(messages: list[dict]) -> int:
        """Estimate tokens as ``len(content) // 4`` summed over ``messages``."""
        return sum(len(m.get("content", "")) // 4 for m in messages)

    @staticmethod
    def _drop_leading_assistant(messages: list[dict]) -> list[dict]:
        """Return a copy of ``messages`` that starts at the first user message."""
        start = 0
        while start < len(messages) and messages[start].get("role") != "user":
            start += 1
        return list(messages[start:])

    async def chat(
        self,
        session_id: str,
        user_message: str,
        system_prompt: str  # passed in by the caller (could also be read from the DB)
    ) -> dict:
        """Send a message and return the reply with usage information.

        Returns:
            A dict with keys ``reply`` (str), ``input_tokens`` (int),
            ``output_tokens`` (int) and ``context_truncated`` (bool).
        """
        # 1. Load recent history.
        recent_messages = await self.store.get_recent_messages(session_id, limit=30)

        # 2. Build the context; it is trimmed exactly when the estimate
        #    exceeds the budget, so report truncation from the same condition.
        context_truncated = (
            self._estimate_tokens(recent_messages) > self.max_context_tokens
        )
        context_messages = await self._build_context(session_id, recent_messages)

        # 3. Add the current user message.
        context_messages.append({"role": "user", "content": user_message})

        # 4. Call the API.
        response = await self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=system_prompt,
            messages=context_messages
        )
        reply = response.content[0].text

        # 5. Persist both sides of the turn.
        await self.store.append_message(
            session_id, "user", user_message,
            input_tokens=response.usage.input_tokens
        )
        await self.store.append_message(
            session_id, "assistant", reply,
            output_tokens=response.usage.output_tokens,
            model_used=response.model
        )

        # 6. Refresh the summary in the background for long conversations.
        if len(recent_messages) > 20:
            task = asyncio.create_task(
                self._generate_summary_if_needed(session_id, recent_messages)
            )
            # Keep the task referenced until it finishes.
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

        return {
            "reply": reply,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
            "context_truncated": context_truncated
        }

    async def _build_context(
        self,
        session_id: str,
        recent_messages: list[dict]
    ) -> list[dict]:
        """Return the history to send, with a summary exchange when trimmed.

        Within budget, all recent messages are returned. Over budget, only
        the last 10 are kept, preceded by the stored summary if there is one.
        In both cases the result starts at a user message.
        """
        if self._estimate_tokens(recent_messages) <= self.max_context_tokens:
            return self._drop_leading_assistant(recent_messages)

        summary = await self.store.get_latest_summary(session_id)
        trimmed = self._drop_leading_assistant(recent_messages[-10:])
        if not summary:
            return trimmed

        return [
            {
                "role": "user",
                "content": f"[之前对话摘要]\n{summary}\n\n请基于以上背景继续对话。"
            },
            {
                "role": "assistant",
                "content": "我已了解之前的对话背景。"
            },
            *trimmed
        ]

    async def _generate_summary_if_needed(
        self,
        session_id: str,
        messages: list[dict]
    ):
        """Summarise the first two thirds of ``messages`` and store the result.

        Does nothing when fewer than 10 messages are given. Each message is
        cut to 300 characters before being sent to the summary model.
        """
        if len(messages) < 10:
            return

        to_summarize = messages[:len(messages) * 2 // 3]
        conversation_text = "\n".join(
            f"{'用户' if m['role'] == 'user' else 'Claude'}: {m['content'][:300]}"
            for m in to_summarize
        )
        prompt = (
            "请用 5-8 句话概括以下对话的关键内容,\n"
            "包括:主题、用户提供的背景信息、做出的决定、待解决的问题。\n"
            f"{conversation_text}"
        )
        summary_response = await self.client.messages.create(
            model=self.summary_model,
            max_tokens=500,
            messages=[{"role": "user", "content": prompt}]
        )
        await self.store.save_summary(session_id, summary_response.content[0].text)
8.6 Token 成本追踪与预算控制
from collections import defaultdict
from datetime import date
class CostTracker:
    """In-memory API cost tracker keyed by user and calendar day."""

    # USD per one million tokens.
    # NOTE(review): confirm all rows against the current published price list;
    # the Opus row in particular was left unchanged and not re-checked.
    PRICING = {
        "claude-opus-4-6": {"input": 15.0, "output": 75.0},
        "claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
        # Haiku 4.5 list price; the former 0.25 / 1.25 was older-generation Haiku pricing.
        "claude-haiku-4-5-20251001": {"input": 1.0, "output": 5.0},
    }

    def __init__(self):
        self._usage: dict[str, dict] = defaultdict(lambda: {
            "input_tokens": 0,
            "output_tokens": 0,
            "cost_usd": 0.0,
            "requests": 0
        })

    @staticmethod
    def _daily_key(user_id: str) -> str:
        """Return the usage key for ``user_id`` on today's date."""
        return f"{user_id}:{date.today().isoformat()}"

    def record(
        self,
        user_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int
    ):
        """Add one request's token usage and cost to the user's daily totals.

        Models missing from ``PRICING`` are billed at 3.0 / 15.0 USD per
        million input / output tokens.
        """
        prices = self.PRICING.get(model, {"input": 3.0, "output": 15.0})
        cost = (
            input_tokens * prices["input"] + output_tokens * prices["output"]
        ) / 1_000_000

        entry = self._usage[self._daily_key(user_id)]
        entry["input_tokens"] += input_tokens
        entry["output_tokens"] += output_tokens
        entry["cost_usd"] += cost
        entry["requests"] += 1

    def get_daily_cost(self, user_id: str) -> float:
        """Return today's accumulated cost in USD (0.0 when nothing was recorded)."""
        # ``.get`` avoids creating an empty defaultdict entry on every read.
        entry = self._usage.get(self._daily_key(user_id))
        return entry["cost_usd"] if entry is not None else 0.0

    def check_budget(self, user_id: str, daily_limit: float) -> bool:
        """Return True while today's cost is still below ``daily_limit``."""
        return self.get_daily_cost(user_id) < daily_limit
小结
多轮对话管理是构建实际 Claude 应用的核心工程问题,不是可以之后再考虑的细节:
-
API 无状态设计:每次请求都需要传入完整对话历史;随着对话增长,单次请求的成本和延迟随历史长度线性增加,整段对话的累计成本则近似按轮数的平方增长
-
三种上下文管理策略:
- 滑动窗口:最简单,适合无需长期记忆的场景
- 摘要压缩:保留早期信息的语义,适合大多数生产场景
- 结构化记忆:最精确,适合需要跟踪用户配置、决策的复杂场景
-
持久化设计:
- 将 messages 存入数据库(而不是仅在内存中)
- 为 conversation 和 message 建立独立表
- 异步生成和存储摘要,避免阻塞主对话流程
-
成本追踪:记录每次请求的 token 消耗,按用户和日期统计,实现预算控制
-
实践建议:
- 对于简单应用:滑动窗口(最近 10-15 轮)即可
- 对于客服/助手:摘要压缩 + PostgreSQL 持久化
- 对于复杂 Agent:结构化记忆 + 向量数据库长期记忆
多轮对话管理的设计决策会贯穿整个系统架构。第一周就做好这个决策,会节省你后来大量的重构工作。