第 59 章
Amazon Bedrock(Mantle)集成:SigV4 认证 / 区域端点 / 配额申请完整指南
第五十九章:与 LangChain 集成:混合 Agent 架构的最佳实践
59.1 为什么选择 LangChain + Claude 的组合
LangChain 是目前最流行的 LLM 应用框架之一,提供了链式调用、Agent 编排、记忆管理、向量存储等一整套构建 LLM 应用的基础设施。Claude 则是在复杂推理、指令遵循和安全性方面表现卓越的模型。将两者结合,可以充分发挥各自的优势:
- LangChain 提供框架能力:工具生态、链式编排、多模态 Retrieval、Memory 管理
- Claude 提供推理能力:更长的上下文窗口(200K tokens)、更强的指令遵循、更少的幻觉
尤其在构建需要访问多种外部工具、维护对话历史、结合知识库检索的 Agent 时,LangChain + Claude 的组合能显著减少自建基础设施的工作量。
59.2 环境配置与 ChatAnthropic 初始化
59.2.1 安装依赖
pip install langchain langchain-anthropic langchain-community
pip install langchain-chroma # 向量存储(可选)
59.2.2 ChatAnthropic 基础配置
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
# 基础初始化
llm = ChatAnthropic(
model="claude-opus-4-5",
anthropic_api_key="your-api-key", # 或设置环境变量 ANTHROPIC_API_KEY
max_tokens=4096,
temperature=0, # 对于需要确定性输出的任务设为 0
timeout=60, # 请求超时(秒)
max_retries=3, # 自动重试次数
)
# 简单调用
response = llm.invoke([HumanMessage(content="解释一下量子纠缠")])
print(response.content)
# 带 System Prompt 的调用
messages = [
SystemMessage(content="你是一个专业的金融分析师,用简洁的语言回答问题。"),
HumanMessage(content="什么是市盈率?")
]
response = llm.invoke(messages)
print(response.content)
59.2.3 Streaming 支持
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-opus-4-5", streaming=True)
# 流式输出
for chunk in llm.stream([HumanMessage(content="写一首关于秋天的诗")]):
print(chunk.content, end="", flush=True)
59.3 构建 LCEL 链(LangChain Expression Language)
LCEL 是 LangChain 的核心链式编排语法,使用 | 管道操作符连接各个组件。
59.3.1 基础 Prompt + LLM 链
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
llm = ChatAnthropic(model="claude-opus-4-5")
# 创建 Prompt 模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的{domain}顾问,用{language}回答。"),
("human", "{question}")
])
# 输出解析器
parser = StrOutputParser()
# 使用 | 连接组成链
chain = prompt | llm | parser
# 调用链
result = chain.invoke({
"domain": "法律",
"language": "中文",
"question": "劳动合同试用期最长可以多久?"
})
print(result)
59.3.2 带 RAG 的链
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_chroma import Chroma
from langchain_anthropic import AnthropicEmbeddings
# 初始化向量存储和检索器
embeddings = AnthropicEmbeddings(model="voyage-3")
vectorstore = Chroma(
collection_name="company_docs",
embedding_function=embeddings,
persist_directory="./chroma_db"
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
# RAG Prompt 模板
RAG_PROMPT = ChatPromptTemplate.from_messages([
("system", """你是公司的内部知识助手。根据以下检索到的文档回答问题。
如果文档中没有相关信息,请如实说明,不要编造答案。
检索到的文档:
{context}"""),
("human", "{question}")
])
llm = ChatAnthropic(model="claude-opus-4-5", temperature=0)
def format_docs(docs):
return "\n\n---\n\n".join(
f"来源:{doc.metadata.get('source', '未知')}\n{doc.page_content}"
for doc in docs
)
# 构建 RAG 链
rag_chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough()
}
| RAG_PROMPT
| llm
| StrOutputParser()
)
answer = rag_chain.invoke("公司的年假政策是什么?")
print(answer)
59.3.3 Sequential Chain(顺序链)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-opus-4-5")
parser = StrOutputParser()
# 第一步:分析用户反馈
analyze_prompt = ChatPromptTemplate.from_template(
"分析以下用户反馈,提取核心问题点(用 JSON 格式输出):\n\n{feedback}"
)
# 第二步:生成改进建议
suggest_prompt = ChatPromptTemplate.from_template(
"基于以下分析结果,给出具体的产品改进建议:\n\n{analysis}"
)
# 第三步:生成执行计划
plan_prompt = ChatPromptTemplate.from_template(
"将以下改进建议转化为可执行的季度 OKR:\n\n{suggestions}"
)
# 链式连接三个步骤
analyze_chain = analyze_prompt | llm | parser
suggest_chain = suggest_prompt | llm | parser
plan_chain = plan_prompt | llm | parser
# 组合成完整流水线
full_pipeline = (
analyze_chain
| (lambda analysis: {"analysis": analysis})
| suggest_chain
| (lambda suggestions: {"suggestions": suggestions})
| plan_chain
)
result = full_pipeline.invoke({
"feedback": "产品加载太慢,而且界面不直观,新用户根本不知道怎么用"
})
print(result)
59.4 构建 LangChain Agent
LangChain Agent 让 Claude 能够动态决策使用哪些工具,非常适合复杂的多步骤任务。
59.4.1 使用内置工具的 Agent
from langchain_anthropic import ChatAnthropic
from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# 初始化工具
search_tool = DuckDuckGoSearchRun(name="web_search")
wiki_tool = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
tools = [search_tool, wiki_tool]
# 创建 Agent Prompt(注意 agent_scratchpad 占位符是必须的)
agent_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个有用的研究助手。使用搜索工具来获取最新信息,
使用维基百科工具来获取背景知识。综合多个来源给出准确的回答。"""),
MessagesPlaceholder(variable_name="chat_history", optional=True),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
])
# 初始化 LLM(绑定工具)
llm = ChatAnthropic(model="claude-opus-4-5", temperature=0)
# 创建 Agent
agent = create_tool_calling_agent(llm, tools, agent_prompt)
# 创建 Agent Executor
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True, # 打印推理过程
max_iterations=5,
handle_parsing_errors=True
)
# 执行 Agent
result = agent_executor.invoke({
"input": "2024年诺贝尔物理学奖得主是谁?他们做了什么研究?"
})
print(result["output"])
59.4.2 自定义工具的 Agent
from langchain_core.tools import tool
from typing import Annotated
# 使用 @tool 装饰器定义自定义工具
@tool
def query_customer_database(customer_id: Annotated[str, "客户 ID"]) -> str:
"""查询客户的基本信息和历史订单。仅用于查询,不修改数据。"""
# 模拟数据库查询
mock_data = {
"C001": {
"name": "张三",
"email": "[email protected]",
"orders": ["ORD-001", "ORD-002"],
"total_spent": 2999.0
}
}
customer = mock_data.get(customer_id)
if not customer:
return f"未找到客户 ID: {customer_id}"
return str(customer)
@tool
def calculate_discount(
total_amount: Annotated[float, "订单总金额"],
customer_level: Annotated[str, "客户等级:silver/gold/platinum"]
) -> str:
"""根据客户等级计算折扣金额。"""
discount_rates = {"silver": 0.05, "gold": 0.10, "platinum": 0.15}
rate = discount_rates.get(customer_level, 0)
discount = total_amount * rate
return f"折扣率:{rate*100}%,折扣金额:{discount:.2f},实付:{total_amount - discount:.2f}"
@tool
def send_notification(
customer_email: Annotated[str, "客户邮箱"],
message: Annotated[str, "通知内容"]
) -> str:
"""向客户发送通知邮件(测试环境下仅打印,不真实发送)。"""
print(f"[MOCK EMAIL] To: {customer_email}\nContent: {message}")
return f"通知已发送至 {customer_email}"
# 将自定义工具加入 Agent
custom_tools = [query_customer_database, calculate_discount, send_notification]
llm = ChatAnthropic(model="claude-opus-4-5")
agent = create_tool_calling_agent(llm, custom_tools, agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=custom_tools, verbose=True)
result = agent_executor.invoke({
"input": "查询客户 C001 的信息,计算他 gold 会员身份下购买 5000 元商品的折扣,然后发邮件通知他"
})
59.5 添加记忆(Memory)
59.5.1 使用 ConversationBufferMemory
from langchain.memory import ConversationBufferWindowMemory
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
llm = ChatAnthropic(model="claude-opus-4-5")
# 使用 LCEL 方式配置持久化对话历史
store = {} # 实际应用中应使用 Redis 或数据库
def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
if session_id not in store:
store[session_id] = InMemoryChatMessageHistory()
return store[session_id]
# 带历史的 Prompt
prompt_with_history = ChatPromptTemplate.from_messages([
("system", "你是一个有记忆的 AI 助手。记住用户告诉你的信息。"),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
chain_with_history = RunnableWithMessageHistory(
prompt_with_history | llm | StrOutputParser(),
get_session_history,
input_messages_key="input",
history_messages_key="history"
)
# 多轮对话
session_id = "user_001_session_1"
chain_with_history.invoke(
{"input": "我叫李明,是一名软件工程师。"},
config={"configurable": {"session_id": session_id}}
)
response = chain_with_history.invoke(
{"input": "你还记得我的名字和职业吗?"},
config={"configurable": {"session_id": session_id}}
)
print(response) # 应该能回忆起李明是软件工程师
59.6 混合 Agent 架构:Claude + 专用模型
在复杂的企业场景中,单一模型往往不是最优方案。混合 Agent 架构让不同的任务由最适合的模型处理。
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableBranch, RunnableLambda
# Claude 处理复杂推理和内容生成
claude = ChatAnthropic(model="claude-opus-4-5", temperature=0)
# GPT-4o-mini 处理简单分类和提取(成本更低)
gpt_mini = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 分类任务(用便宜的模型)
classify_prompt = ChatPromptTemplate.from_template(
"将以下用户请求分类为:simple_query/complex_analysis/creative_writing\n\n请求:{input}\n\n只输出分类名称:"
)
classify_chain = classify_prompt | gpt_mini | StrOutputParser()
# Claude 处理复杂分析
complex_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个深度分析专家,提供详细、有见解的分析。"),
("human", "{input}")
])
complex_chain = complex_prompt | claude | StrOutputParser()
# 简单查询(用便宜的模型)
simple_prompt = ChatPromptTemplate.from_template("简洁回答:{input}")
simple_chain = simple_prompt | gpt_mini | StrOutputParser()
# 路由逻辑
def route(info):
classification = info["classification"]
if "complex" in classification or "creative" in classification:
return complex_chain
return simple_chain
# 完整混合架构
hybrid_chain = (
{"input": RunnablePassthrough(), "classification": classify_chain}
| RunnableLambda(route)
)
# 测试
result1 = hybrid_chain.invoke("帮我分析中国房地产市场2025年的走势及其对银行业的影响")
result2 = hybrid_chain.invoke("Python 中如何判断一个字符串是否为空?")
59.7 错误处理与生产化配置
59.7.1 LangChain 的错误处理机制
from langchain_core.runnables import RunnableWithFallbacks
from langchain_anthropic import ChatAnthropic
# 主模型:Claude Opus(高质量,高成本)
primary_llm = ChatAnthropic(model="claude-opus-4-5")
# 备用模型:Claude Haiku(低成本,快速)
fallback_llm = ChatAnthropic(model="claude-haiku-4-5")
# 配置自动降级
llm_with_fallback = primary_llm.with_fallbacks([fallback_llm])
# 如果 Opus 失败,自动切换到 Haiku
try:
response = llm_with_fallback.invoke("你好")
except Exception as e:
print(f"所有模型均失败: {e}")
59.7.2 回调与可观测性
from langchain_core.callbacks import BaseCallbackHandler
import time
class PerformanceCallback(BaseCallbackHandler):
"""性能监控回调"""
def __init__(self):
self.start_times = {}
self.metrics = []
def on_llm_start(self, serialized, prompts, **kwargs):
run_id = kwargs.get("run_id", "unknown")
self.start_times[str(run_id)] = time.time()
def on_llm_end(self, response, **kwargs):
run_id = str(kwargs.get("run_id", "unknown"))
if run_id in self.start_times:
elapsed = time.time() - self.start_times[run_id]
token_usage = response.llm_output.get("usage", {}) if response.llm_output else {}
self.metrics.append({
"elapsed_s": round(elapsed, 2),
"input_tokens": token_usage.get("input_tokens", 0),
"output_tokens": token_usage.get("output_tokens", 0)
})
def on_tool_start(self, serialized, input_str, **kwargs):
tool_name = serialized.get("name", "unknown")
print(f"[Tool Call] {tool_name}: {input_str[:100]}")
callback = PerformanceCallback()
llm = ChatAnthropic(model="claude-opus-4-5", callbacks=[callback])
llm.invoke("解释机器学习中的过拟合问题")
print(f"调用指标: {callback.metrics}")
59.8 最佳实践总结
Prompt 设计建议:
# Claude 在 LangChain 中的最佳 System Prompt 结构
BEST_SYSTEM_PROMPT = """你是{role}。
## 能力
- {capability_1}
- {capability_2}
## 行为准则
- 始终基于提供的上下文回答,不要编造信息
- 如果不确定,明确说明
- 结构化输出时使用 Markdown 格式
## 约束
- 不要执行{constraint_1}
- 不要透露{constraint_2}"""
性能优化建议:
- 使用
temperature=0获得确定性输出(适合结构化任务) - 对长文档使用
max_tokens限制,避免不必要的长输出 - 在 Agent 中设置
max_iterations防止无限循环 - 使用
streaming=True提升用户体验感知速度 - 对高频调用启用 LangChain 的 SQLite 缓存
小结
LangChain 与 Claude 的集成通过 ChatAnthropic 类实现,支持 LCEL 链式编排、Agent 工具调用、RAG 检索增强等核心场景。混合 Agent 架构允许按任务复杂度路由到不同模型,在质量与成本之间取得平衡。生产化时需重点关注:回调监控可观测性、自动降级策略、对话历史的持久化存储,以及 Agent 的迭代次数限制。