飞书、企业微信、钉钉深度集成与消息路由
第十八章:飞书、企业微信、钉钉深度集成与消息路由
把 Dify Agent 接入中国企业最主流的三大协作平台——飞书、企业微信和钉钉,实现统一消息路由、用户权限映射和跨平台会话管理,让 AI 助手无缝融入企业日常工作流。
本章导读
国内企业的 AI 助手落地,绕不开三个问题:员工在哪里工作(飞书/企业微信/钉钉),AI 的答案如何送达(消息推送/对话机器人),以及如何管理谁可以用什么功能(权限控制)。
本章深入三大平台的集成方案,不是简单的"Hello World Bot",而是生产级别的完整方案:
- 飞书(Lark):Bot 配置、消息卡片、飞书文档集成、会话机器人
- 企业微信:外部 Webhook、内部机器人、客服 API 对接
- 钉钉:群机器人、个人助手、审批工作流集成
读完本章,你将能够:
- 搭建企业级的多平台 AI 助手
- 实现统一的消息路由和分发层
- 处理三大平台的签名验证和消息格式转换
- 设计跨平台的用户身份映射和权限管理
Level 1:基础认知(1-3 年经验)
三大平台的集成模式对比
| 特性 | 飞书 | 企业微信 | 钉钉 |
|---|---|---|---|
| 机器人接入方式 | 自建应用/第三方应用 | 群机器人/企业应用 | 群机器人/企业应用 |
| 消息类型丰富度 | 极高(消息卡片、互动) | 中(Markdown/图片) | 高(消息卡片/表单) |
| API 文档质量 | 优秀 | 良好 | 良好 |
| 国际化支持 | 强(Lark = 飞书国际版) | 弱 | 弱 |
| 开放平台生态 | 丰富 | 丰富 | 较丰富 |
| Webhook 推送 | 支持 | 支持(企业级) | 支持 |
飞书 Bot 快速上手
步骤一:在飞书开放平台创建应用
- 访问 https://open.feishu.cn/app
- 点击"创建企业自建应用"
- 填写应用名称(如"智能助手 AI")
- 记录 App ID 和 App Secret
步骤二:配置机器人能力
在应用管理页面:
- 添加能力 → 机器人
- 事件与回调 → 添加事件:
接收消息(im.message.receive_v1) - 配置 Webhook URL:你的服务器地址 +
/webhook/feishu
步骤三:最简单的飞书 Bot(接收并回复消息)
from flask import Flask, request, jsonify
import httpx, json
app = Flask(__name__)
FEISHU_APP_ID = "cli_your_app_id"
FEISHU_APP_SECRET = "your_app_secret"
DIFY_API_KEY = "app-your_dify_key"
def get_feishu_token() -> str:
"""获取飞书访问令牌"""
resp = httpx.post(
"https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
json={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET}
)
return resp.json()["tenant_access_token"]
def send_feishu_message(open_id: str, text: str):
"""发送飞书文本消息"""
token = get_feishu_token()
httpx.post(
"https://open.feishu.cn/open-apis/im/v1/messages",
headers={"Authorization": f"Bearer {token}"},
params={"receive_id_type": "open_id"},
json={
"receive_id": open_id,
"msg_type": "text",
"content": json.dumps({"text": text}),
}
)
def call_dify(query: str, user_id: str) -> str:
"""调用 Dify API"""
resp = httpx.post(
"https://api.dify.ai/v1/chat-messages",
headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
json={"query": query, "user": user_id,
"response_mode": "blocking", "inputs": {}},
timeout=60.0
)
return resp.json().get("answer", "抱歉,无法获取回答")
@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook():
data = request.json
# 飞书 URL 验证(首次配置时)
if data.get("type") == "url_verification":
return jsonify({"challenge": data.get("challenge")})
# 处理消息事件
event = data.get("event", {})
if event.get("type") == "message":
msg = event.get("message", {})
if msg.get("message_type") == "text":
content = json.loads(msg["content"])
query = content.get("text", "").strip()
sender = event.get("sender", {})
open_id = sender.get("sender_id", {}).get("open_id")
if query and open_id:
# 调用 Dify 并回复
answer = call_dify(query, open_id)
send_feishu_message(open_id, answer)
return jsonify({"code": 0})
if __name__ == "__main__":
app.run(port=8000)
企业微信群机器人(最简集成)
import httpx, json
WXWORK_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key"
def send_wxwork_message(text: str):
"""发送企业微信群消息"""
httpx.post(WXWORK_WEBHOOK, json={
"msgtype": "markdown",
"markdown": {
"content": text # 支持 Markdown 格式
}
})
def send_wxwork_card(title: str, description: str, url: str = None):
"""发送企业微信卡片消息"""
article = {"title": title, "description": description}
if url:
article["url"] = url
httpx.post(WXWORK_WEBHOOK, json={
"msgtype": "news",
"news": {"articles": [article]}
})
钉钉群机器人(最简集成)
import httpx, hmac, hashlib, base64, time, urllib.parse
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=your_token"
DINGTALK_SECRET = "your_secret"
def get_dingtalk_sign() -> tuple[str, str]:
"""生成钉钉签名(安全模式必须)"""
timestamp = str(round(time.time() * 1000))
sign_str = f"{timestamp}\n{DINGTALK_SECRET}"
sign = base64.b64encode(
hmac.new(DINGTALK_SECRET.encode(), sign_str.encode(), hashlib.sha256).digest()
).decode()
return timestamp, urllib.parse.quote_plus(sign)
def send_dingtalk_message(text: str):
"""发送钉钉文本消息"""
timestamp, sign = get_dingtalk_sign()
url = f"{DINGTALK_WEBHOOK}×tamp={timestamp}&sign={sign}"
httpx.post(url, json={
"msgtype": "markdown",
"markdown": {
"title": "AI 助手",
"text": text
}
})
Level 2:机制深解(3-5 年经验)
飞书深度集成:消息卡片与交互
飞书的消息卡片(Interactive Card)是最强大的消息类型,支持按钮、表单、动态更新等交互功能:
import httpx, json
from typing import Optional
class FeishuCardBuilder:
"""飞书消息卡片构建器"""
def build_ai_response_card(
self,
question: str,
answer: str,
confidence: float = 1.0,
sources: list[str] = None
) -> dict:
"""构建 AI 回答卡片"""
elements = [
# 问题区域
{
"tag": "div",
"text": {
"tag": "lark_md",
"content": f"**问:** {question}"
}
},
{"tag": "hr"},
# 回答区域
{
"tag": "div",
"text": {
"tag": "lark_md",
"content": f"**答:**\n{answer}"
}
},
]
# 来源引用
if sources:
source_text = "\n".join([f"- {s}" for s in sources[:3]])
elements.append({
"tag": "note",
"elements": [
{"tag": "plain_text",
"content": f"参考来源:\n{source_text}"}
]
})
# 反馈按钮
elements.extend([
{"tag": "hr"},
{
"tag": "action",
"actions": [
{
"tag": "button",
"text": {"tag": "plain_text", "content": "👍 有帮助"},
"type": "primary",
"value": {"action": "like", "answer_id": "placeholder"}
},
{
"tag": "button",
"text": {"tag": "plain_text", "content": "👎 没帮助"},
"type": "danger",
"value": {"action": "dislike", "answer_id": "placeholder"}
},
{
"tag": "button",
"text": {"tag": "plain_text", "content": "🔄 重新回答"},
"type": "default",
"value": {"action": "regenerate", "question": question}
}
]
}
])
return {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": "AI 智能助手"},
"template": "blue"
},
"elements": elements
}
def build_loading_card(self, question: str) -> dict:
"""构建加载中卡片(流式响应时先显示这个)"""
return {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": "AI 正在思考..."},
"template": "grey"
},
"elements": [
{
"tag": "div",
"text": {
"tag": "lark_md",
"content": f"**问:** {question}\n\n⏳ 正在生成回答,请稍候..."
}
}
]
}
class FeishuClient:
"""飞书完整客户端"""
BASE_URL = "https://open.feishu.cn/open-apis"
def __init__(self, app_id: str, app_secret: str):
self.app_id = app_id
self.app_secret = app_secret
self._token = None
self._token_expire = 0
self.client = httpx.AsyncClient(timeout=30.0)
self.card_builder = FeishuCardBuilder()
async def get_token(self) -> str:
"""获取租户访问令牌(带缓存)"""
import time
if self._token and time.time() < self._token_expire - 60:
return self._token
resp = await self.client.post(
f"{self.BASE_URL}/auth/v3/tenant_access_token/internal",
json={"app_id": self.app_id, "app_secret": self.app_secret}
)
data = resp.json()
self._token = data["tenant_access_token"]
self._token_expire = time.time() + data.get("expire", 7200)
return self._token
async def send_card(
self,
receive_id: str,
card: dict,
receive_id_type: str = "open_id"
) -> dict:
"""发送消息卡片"""
token = await self.get_token()
resp = await self.client.post(
f"{self.BASE_URL}/im/v1/messages",
headers={"Authorization": f"Bearer {token}"},
params={"receive_id_type": receive_id_type},
json={
"receive_id": receive_id,
"msg_type": "interactive",
"content": json.dumps(card)
}
)
resp.raise_for_status()
return resp.json()
async def update_card(self, message_id: str, card: dict) -> dict:
"""更新已发送的消息卡片(用于流式输出效果)"""
token = await self.get_token()
resp = await self.client.patch(
f"{self.BASE_URL}/im/v1/messages/{message_id}",
headers={"Authorization": f"Bearer {token}"},
json={
"msg_type": "interactive",
"content": json.dumps(card)
}
)
resp.raise_for_status()
return resp.json()
async def get_user_info(self, open_id: str) -> dict:
"""获取用户信息"""
token = await self.get_token()
resp = await self.client.get(
f"{self.BASE_URL}/contact/v3/users/{open_id}",
headers={"Authorization": f"Bearer {token}"},
params={"user_id_type": "open_id"}
)
resp.raise_for_status()
return resp.json().get("data", {}).get("user", {})
async def handle_card_action(self, action_data: dict) -> Optional[dict]:
"""处理卡片按钮点击事件"""
action = action_data.get("action", {})
value = action.get("value", {})
open_id = action_data.get("open_id")
if value.get("action") == "like":
# 记录正向反馈
await self._record_feedback(value.get("answer_id"), "like", open_id)
return {"toast": {"type": "success", "content": "感谢你的反馈!"}}
elif value.get("action") == "dislike":
await self._record_feedback(value.get("answer_id"), "dislike", open_id)
return {"toast": {"type": "info", "content": "感谢反馈,我们会继续改进"}}
elif value.get("action") == "regenerate":
# 重新调用 Dify 生成答案
question = value.get("question", "")
new_answer = await call_dify_async(question, open_id)
new_card = self.card_builder.build_ai_response_card(question, new_answer)
return new_card # 返回新卡片内容(飞书会自动更新卡片)
return None
飞书签名验证(安全必须):
import hashlib, hmac
def verify_feishu_signature(
timestamp: str,
nonce: str,
body: str,
verify_token: str
) -> bool:
"""验证飞书 Webhook 签名"""
sign_str = timestamp + nonce + verify_token + body
signature = hashlib.sha256(sign_str.encode()).hexdigest()
received = request.headers.get("X-Lark-Signature", "")
return hmac.compare_digest(signature, received)
@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook_secure():
"""带签名验证的飞书 Webhook"""
timestamp = request.headers.get("X-Lark-Request-Timestamp", "")
nonce = request.headers.get("X-Lark-Request-Nonce", "")
body = request.get_data(as_text=True)
verify_token = os.getenv("FEISHU_VERIFY_TOKEN")
if not verify_feishu_signature(timestamp, nonce, body, verify_token):
return jsonify({"code": 403, "msg": "signature mismatch"}), 403
# 防重放攻击:检查 timestamp 是否在 5 分钟内
import time
if abs(time.time() - int(timestamp)) > 300:
return jsonify({"code": 403, "msg": "timestamp expired"}), 403
data = json.loads(body)
# ... 处理消息
return jsonify({"code": 0})
企业微信深度集成
企业微信有两种集成模式:群机器人(简单,只能发消息)和企业应用(复杂,支持双向通信):
import httpx, hashlib, xml.etree.ElementTree as ET
from typing import Optional
class WxWorkClient:
"""企业微信完整客户端"""
def __init__(self, corp_id: str, corp_secret: str, agent_id: int):
self.corp_id = corp_id
self.corp_secret = corp_secret
self.agent_id = agent_id
self.client = httpx.AsyncClient(timeout=30.0)
self._token = None
self._token_expire = 0
async def get_access_token(self) -> str:
"""获取企业微信 AccessToken"""
import time
if self._token and time.time() < self._token_expire - 60:
return self._token
resp = await self.client.get(
"https://qyapi.weixin.qq.com/cgi-bin/gettoken",
params={"corpid": self.corp_id, "corpsecret": self.corp_secret}
)
data = resp.json()
self._token = data["access_token"]
self._token_expire = time.time() + data.get("expires_in", 7200)
return self._token
async def send_message_to_user(
self,
user_id: str,
content: str,
msg_type: str = "text"
) -> dict:
"""给指定员工发送消息(企业应用)"""
token = await self.get_access_token()
payload = {
"touser": user_id,
"msgtype": msg_type,
"agentid": self.agent_id,
}
if msg_type == "text":
payload["text"] = {"content": content}
elif msg_type == "markdown":
payload["markdown"] = {"content": content}
resp = await self.client.post(
"https://qyapi.weixin.qq.com/cgi-bin/message/send",
params={"access_token": token},
json=payload
)
resp.raise_for_status()
return resp.json()
async def send_markdown_card(
self,
user_id: str,
title: str,
description: str,
url: Optional[str] = None
) -> dict:
"""发送图文消息(News 类型)"""
token = await self.get_access_token()
article = {"title": title, "description": description, "picurl": ""}
if url:
article["url"] = url
resp = await self.client.post(
"https://qyapi.weixin.qq.com/cgi-bin/message/send",
params={"access_token": token},
json={
"touser": user_id,
"msgtype": "news",
"agentid": self.agent_id,
"news": {"articles": [article]}
}
)
resp.raise_for_status()
return resp.json()
async def get_user_by_userid(self, userid: str) -> dict:
"""获取员工信息"""
token = await self.get_access_token()
resp = await self.client.get(
"https://qyapi.weixin.qq.com/cgi-bin/user/get",
params={"access_token": token, "userid": userid}
)
resp.raise_for_status()
return resp.json()
def verify_wxwork_signature(
token: str,
timestamp: str,
nonce: str,
echostr: str = ""
) -> str:
"""验证企业微信 Webhook 签名"""
items = sorted([token, timestamp, nonce])
sig = hashlib.sha1("".join(items).encode()).hexdigest()
return sig
@app.route("/webhook/wxwork", methods=["GET", "POST"])
def wxwork_webhook():
token = os.getenv("WXWORK_TOKEN")
timestamp = request.args.get("timestamp", "")
nonce = request.args.get("nonce", "")
if request.method == "GET":
# 验证 Webhook URL
echostr = request.args.get("echostr", "")
sig = verify_wxwork_signature(token, timestamp, nonce)
if sig == request.args.get("msg_signature"):
return echostr # 原样返回 echostr 表示验证通过
return "signature error", 403
# 处理消息推送(POST)
xml_str = request.get_data(as_text=True)
root = ET.fromstring(xml_str)
msg_type = root.findtext("MsgType")
from_user = root.findtext("FromUserName")
if msg_type == "text":
content = root.findtext("Content", "").strip()
# 调用 Dify 并回复
answer = call_dify(content, from_user)
wxwork_client.send_message_to_user(from_user, answer)
return "success"
钉钉深度集成
import httpx, hmac, hashlib, base64, time
import urllib.parse
from typing import Optional
class DingTalkClient:
"""钉钉完整客户端"""
def __init__(self, app_key: str, app_secret: str):
self.app_key = app_key
self.app_secret = app_secret
self.client = httpx.AsyncClient(timeout=30.0)
self._token = None
self._token_expire = 0
async def get_access_token(self) -> str:
"""获取钉钉 AccessToken"""
import time
if self._token and time.time() < self._token_expire - 60:
return self._token
resp = await self.client.post(
"https://oapi.dingtalk.com/gettoken",
params={"appkey": self.app_key, "appsecret": self.app_secret}
)
data = resp.json()
self._token = data["access_token"]
self._token_expire = time.time() + data.get("expires_in", 7200)
return self._token
async def send_work_notification(
self,
user_id: str,
title: str,
content: str,
) -> dict:
"""发送钉钉工作通知"""
token = await self.get_access_token()
resp = await self.client.post(
"https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2",
params={"access_token": token},
json={
"agent_id": int(os.getenv("DINGTALK_AGENT_ID")),
"userid_list": user_id,
"msg": {
"msgtype": "action_card",
"action_card": {
"title": title,
"markdown": content,
"btn_orientation": "0",
"btns": [
{"title": "查看详情", "action_url": ""}
]
}
}
}
)
resp.raise_for_status()
return resp.json()
def build_group_robot_signed_url(
self,
webhook_url: str,
secret: str
) -> str:
"""构建钉钉群机器人签名 URL"""
timestamp = str(round(time.time() * 1000))
sign_content = f"{timestamp}\n{secret}"
signature = base64.b64encode(
hmac.new(secret.encode(), sign_content.encode(), hashlib.sha256).digest()
).decode()
return f"{webhook_url}×tamp={timestamp}&sign={urllib.parse.quote_plus(signature)}"
async def send_group_message(
self,
webhook_url: str,
secret: str,
content: str,
msg_type: str = "markdown"
) -> dict:
"""发送钉钉群消息"""
signed_url = self.build_group_robot_signed_url(webhook_url, secret)
payload = {}
if msg_type == "text":
payload = {"msgtype": "text", "text": {"content": content}}
elif msg_type == "markdown":
payload = {"msgtype": "markdown", "markdown": {
"title": "AI 助手", "text": content
}}
resp = await self.client.post(signed_url, json=payload)
resp.raise_for_status()
return resp.json()
async def get_user_info_by_userid(self, user_id: str) -> dict:
"""通过 userid 获取用户信息"""
token = await self.get_access_token()
resp = await self.client.post(
"https://oapi.dingtalk.com/topapi/v2/user/get",
params={"access_token": token},
json={"userid": user_id, "language": "zh_CN"}
)
resp.raise_for_status()
return resp.json().get("result", {})
统一消息路由层设计
在多平台部署时,应该构建一个统一的消息路由层,避免为每个平台重复编写业务逻辑:
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Any
import asyncio
class Platform(str, Enum):
FEISHU = "feishu"
WXWORK = "wxwork"
DINGTALK = "dingtalk"
WEB = "web"
API = "api"
@dataclass
class UnifiedMessage:
"""平台无关的统一消息格式"""
platform: Platform
platform_user_id: str # 平台内的用户 ID
content: str # 消息内容(纯文本)
message_id: str # 平台消息 ID(用于回复)
session_id: Optional[str] = None # 会话/对话 ID
metadata: dict = field(default_factory=dict)
@dataclass
class UnifiedResponse:
"""平台无关的统一回答格式"""
content: str
content_type: str = "text" # "text" | "markdown" | "card"
attachments: list = field(default_factory=list)
metadata: dict = field(default_factory=dict)
class UnifiedMessageRouter:
"""统一消息路由器"""
def __init__(
self,
feishu_client: FeishuClient,
wxwork_client: WxWorkClient,
dingtalk_client: DingTalkClient,
dify_client,
user_mapper: "UserIdentityMapper"
):
self.clients = {
Platform.FEISHU: feishu_client,
Platform.WXWORK: wxwork_client,
Platform.DINGTALK: dingtalk_client,
}
self.dify = dify_client
self.user_mapper = user_mapper
self.handlers = {} # 业务处理函数注册表
def register_handler(self, intent: str, handler: callable):
"""注册意图处理函数"""
self.handlers[intent] = handler
async def route(self, message: UnifiedMessage) -> UnifiedResponse:
"""
统一路由入口:
1. 识别用户意图
2. 检查权限
3. 调用 Dify
4. 格式化响应
"""
# 用户身份映射(跨平台统一)
unified_user_id = await self.user_mapper.get_unified_id(
message.platform, message.platform_user_id
)
# 权限检查
if not await self._check_permission(unified_user_id, message.content):
return UnifiedResponse(
content="抱歉,你没有权限使用此功能。请联系管理员。"
)
# 获取对话历史(跨平台共享)
conversation_id = await self._get_conversation_id(unified_user_id)
# 调用 Dify
try:
dify_response = await self.dify.send_message_async(
query = message.content,
user = unified_user_id,
conversation_id = conversation_id
)
answer = dify_response["answer"]
new_conv_id = dify_response.get("conversation_id", conversation_id)
await self._save_conversation_id(unified_user_id, new_conv_id)
except Exception as e:
answer = f"AI 服务暂时不可用:{str(e)}\n请稍后重试。"
return UnifiedResponse(
content = answer,
content_type = self._determine_content_type(answer, message.platform)
)
async def send_response(
self,
platform: Platform,
platform_user_id: str,
response: UnifiedResponse
):
"""将响应发送回对应平台"""
client = self.clients.get(platform)
if not client:
raise ValueError(f"未知平台: {platform}")
if platform == Platform.FEISHU:
if response.content_type == "card":
await client.send_card(platform_user_id, response.metadata.get("card"))
else:
# 构建 Markdown 消息卡片
card = client.card_builder.build_ai_response_card(
question="", answer=response.content
)
await client.send_card(platform_user_id, card)
elif platform == Platform.WXWORK:
await client.send_message_to_user(
platform_user_id,
response.content,
msg_type="markdown"
)
elif platform == Platform.DINGTALK:
await client.send_work_notification(
platform_user_id,
"AI 助手回复",
response.content
)
def _determine_content_type(self, content: str, platform: Platform) -> str:
"""根据内容特征和平台确定最佳展示格式"""
has_markdown = any(marker in content for marker in ["**", "##", "- ", "```"])
if has_markdown:
return "markdown" if platform != Platform.FEISHU else "card"
return "text"
Level 3:源码与原理(5 年以上)
用户身份映射系统
跨平台 AI 助手的核心挑战之一:飞书用户、企业微信用户和钉钉用户可能是同一个自然人,需要统一映射到一个内部用户 ID:
import asyncio
import aioredis
from pydantic import BaseModel
from typing import Optional
class UserMapping(BaseModel):
"""用户跨平台身份映射"""
internal_user_id: str # 内部统一用户 ID
feishu_open_id: Optional[str] = None
feishu_union_id: Optional[str] = None
wxwork_userid: Optional[str] = None
dingtalk_userid: Optional[str] = None
email: Optional[str] = None # 通过邮箱匹配
name: Optional[str] = None
class UserIdentityMapper:
"""用户身份映射器"""
def __init__(self, redis_client, db):
self.redis = redis_client
self.db = db # 持久化数据库
async def get_unified_id(
self, platform: Platform, platform_user_id: str
) -> str:
"""
获取内部统一用户 ID
优先从缓存查询,再查数据库,找不到则创建新用户
"""
cache_key = f"user_mapping:{platform}:{platform_user_id}"
cached = await self.redis.get(cache_key)
if cached:
return cached.decode()
# 从数据库查询
mapping = await self.db.find_mapping(platform, platform_user_id)
if mapping:
await self.redis.setex(cache_key, 3600, mapping.internal_user_id)
return mapping.internal_user_id
# 创建新用户
new_id = await self._create_user(platform, platform_user_id)
await self.redis.setex(cache_key, 3600, new_id)
return new_id
async def link_accounts(
self,
internal_user_id: str,
platform: Platform,
platform_user_id: str
):
"""
关联账号:将平台账号关联到已有的内部用户
(用于跨平台账号合并)
"""
await self.db.update_mapping(
internal_user_id=internal_user_id,
platform=platform,
platform_user_id=platform_user_id
)
# 更新缓存
cache_key = f"user_mapping:{platform}:{platform_user_id}"
await self.redis.setex(cache_key, 3600, internal_user_id)
async def get_all_platform_ids(self, internal_user_id: str) -> dict:
"""获取用户在所有平台的 ID"""
mapping = await self.db.get_by_internal_id(internal_user_id)
if not mapping:
return {}
return {
Platform.FEISHU: mapping.feishu_open_id,
Platform.WXWORK: mapping.wxwork_userid,
Platform.DINGTALK: mapping.dingtalk_userid,
}
async def _create_user(self, platform: Platform, platform_user_id: str) -> str:
"""创建新的内部用户"""
import uuid
new_id = f"user_{uuid.uuid4().hex[:8]}"
mapping = UserMapping(
internal_user_id=new_id,
**{f"{platform}_userid" if platform != Platform.FEISHU
else "feishu_open_id": platform_user_id}
)
await self.db.save_mapping(mapping)
return new_id
跨平台会话同步
用户可以在飞书开始一段对话,在企业微信继续,要实现这种体验,需要跨平台会话同步:
class CrossPlatformSessionManager:
"""跨平台会话管理器"""
def __init__(self, redis_client, dify_client):
self.redis = redis_client
self.dify = dify_client
async def get_or_create_session(
self,
unified_user_id: str,
platform: Platform,
mode: str = "continue" # "continue" | "new"
) -> str:
"""
获取或创建 Dify 会话 ID
mode="continue": 继续上次的会话(跨平台共享)
mode="new": 强制开始新会话
"""
session_key = f"session:{unified_user_id}"
if mode == "new":
await self.redis.delete(session_key)
return ""
# 获取现有会话
session_data = await self.redis.hgetall(session_key)
if session_data:
conv_id = session_data.get(b"conv_id", b"").decode()
if conv_id:
return conv_id
return "" # 返回空串让 Dify 创建新会话
async def save_session(
self,
unified_user_id: str,
conv_id: str,
platform: Platform,
message_count: int = 1
):
"""保存会话状态(跨平台共享)"""
session_key = f"session:{unified_user_id}"
await self.redis.hset(session_key, mapping={
"conv_id": conv_id,
"last_platform": platform,
"message_count": message_count,
"last_active": str(time.time()),
})
await self.redis.expire(session_key, 86400 * 7) # 7 天过期
async def get_session_summary(self, unified_user_id: str) -> dict:
"""获取会话摘要"""
session_key = f"session:{unified_user_id}"
data = await self.redis.hgetall(session_key)
if not data:
return {}
return {
"conversation_id": data.get(b"conv_id", b"").decode(),
"last_platform": data.get(b"last_platform", b"").decode(),
"message_count": int(data.get(b"message_count", b"0")),
"last_active": float(data.get(b"last_active", b"0")),
}
Dify 的平台集成功能(内置)
Dify 也提供了一些内置的平台集成功能,位于控制台的"渠道"(Channel)设置中:
# Dify 内置的渠道配置(通过 API 设置)
import httpx
DIFY_ADMIN_URL = "https://your-dify.com/console/api"
ADMIN_TOKEN = "your_admin_token"
def configure_feishu_channel(app_id: str, app_secret: str, verify_token: str) -> dict:
"""通过 Dify API 配置飞书渠道"""
resp = httpx.post(
f"{DIFY_ADMIN_URL}/channels",
headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
json={
"type": "feishu",
"config": {
"app_id": app_id,
"app_secret": app_secret,
"verify_token": verify_token,
}
}
)
resp.raise_for_status()
return resp.json()
# 注意:Dify 内置渠道功能较为基础
# 生产环境推荐使用本章介绍的自定义集成方案,灵活性更高
Level 4:生产陷阱与决策(专家视角)
陷阱一:消息重复处理
飞书、企业微信和钉钉的 Webhook 都可能重复发送消息(网络超时重试)。如果业务逻辑不幂等,会导致 AI 重复回复同一条消息:
import redis
import hashlib
class MessageDeduplicator:
"""消息去重处理器"""
def __init__(self, redis_client):
self.redis = redis_client
def is_duplicate(self, message_id: str, platform: str) -> bool:
"""
检查消息是否重复
使用 Redis SET NX 实现原子性去重
"""
dedup_key = f"msg_dedup:{platform}:{message_id}"
# SET key 1 EX 300 NX:如果 key 不存在则设置(原子操作)
result = self.redis.set(dedup_key, "1", ex=300, nx=True)
return result is None # None 表示 key 已存在(重复消息)
@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook_deduped():
data = request.json
event = data.get("event", {})
message_id = event.get("message", {}).get("message_id", "")
# 去重检查
if deduplicator.is_duplicate(message_id, "feishu"):
return jsonify({"code": 0}) # 静默忽略重复消息
# 正常处理消息...
return jsonify({"code": 0})
陷阱二:飞书卡片的更新时机
在飞书实现流式输出时,不应该每收到一个字符就更新卡片(会触发 API 频率限制),而应该批量更新:
import asyncio
class FeishuStreamingUpdater:
"""飞书流式卡片更新器(批量更新)"""
UPDATE_INTERVAL = 0.5 # 每 0.5 秒更新一次
def __init__(self, feishu_client: FeishuClient, message_id: str, question: str):
self.client = feishu_client
self.message_id = message_id
self.question = question
self._buffer = ""
self._done = False
async def update_loop(self):
"""后台更新任务"""
while not self._done or self._buffer:
await asyncio.sleep(self.UPDATE_INTERVAL)
if self._buffer:
await self._push_update()
async def _push_update(self):
"""推送当前内容到飞书"""
card = self.client.card_builder.build_ai_response_card(
question=self.question,
answer=self._buffer + "⏳", # 流式状态加省略号
)
try:
await self.client.update_card(self.message_id, card)
except Exception as e:
if "rate limit" not in str(e).lower():
raise
def append(self, chunk: str):
"""追加新内容(由 Dify 流式回调调用)"""
self._buffer += chunk
async def finish(self):
"""完成流式输出,发送最终卡片"""
self._done = True
card = self.client.card_builder.build_ai_response_card(
question=self.question,
answer=self._buffer
)
await self.client.update_card(self.message_id, card)
async def handle_feishu_message_with_streaming(
question: str, open_id: str,
feishu_client: FeishuClient, dify_client
):
"""飞书消息处理:先发"思考中"卡片,流式更新,最后显示完整答案"""
# 发送"加载中"卡片,获取消息 ID
loading_card = feishu_client.card_builder.build_loading_card(question)
msg_resp = await feishu_client.send_card(open_id, loading_card)
message_id = msg_resp["data"]["message_id"]
# 创建流式更新器
updater = FeishuStreamingUpdater(feishu_client, message_id, question)
# 并发:更新循环 + 流式接收
update_task = asyncio.create_task(updater.update_loop())
async for event in dify_client.send_message_stream_async(question, open_id):
if event.get("event") == "message":
updater.append(event.get("answer", ""))
elif event.get("event") == "message_end":
break
await updater.finish()
update_task.cancel()
陷阱三:敏感数据在平台日志中留存
三大平台都会在其服务器上记录消息内容。如果用户在 AI 对话中涉及敏感信息(医疗、法律、财务),这些数据会留存在第三方服务器:
# 敏感内容检测与脱敏
import re
class SensitiveDataFilter:
"""敏感数据过滤器(在推送到消息平台前脱敏)"""
PATTERNS = {
"id_card": (r'\b\d{17}[\dXx]\b', "身份证号"),
"phone": (r'\b1[3-9]\d{9}\b', "手机号"),
"bank_card": (r'\b\d{16,19}\b', "银行卡号"),
"password": (r'密码[::]\s*\S+', "密码"),
}
def detect(self, text: str) -> list[str]:
"""检测敏感数据类型"""
found = []
for key, (pattern, name) in self.PATTERNS.items():
if re.search(pattern, text):
found.append(name)
return found
def mask(self, text: str) -> str:
"""脱敏(用于推送到消息平台)"""
result = text
# 手机号:保留前3后4位
result = re.sub(r'\b(1[3-9]\d)\d{4}(\d{4})\b', r'\1****\2', result)
# 身份证号:只显示前6后4位
result = re.sub(r'\b(\d{6})\d{8}([\dXx]{4})\b', r'\1********\2', result)
# 银行卡号:只显示后4位
result = re.sub(r'\b\d{12,15}(\d{4})\b', r'****\1', result)
return result
# 在消息路由层使用
async def route_with_privacy(message: UnifiedMessage, router: UnifiedMessageRouter):
sens_filter = SensitiveDataFilter()
detected = sens_filter.detect(message.content)
if detected:
# 记录用户访问了敏感功能(不记录内容)
audit_log.info(f"用户 {message.platform_user_id} 涉及敏感信息: {detected}")
response = await router.route(message)
# 发送前脱敏
response.content = sens_filter.mask(response.content)
return response
三大平台的功能对比与选型建议
消息丰富度:飞书(最高)> 钉钉(高)> 企业微信(中)
双向通信:
- 飞书:通过 Bot Event,天然支持用户主动发消息
- 企业微信:企业应用支持,群机器人仅单向
- 钉钉:企业应用支持,群机器人仅单向
API 友好度:
- 飞书:文档最完整,SDK 最丰富,Lark 国际版同一套 API
- 企业微信:文档较好,对接微信生态方便
- 钉钉:文档一般,签名机制复杂
推荐选型:
- 如果团队已在使用飞书 → 优先飞书集成(最强的消息卡片能力)
- 如果需要国际化 → 飞书(国内飞书 = 国际 Lark,一套代码两用)
- 如果需要微信生态打通 → 企业微信
- 如果行业习惯钉钉 → 钉钉
- 多平台覆盖 → 使用本章的统一消息路由层
本章小结
本章系统介绍了飞书、企业微信和钉钉的深度集成方案:
飞书集成核心:
- 消息卡片(Interactive Card)是最强的消息类型,支持按钮交互、动态更新
- 流式输出通过"先发加载卡片,再批量更新"实现(间隔 0.5s 更新,避免频率限制)
- 签名验证是安全必须项(timestamp + nonce + verify_token 的 SHA256)
企业微信集成核心:
- 群机器人仅支持单向发消息;双向对话需要企业应用
- 消息签名:SHA1(token, timestamp, nonce 排序后拼接)
- AccessToken 需要缓存(有效期约 7200s),避免频繁刷新
钉钉集成核心:
- 群机器人签名:HMAC-SHA256(timestamp + "\n" + secret),结果 base64 编码
- 工作通知(企业应用)支持 action_card 类型,可以有按钮
- 开放 API 接口需要配置 IP 白名单
统一路由层设计原则:
- 用户身份映射:飞书 open_id / 企业微信 userid / 钉钉 userid → 内部统一 ID
- 跨平台会话共享:同一用户在不同平台可以延续对话
- 消息去重:使用 Redis SET NX 防止重复处理(TTL 5 分钟)
- 敏感数据脱敏:手机号/身份证/银行卡在推送到平台前自动脱敏
关键数字:
- 飞书 Token 有效期:7200s(需要提前 60s 刷新)
- 消息重放防护时间窗口:5 分钟(超出则拒绝)
- 飞书卡片更新频率限制:建议不超过每 0.5s 一次
- 钉钉群机器人签名时效:同一个时间戳可在 60s 内重用