第 59 章

Amazon Bedrock(Mantle)集成:SigV4 认证 / 区域端点 / 配额申请完整指南

第五十九章:与 LangChain 集成:混合 Agent 架构的最佳实践

59.1 为什么选择 LangChain + Claude 的组合

LangChain 是目前最流行的 LLM 应用框架之一,提供了链式调用、Agent 编排、记忆管理、向量存储等一整套构建 LLM 应用的基础设施。Claude 则是在复杂推理、指令遵循和安全性方面表现卓越的模型。将两者结合,可以充分发挥各自的优势:

尤其在构建需要访问多种外部工具、维护对话历史、结合知识库检索的 Agent 时,LangChain + Claude 的组合能显著减少自建基础设施的工作量。

59.2 环境配置与 ChatAnthropic 初始化

59.2.1 安装依赖

pip install langchain langchain-anthropic langchain-community
pip install langchain-chroma  # 向量存储(可选)

59.2.2 ChatAnthropic 基础配置

from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage

# 基础初始化
llm = ChatAnthropic(
    model="claude-opus-4-5",
    anthropic_api_key="your-api-key",  # 或设置环境变量 ANTHROPIC_API_KEY
    max_tokens=4096,
    temperature=0,      # 对于需要确定性输出的任务设为 0
    timeout=60,         # 请求超时(秒)
    max_retries=3,      # 自动重试次数
)

# 简单调用
response = llm.invoke([HumanMessage(content="解释一下量子纠缠")])
print(response.content)

# 带 System Prompt 的调用
messages = [
    SystemMessage(content="你是一个专业的金融分析师,用简洁的语言回答问题。"),
    HumanMessage(content="什么是市盈率?")
]
response = llm.invoke(messages)
print(response.content)

59.2.3 Streaming 支持

from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-opus-4-5", streaming=True)

# 流式输出
for chunk in llm.stream([HumanMessage(content="写一首关于秋天的诗")]):
    print(chunk.content, end="", flush=True)

59.3 构建 LCEL 链(LangChain Expression Language)

LCEL 是 LangChain 的核心链式编排语法,使用 | 管道操作符连接各个组件。

59.3.1 基础 Prompt + LLM 链

from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

llm = ChatAnthropic(model="claude-opus-4-5")

# 创建 Prompt 模板
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个专业的{domain}顾问,用{language}回答。"),
    ("human", "{question}")
])

# 输出解析器
parser = StrOutputParser()

# 使用 | 连接组成链
chain = prompt | llm | parser

# 调用链
result = chain.invoke({
    "domain": "法律",
    "language": "中文",
    "question": "劳动合同试用期最长可以多久?"
})
print(result)

59.3.2 带 RAG 的链

from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_chroma import Chroma
from langchain_anthropic import AnthropicEmbeddings

# 初始化向量存储和检索器
embeddings = AnthropicEmbeddings(model="voyage-3")
vectorstore = Chroma(
    collection_name="company_docs",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

# RAG Prompt 模板
RAG_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """你是公司的内部知识助手。根据以下检索到的文档回答问题。
如果文档中没有相关信息,请如实说明,不要编造答案。

检索到的文档:
{context}"""),
    ("human", "{question}")
])

llm = ChatAnthropic(model="claude-opus-4-5", temperature=0)

def format_docs(docs):
    return "\n\n---\n\n".join(
        f"来源:{doc.metadata.get('source', '未知')}\n{doc.page_content}"
        for doc in docs
    )

# 构建 RAG 链
rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough()
    }
    | RAG_PROMPT
    | llm
    | StrOutputParser()
)

answer = rag_chain.invoke("公司的年假政策是什么?")
print(answer)

59.3.3 Sequential Chain(顺序链)

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-opus-4-5")
parser = StrOutputParser()

# 第一步:分析用户反馈
analyze_prompt = ChatPromptTemplate.from_template(
    "分析以下用户反馈,提取核心问题点(用 JSON 格式输出):\n\n{feedback}"
)

# 第二步:生成改进建议
suggest_prompt = ChatPromptTemplate.from_template(
    "基于以下分析结果,给出具体的产品改进建议:\n\n{analysis}"
)

# 第三步:生成执行计划
plan_prompt = ChatPromptTemplate.from_template(
    "将以下改进建议转化为可执行的季度 OKR:\n\n{suggestions}"
)

# 链式连接三个步骤
analyze_chain = analyze_prompt | llm | parser
suggest_chain = suggest_prompt | llm | parser
plan_chain = plan_prompt | llm | parser

# 组合成完整流水线
full_pipeline = (
    analyze_chain
    | (lambda analysis: {"analysis": analysis})
    | suggest_chain
    | (lambda suggestions: {"suggestions": suggestions})
    | plan_chain
)

result = full_pipeline.invoke({
    "feedback": "产品加载太慢,而且界面不直观,新用户根本不知道怎么用"
})
print(result)

59.4 构建 LangChain Agent

LangChain Agent 让 Claude 能够动态决策使用哪些工具,非常适合复杂的多步骤任务。

59.4.1 使用内置工具的 Agent

from langchain_anthropic import ChatAnthropic
from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# 初始化工具
search_tool = DuckDuckGoSearchRun(name="web_search")
wiki_tool = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())

tools = [search_tool, wiki_tool]

# 创建 Agent Prompt(注意 agent_scratchpad 占位符是必须的)
agent_prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个有用的研究助手。使用搜索工具来获取最新信息,
使用维基百科工具来获取背景知识。综合多个来源给出准确的回答。"""),
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

# 初始化 LLM(绑定工具)
llm = ChatAnthropic(model="claude-opus-4-5", temperature=0)

# 创建 Agent
agent = create_tool_calling_agent(llm, tools, agent_prompt)

# 创建 Agent Executor
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,  # 打印推理过程
    max_iterations=5,
    handle_parsing_errors=True
)

# 执行 Agent
result = agent_executor.invoke({
    "input": "2024年诺贝尔物理学奖得主是谁?他们做了什么研究?"
})
print(result["output"])

59.4.2 自定义工具的 Agent

from langchain_core.tools import tool
from typing import Annotated

# 使用 @tool 装饰器定义自定义工具
@tool
def query_customer_database(customer_id: Annotated[str, "客户 ID"]) -> str:
    """查询客户的基本信息和历史订单。仅用于查询,不修改数据。"""
    # 模拟数据库查询
    mock_data = {
        "C001": {
            "name": "张三",
            "email": "[email protected]",
            "orders": ["ORD-001", "ORD-002"],
            "total_spent": 2999.0
        }
    }
    customer = mock_data.get(customer_id)
    if not customer:
        return f"未找到客户 ID: {customer_id}"
    return str(customer)

@tool
def calculate_discount(
    total_amount: Annotated[float, "订单总金额"],
    customer_level: Annotated[str, "客户等级:silver/gold/platinum"]
) -> str:
    """根据客户等级计算折扣金额。"""
    discount_rates = {"silver": 0.05, "gold": 0.10, "platinum": 0.15}
    rate = discount_rates.get(customer_level, 0)
    discount = total_amount * rate
    return f"折扣率:{rate*100}%,折扣金额:{discount:.2f},实付:{total_amount - discount:.2f}"

@tool
def send_notification(
    customer_email: Annotated[str, "客户邮箱"],
    message: Annotated[str, "通知内容"]
) -> str:
    """向客户发送通知邮件(测试环境下仅打印,不真实发送)。"""
    print(f"[MOCK EMAIL] To: {customer_email}\nContent: {message}")
    return f"通知已发送至 {customer_email}"

# 将自定义工具加入 Agent
custom_tools = [query_customer_database, calculate_discount, send_notification]

llm = ChatAnthropic(model="claude-opus-4-5")
agent = create_tool_calling_agent(llm, custom_tools, agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=custom_tools, verbose=True)

result = agent_executor.invoke({
    "input": "查询客户 C001 的信息,计算他 gold 会员身份下购买 5000 元商品的折扣,然后发邮件通知他"
})

59.5 添加记忆(Memory)

59.5.1 使用 ConversationBufferMemory

from langchain.memory import ConversationBufferWindowMemory
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

llm = ChatAnthropic(model="claude-opus-4-5")

# 使用 LCEL 方式配置持久化对话历史
store = {}  # 实际应用中应使用 Redis 或数据库

def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
    if session_id not in store:
        store[session_id] = InMemoryChatMessageHistory()
    return store[session_id]

# 带历史的 Prompt
prompt_with_history = ChatPromptTemplate.from_messages([
    ("system", "你是一个有记忆的 AI 助手。记住用户告诉你的信息。"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])

chain_with_history = RunnableWithMessageHistory(
    prompt_with_history | llm | StrOutputParser(),
    get_session_history,
    input_messages_key="input",
    history_messages_key="history"
)

# 多轮对话
session_id = "user_001_session_1"

chain_with_history.invoke(
    {"input": "我叫李明,是一名软件工程师。"},
    config={"configurable": {"session_id": session_id}}
)

response = chain_with_history.invoke(
    {"input": "你还记得我的名字和职业吗?"},
    config={"configurable": {"session_id": session_id}}
)
print(response)  # 应该能回忆起李明是软件工程师

59.6 混合 Agent 架构:Claude + 专用模型

在复杂的企业场景中,单一模型往往不是最优方案。混合 Agent 架构让不同的任务由最适合的模型处理。

from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableBranch, RunnableLambda

# Claude 处理复杂推理和内容生成
claude = ChatAnthropic(model="claude-opus-4-5", temperature=0)

# GPT-4o-mini 处理简单分类和提取(成本更低)
gpt_mini = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 分类任务(用便宜的模型)
classify_prompt = ChatPromptTemplate.from_template(
    "将以下用户请求分类为:simple_query/complex_analysis/creative_writing\n\n请求:{input}\n\n只输出分类名称:"
)
classify_chain = classify_prompt | gpt_mini | StrOutputParser()

# Claude 处理复杂分析
complex_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个深度分析专家,提供详细、有见解的分析。"),
    ("human", "{input}")
])
complex_chain = complex_prompt | claude | StrOutputParser()

# 简单查询(用便宜的模型)
simple_prompt = ChatPromptTemplate.from_template("简洁回答:{input}")
simple_chain = simple_prompt | gpt_mini | StrOutputParser()

# 路由逻辑
def route(info):
    classification = info["classification"]
    if "complex" in classification or "creative" in classification:
        return complex_chain
    return simple_chain

# 完整混合架构
hybrid_chain = (
    {"input": RunnablePassthrough(), "classification": classify_chain}
    | RunnableLambda(route)
)

# 测试
result1 = hybrid_chain.invoke("帮我分析中国房地产市场2025年的走势及其对银行业的影响")
result2 = hybrid_chain.invoke("Python 中如何判断一个字符串是否为空?")

59.7 错误处理与生产化配置

59.7.1 LangChain 的错误处理机制

from langchain_core.runnables import RunnableWithFallbacks
from langchain_anthropic import ChatAnthropic

# 主模型:Claude Opus(高质量,高成本)
primary_llm = ChatAnthropic(model="claude-opus-4-5")

# 备用模型:Claude Haiku(低成本,快速)
fallback_llm = ChatAnthropic(model="claude-haiku-4-5")

# 配置自动降级
llm_with_fallback = primary_llm.with_fallbacks([fallback_llm])

# 如果 Opus 失败,自动切换到 Haiku
try:
    response = llm_with_fallback.invoke("你好")
except Exception as e:
    print(f"所有模型均失败: {e}")

59.7.2 回调与可观测性

from langchain_core.callbacks import BaseCallbackHandler
import time

class PerformanceCallback(BaseCallbackHandler):
    """性能监控回调"""
    
    def __init__(self):
        self.start_times = {}
        self.metrics = []
    
    def on_llm_start(self, serialized, prompts, **kwargs):
        run_id = kwargs.get("run_id", "unknown")
        self.start_times[str(run_id)] = time.time()
    
    def on_llm_end(self, response, **kwargs):
        run_id = str(kwargs.get("run_id", "unknown"))
        if run_id in self.start_times:
            elapsed = time.time() - self.start_times[run_id]
            token_usage = response.llm_output.get("usage", {}) if response.llm_output else {}
            self.metrics.append({
                "elapsed_s": round(elapsed, 2),
                "input_tokens": token_usage.get("input_tokens", 0),
                "output_tokens": token_usage.get("output_tokens", 0)
            })
    
    def on_tool_start(self, serialized, input_str, **kwargs):
        tool_name = serialized.get("name", "unknown")
        print(f"[Tool Call] {tool_name}: {input_str[:100]}")

callback = PerformanceCallback()

llm = ChatAnthropic(model="claude-opus-4-5", callbacks=[callback])
llm.invoke("解释机器学习中的过拟合问题")

print(f"调用指标: {callback.metrics}")

59.8 最佳实践总结

Prompt 设计建议:

# Claude 在 LangChain 中的最佳 System Prompt 结构
BEST_SYSTEM_PROMPT = """你是{role}。

## 能力
- {capability_1}
- {capability_2}

## 行为准则
- 始终基于提供的上下文回答,不要编造信息
- 如果不确定,明确说明
- 结构化输出时使用 Markdown 格式

## 约束
- 不要执行{constraint_1}
- 不要透露{constraint_2}"""

性能优化建议:

  1. 使用 temperature=0 获得确定性输出(适合结构化任务)
  2. 对长文档使用 max_tokens 限制,避免不必要的长输出
  3. 在 Agent 中设置 max_iterations 防止无限循环
  4. 使用 streaming=True 提升用户体验感知速度
  5. 对高频调用启用 LangChain 的 SQLite 缓存

小结

LangChain 与 Claude 的集成通过 ChatAnthropic 类实现,支持 LCEL 链式编排、Agent 工具调用、RAG 检索增强等核心场景。混合 Agent 架构允许按任务复杂度路由到不同模型,在质量与成本之间取得平衡。生产化时需重点关注:回调监控可观测性、自动降级策略、对话历史的持久化存储,以及 Agent 的迭代次数限制。

本章评分
4.8  / 5  (3 评分)

💬 留言讨论