第 18 章

飞书、企业微信、钉钉深度集成与消息路由

第十八章:飞书、企业微信、钉钉深度集成与消息路由

把 Dify Agent 接入中国企业最主流的三大协作平台——飞书、企业微信和钉钉,实现统一消息路由、用户权限映射和跨平台会话管理,让 AI 助手无缝融入企业日常工作流。

本章导读

国内企业的 AI 助手落地,绕不开三个问题:员工在哪里工作(飞书/企业微信/钉钉),AI 的答案如何送达(消息推送/对话机器人),以及如何管理谁可以用什么功能(权限控制)。

本章深入三大平台的集成方案,不是简单的"Hello World Bot",而是生产级别的完整方案:

读完本章,你将能够:


Level 1:基础认知(1-3 年经验)

三大平台的集成模式对比

特性 飞书 企业微信 钉钉
机器人接入方式 自建应用/第三方应用 群机器人/企业应用 群机器人/企业应用
消息类型丰富度 极高(消息卡片、互动) 中(Markdown/图片) 高(消息卡片/表单)
API 文档质量 优秀 良好 良好
国际化支持 强(Lark = 飞书国际版)
开放平台生态 丰富 丰富 较丰富
Webhook 推送 支持 支持(企业级) 支持

飞书 Bot 快速上手

步骤一:在飞书开放平台创建应用

  1. 访问 https://open.feishu.cn/app
  2. 点击"创建企业自建应用"
  3. 填写应用名称(如"智能助手 AI")
  4. 记录 App ID 和 App Secret

步骤二:配置机器人能力

在应用管理页面:

步骤三:最简单的飞书 Bot(接收并回复消息)

from flask import Flask, request, jsonify
import httpx, json

app = Flask(__name__)

FEISHU_APP_ID     = "cli_your_app_id"
FEISHU_APP_SECRET = "your_app_secret"
DIFY_API_KEY      = "app-your_dify_key"

def get_feishu_token() -> str:
    """获取飞书访问令牌"""
    resp = httpx.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET}
    )
    return resp.json()["tenant_access_token"]

def send_feishu_message(open_id: str, text: str):
    """发送飞书文本消息"""
    token = get_feishu_token()
    httpx.post(
        "https://open.feishu.cn/open-apis/im/v1/messages",
        headers={"Authorization": f"Bearer {token}"},
        params={"receive_id_type": "open_id"},
        json={
            "receive_id": open_id,
            "msg_type":   "text",
            "content":    json.dumps({"text": text}),
        }
    )

def call_dify(query: str, user_id: str) -> str:
    """调用 Dify API"""
    resp = httpx.post(
        "https://api.dify.ai/v1/chat-messages",
        headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
        json={"query": query, "user": user_id,
              "response_mode": "blocking", "inputs": {}},
        timeout=60.0
    )
    return resp.json().get("answer", "抱歉,无法获取回答")

@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook():
    data = request.json

    # 飞书 URL 验证(首次配置时)
    if data.get("type") == "url_verification":
        return jsonify({"challenge": data.get("challenge")})

    # 处理消息事件
    event = data.get("event", {})
    if event.get("type") == "message":
        msg = event.get("message", {})
        if msg.get("message_type") == "text":
            content  = json.loads(msg["content"])
            query    = content.get("text", "").strip()
            sender   = event.get("sender", {})
            open_id  = sender.get("sender_id", {}).get("open_id")

            if query and open_id:
                # 调用 Dify 并回复
                answer = call_dify(query, open_id)
                send_feishu_message(open_id, answer)

    return jsonify({"code": 0})

if __name__ == "__main__":
    app.run(port=8000)

企业微信群机器人(最简集成)

import httpx, json

WXWORK_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key"

def send_wxwork_message(text: str):
    """发送企业微信群消息"""
    httpx.post(WXWORK_WEBHOOK, json={
        "msgtype": "markdown",
        "markdown": {
            "content": text  # 支持 Markdown 格式
        }
    })

def send_wxwork_card(title: str, description: str, url: str = None):
    """发送企业微信卡片消息"""
    article = {"title": title, "description": description}
    if url:
        article["url"] = url

    httpx.post(WXWORK_WEBHOOK, json={
        "msgtype": "news",
        "news": {"articles": [article]}
    })

钉钉群机器人(最简集成)

import httpx, hmac, hashlib, base64, time, urllib.parse

DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=your_token"
DINGTALK_SECRET  = "your_secret"

def get_dingtalk_sign() -> tuple[str, str]:
    """生成钉钉签名(安全模式必须)"""
    timestamp = str(round(time.time() * 1000))
    sign_str  = f"{timestamp}\n{DINGTALK_SECRET}"
    sign      = base64.b64encode(
        hmac.new(DINGTALK_SECRET.encode(), sign_str.encode(), hashlib.sha256).digest()
    ).decode()
    return timestamp, urllib.parse.quote_plus(sign)

def send_dingtalk_message(text: str):
    """发送钉钉文本消息"""
    timestamp, sign = get_dingtalk_sign()
    url = f"{DINGTALK_WEBHOOK}&timestamp={timestamp}&sign={sign}"
    httpx.post(url, json={
        "msgtype": "markdown",
        "markdown": {
            "title": "AI 助手",
            "text":  text
        }
    })

Level 2:机制深解(3-5 年经验)

飞书深度集成:消息卡片与交互

飞书的消息卡片(Interactive Card)是最强大的消息类型,支持按钮、表单、动态更新等交互功能:

import httpx, json
from typing import Optional

class FeishuCardBuilder:
    """飞书消息卡片构建器"""

    def build_ai_response_card(
        self,
        question:   str,
        answer:     str,
        confidence: float = 1.0,
        sources:    list[str] = None
    ) -> dict:
        """构建 AI 回答卡片"""
        elements = [
            # 问题区域
            {
                "tag": "div",
                "text": {
                    "tag":     "lark_md",
                    "content": f"**问:** {question}"
                }
            },
            {"tag": "hr"},
            # 回答区域
            {
                "tag": "div",
                "text": {
                    "tag":     "lark_md",
                    "content": f"**答:**\n{answer}"
                }
            },
        ]

        # 来源引用
        if sources:
            source_text = "\n".join([f"- {s}" for s in sources[:3]])
            elements.append({
                "tag": "note",
                "elements": [
                    {"tag": "plain_text",
                     "content": f"参考来源:\n{source_text}"}
                ]
            })

        # 反馈按钮
        elements.extend([
            {"tag": "hr"},
            {
                "tag": "action",
                "actions": [
                    {
                        "tag":   "button",
                        "text":  {"tag": "plain_text", "content": "👍 有帮助"},
                        "type":  "primary",
                        "value": {"action": "like", "answer_id": "placeholder"}
                    },
                    {
                        "tag":   "button",
                        "text":  {"tag": "plain_text", "content": "👎 没帮助"},
                        "type":  "danger",
                        "value": {"action": "dislike", "answer_id": "placeholder"}
                    },
                    {
                        "tag":   "button",
                        "text":  {"tag": "plain_text", "content": "🔄 重新回答"},
                        "type":  "default",
                        "value": {"action": "regenerate", "question": question}
                    }
                ]
            }
        ])

        return {
            "config":   {"wide_screen_mode": True},
            "header":   {
                "title": {"tag": "plain_text", "content": "AI 智能助手"},
                "template": "blue"
            },
            "elements": elements
        }

    def build_loading_card(self, question: str) -> dict:
        """构建加载中卡片(流式响应时先显示这个)"""
        return {
            "config": {"wide_screen_mode": True},
            "header": {
                "title":    {"tag": "plain_text", "content": "AI 正在思考..."},
                "template": "grey"
            },
            "elements": [
                {
                    "tag": "div",
                    "text": {
                        "tag":     "lark_md",
                        "content": f"**问:** {question}\n\n⏳ 正在生成回答,请稍候..."
                    }
                }
            ]
        }

class FeishuClient:
    """飞书完整客户端"""

    BASE_URL = "https://open.feishu.cn/open-apis"

    def __init__(self, app_id: str, app_secret: str):
        self.app_id     = app_id
        self.app_secret = app_secret
        self._token     = None
        self._token_expire = 0
        self.client     = httpx.AsyncClient(timeout=30.0)
        self.card_builder = FeishuCardBuilder()

    async def get_token(self) -> str:
        """获取租户访问令牌(带缓存)"""
        import time
        if self._token and time.time() < self._token_expire - 60:
            return self._token

        resp = await self.client.post(
            f"{self.BASE_URL}/auth/v3/tenant_access_token/internal",
            json={"app_id": self.app_id, "app_secret": self.app_secret}
        )
        data = resp.json()
        self._token        = data["tenant_access_token"]
        self._token_expire = time.time() + data.get("expire", 7200)
        return self._token

    async def send_card(
        self,
        receive_id:      str,
        card:            dict,
        receive_id_type: str = "open_id"
    ) -> dict:
        """发送消息卡片"""
        token = await self.get_token()
        resp  = await self.client.post(
            f"{self.BASE_URL}/im/v1/messages",
            headers={"Authorization": f"Bearer {token}"},
            params={"receive_id_type": receive_id_type},
            json={
                "receive_id": receive_id,
                "msg_type":   "interactive",
                "content":    json.dumps(card)
            }
        )
        resp.raise_for_status()
        return resp.json()

    async def update_card(self, message_id: str, card: dict) -> dict:
        """更新已发送的消息卡片(用于流式输出效果)"""
        token = await self.get_token()
        resp  = await self.client.patch(
            f"{self.BASE_URL}/im/v1/messages/{message_id}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "msg_type": "interactive",
                "content":  json.dumps(card)
            }
        )
        resp.raise_for_status()
        return resp.json()

    async def get_user_info(self, open_id: str) -> dict:
        """获取用户信息"""
        token = await self.get_token()
        resp  = await self.client.get(
            f"{self.BASE_URL}/contact/v3/users/{open_id}",
            headers={"Authorization": f"Bearer {token}"},
            params={"user_id_type": "open_id"}
        )
        resp.raise_for_status()
        return resp.json().get("data", {}).get("user", {})

    async def handle_card_action(self, action_data: dict) -> Optional[dict]:
        """处理卡片按钮点击事件"""
        action  = action_data.get("action", {})
        value   = action.get("value", {})
        open_id = action_data.get("open_id")

        if value.get("action") == "like":
            # 记录正向反馈
            await self._record_feedback(value.get("answer_id"), "like", open_id)
            return {"toast": {"type": "success", "content": "感谢你的反馈!"}}

        elif value.get("action") == "dislike":
            await self._record_feedback(value.get("answer_id"), "dislike", open_id)
            return {"toast": {"type": "info", "content": "感谢反馈,我们会继续改进"}}

        elif value.get("action") == "regenerate":
            # 重新调用 Dify 生成答案
            question   = value.get("question", "")
            new_answer = await call_dify_async(question, open_id)
            new_card   = self.card_builder.build_ai_response_card(question, new_answer)
            return new_card  # 返回新卡片内容(飞书会自动更新卡片)

        return None

飞书签名验证(安全必须):

import hashlib, hmac

def verify_feishu_signature(
    timestamp:    str,
    nonce:        str,
    body:         str,
    verify_token: str
) -> bool:
    """验证飞书 Webhook 签名"""
    sign_str  = timestamp + nonce + verify_token + body
    signature = hashlib.sha256(sign_str.encode()).hexdigest()
    received  = request.headers.get("X-Lark-Signature", "")
    return hmac.compare_digest(signature, received)

@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook_secure():
    """带签名验证的飞书 Webhook"""
    timestamp    = request.headers.get("X-Lark-Request-Timestamp", "")
    nonce        = request.headers.get("X-Lark-Request-Nonce", "")
    body         = request.get_data(as_text=True)
    verify_token = os.getenv("FEISHU_VERIFY_TOKEN")

    if not verify_feishu_signature(timestamp, nonce, body, verify_token):
        return jsonify({"code": 403, "msg": "signature mismatch"}), 403

    # 防重放攻击:检查 timestamp 是否在 5 分钟内
    import time
    if abs(time.time() - int(timestamp)) > 300:
        return jsonify({"code": 403, "msg": "timestamp expired"}), 403

    data = json.loads(body)
    # ... 处理消息
    return jsonify({"code": 0})

企业微信深度集成

企业微信有两种集成模式:群机器人(简单,只能发消息)和企业应用(复杂,支持双向通信):

import httpx, hashlib, xml.etree.ElementTree as ET
from typing import Optional

class WxWorkClient:
    """企业微信完整客户端"""

    def __init__(self, corp_id: str, corp_secret: str, agent_id: int):
        self.corp_id    = corp_id
        self.corp_secret = corp_secret
        self.agent_id   = agent_id
        self.client     = httpx.AsyncClient(timeout=30.0)
        self._token     = None
        self._token_expire = 0

    async def get_access_token(self) -> str:
        """获取企业微信 AccessToken"""
        import time
        if self._token and time.time() < self._token_expire - 60:
            return self._token

        resp = await self.client.get(
            "https://qyapi.weixin.qq.com/cgi-bin/gettoken",
            params={"corpid": self.corp_id, "corpsecret": self.corp_secret}
        )
        data = resp.json()
        self._token        = data["access_token"]
        self._token_expire = time.time() + data.get("expires_in", 7200)
        return self._token

    async def send_message_to_user(
        self,
        user_id: str,
        content: str,
        msg_type: str = "text"
    ) -> dict:
        """给指定员工发送消息(企业应用)"""
        token = await self.get_access_token()
        payload = {
            "touser":  user_id,
            "msgtype": msg_type,
            "agentid": self.agent_id,
        }
        if msg_type == "text":
            payload["text"]     = {"content": content}
        elif msg_type == "markdown":
            payload["markdown"] = {"content": content}

        resp = await self.client.post(
            "https://qyapi.weixin.qq.com/cgi-bin/message/send",
            params={"access_token": token},
            json=payload
        )
        resp.raise_for_status()
        return resp.json()

    async def send_markdown_card(
        self,
        user_id:     str,
        title:       str,
        description: str,
        url:         Optional[str] = None
    ) -> dict:
        """发送图文消息(News 类型)"""
        token = await self.get_access_token()
        article = {"title": title, "description": description, "picurl": ""}
        if url:
            article["url"] = url

        resp = await self.client.post(
            "https://qyapi.weixin.qq.com/cgi-bin/message/send",
            params={"access_token": token},
            json={
                "touser":  user_id,
                "msgtype": "news",
                "agentid": self.agent_id,
                "news":    {"articles": [article]}
            }
        )
        resp.raise_for_status()
        return resp.json()

    async def get_user_by_userid(self, userid: str) -> dict:
        """获取员工信息"""
        token = await self.get_access_token()
        resp  = await self.client.get(
            "https://qyapi.weixin.qq.com/cgi-bin/user/get",
            params={"access_token": token, "userid": userid}
        )
        resp.raise_for_status()
        return resp.json()

def verify_wxwork_signature(
    token:     str,
    timestamp: str,
    nonce:     str,
    echostr:   str = ""
) -> str:
    """验证企业微信 Webhook 签名"""
    items = sorted([token, timestamp, nonce])
    sig   = hashlib.sha1("".join(items).encode()).hexdigest()
    return sig

@app.route("/webhook/wxwork", methods=["GET", "POST"])
def wxwork_webhook():
    token     = os.getenv("WXWORK_TOKEN")
    timestamp = request.args.get("timestamp", "")
    nonce     = request.args.get("nonce", "")

    if request.method == "GET":
        # 验证 Webhook URL
        echostr = request.args.get("echostr", "")
        sig     = verify_wxwork_signature(token, timestamp, nonce)
        if sig == request.args.get("msg_signature"):
            return echostr  # 原样返回 echostr 表示验证通过
        return "signature error", 403

    # 处理消息推送(POST)
    xml_str = request.get_data(as_text=True)
    root    = ET.fromstring(xml_str)

    msg_type = root.findtext("MsgType")
    from_user = root.findtext("FromUserName")

    if msg_type == "text":
        content = root.findtext("Content", "").strip()
        # 调用 Dify 并回复
        answer = call_dify(content, from_user)
        wxwork_client.send_message_to_user(from_user, answer)

    return "success"

钉钉深度集成

import httpx, hmac, hashlib, base64, time
import urllib.parse
from typing import Optional

class DingTalkClient:
    """钉钉完整客户端"""

    def __init__(self, app_key: str, app_secret: str):
        self.app_key    = app_key
        self.app_secret = app_secret
        self.client     = httpx.AsyncClient(timeout=30.0)
        self._token     = None
        self._token_expire = 0

    async def get_access_token(self) -> str:
        """获取钉钉 AccessToken"""
        import time
        if self._token and time.time() < self._token_expire - 60:
            return self._token

        resp = await self.client.post(
            "https://oapi.dingtalk.com/gettoken",
            params={"appkey": self.app_key, "appsecret": self.app_secret}
        )
        data = resp.json()
        self._token        = data["access_token"]
        self._token_expire = time.time() + data.get("expires_in", 7200)
        return self._token

    async def send_work_notification(
        self,
        user_id: str,
        title:   str,
        content: str,
    ) -> dict:
        """发送钉钉工作通知"""
        token = await self.get_access_token()
        resp  = await self.client.post(
            "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2",
            params={"access_token": token},
            json={
                "agent_id":   int(os.getenv("DINGTALK_AGENT_ID")),
                "userid_list": user_id,
                "msg": {
                    "msgtype": "action_card",
                    "action_card": {
                        "title":       title,
                        "markdown":    content,
                        "btn_orientation": "0",
                        "btns": [
                            {"title": "查看详情", "action_url": ""}
                        ]
                    }
                }
            }
        )
        resp.raise_for_status()
        return resp.json()

    def build_group_robot_signed_url(
        self,
        webhook_url: str,
        secret:      str
    ) -> str:
        """构建钉钉群机器人签名 URL"""
        timestamp    = str(round(time.time() * 1000))
        sign_content = f"{timestamp}\n{secret}"
        signature    = base64.b64encode(
            hmac.new(secret.encode(), sign_content.encode(), hashlib.sha256).digest()
        ).decode()
        return f"{webhook_url}&timestamp={timestamp}&sign={urllib.parse.quote_plus(signature)}"

    async def send_group_message(
        self,
        webhook_url: str,
        secret:      str,
        content:     str,
        msg_type:    str = "markdown"
    ) -> dict:
        """发送钉钉群消息"""
        signed_url = self.build_group_robot_signed_url(webhook_url, secret)
        payload    = {}
        if msg_type == "text":
            payload = {"msgtype": "text", "text": {"content": content}}
        elif msg_type == "markdown":
            payload = {"msgtype": "markdown", "markdown": {
                "title": "AI 助手", "text": content
            }}

        resp = await self.client.post(signed_url, json=payload)
        resp.raise_for_status()
        return resp.json()

    async def get_user_info_by_userid(self, user_id: str) -> dict:
        """通过 userid 获取用户信息"""
        token = await self.get_access_token()
        resp  = await self.client.post(
            "https://oapi.dingtalk.com/topapi/v2/user/get",
            params={"access_token": token},
            json={"userid": user_id, "language": "zh_CN"}
        )
        resp.raise_for_status()
        return resp.json().get("result", {})

统一消息路由层设计

在多平台部署时,应该构建一个统一的消息路由层,避免为每个平台重复编写业务逻辑:

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Any
import asyncio

class Platform(str, Enum):
    FEISHU  = "feishu"
    WXWORK  = "wxwork"
    DINGTALK = "dingtalk"
    WEB      = "web"
    API      = "api"

@dataclass
class UnifiedMessage:
    """平台无关的统一消息格式"""
    platform:        Platform
    platform_user_id: str      # 平台内的用户 ID
    content:         str       # 消息内容(纯文本)
    message_id:      str       # 平台消息 ID(用于回复)
    session_id:      Optional[str] = None  # 会话/对话 ID
    metadata:        dict      = field(default_factory=dict)

@dataclass
class UnifiedResponse:
    """平台无关的统一回答格式"""
    content:     str
    content_type: str = "text"  # "text" | "markdown" | "card"
    attachments: list = field(default_factory=list)
    metadata:    dict = field(default_factory=dict)

class UnifiedMessageRouter:
    """统一消息路由器"""

    def __init__(
        self,
        feishu_client:   FeishuClient,
        wxwork_client:   WxWorkClient,
        dingtalk_client: DingTalkClient,
        dify_client,
        user_mapper:     "UserIdentityMapper"
    ):
        self.clients = {
            Platform.FEISHU:   feishu_client,
            Platform.WXWORK:   wxwork_client,
            Platform.DINGTALK: dingtalk_client,
        }
        self.dify        = dify_client
        self.user_mapper = user_mapper
        self.handlers    = {}  # 业务处理函数注册表

    def register_handler(self, intent: str, handler: callable):
        """注册意图处理函数"""
        self.handlers[intent] = handler

    async def route(self, message: UnifiedMessage) -> UnifiedResponse:
        """
        统一路由入口:
        1. 识别用户意图
        2. 检查权限
        3. 调用 Dify
        4. 格式化响应
        """
        # 用户身份映射(跨平台统一)
        unified_user_id = await self.user_mapper.get_unified_id(
            message.platform, message.platform_user_id
        )

        # 权限检查
        if not await self._check_permission(unified_user_id, message.content):
            return UnifiedResponse(
                content="抱歉,你没有权限使用此功能。请联系管理员。"
            )

        # 获取对话历史(跨平台共享)
        conversation_id = await self._get_conversation_id(unified_user_id)

        # 调用 Dify
        try:
            dify_response = await self.dify.send_message_async(
                query           = message.content,
                user            = unified_user_id,
                conversation_id = conversation_id
            )
            answer = dify_response["answer"]
            new_conv_id = dify_response.get("conversation_id", conversation_id)
            await self._save_conversation_id(unified_user_id, new_conv_id)

        except Exception as e:
            answer = f"AI 服务暂时不可用:{str(e)}\n请稍后重试。"

        return UnifiedResponse(
            content      = answer,
            content_type = self._determine_content_type(answer, message.platform)
        )

    async def send_response(
        self,
        platform: Platform,
        platform_user_id: str,
        response: UnifiedResponse
    ):
        """将响应发送回对应平台"""
        client = self.clients.get(platform)
        if not client:
            raise ValueError(f"未知平台: {platform}")

        if platform == Platform.FEISHU:
            if response.content_type == "card":
                await client.send_card(platform_user_id, response.metadata.get("card"))
            else:
                # 构建 Markdown 消息卡片
                card = client.card_builder.build_ai_response_card(
                    question="", answer=response.content
                )
                await client.send_card(platform_user_id, card)

        elif platform == Platform.WXWORK:
            await client.send_message_to_user(
                platform_user_id,
                response.content,
                msg_type="markdown"
            )

        elif platform == Platform.DINGTALK:
            await client.send_work_notification(
                platform_user_id,
                "AI 助手回复",
                response.content
            )

    def _determine_content_type(self, content: str, platform: Platform) -> str:
        """根据内容特征和平台确定最佳展示格式"""
        has_markdown = any(marker in content for marker in ["**", "##", "- ", "```"])
        if has_markdown:
            return "markdown" if platform != Platform.FEISHU else "card"
        return "text"

Level 3:源码与原理(5 年以上)

用户身份映射系统

跨平台 AI 助手的核心挑战之一:飞书用户、企业微信用户和钉钉用户可能是同一个自然人,需要统一映射到一个内部用户 ID:

import asyncio
import aioredis
from pydantic import BaseModel
from typing import Optional

class UserMapping(BaseModel):
    """用户跨平台身份映射"""
    internal_user_id: str       # 内部统一用户 ID
    feishu_open_id:   Optional[str] = None
    feishu_union_id:  Optional[str] = None
    wxwork_userid:    Optional[str] = None
    dingtalk_userid:  Optional[str] = None
    email:            Optional[str] = None  # 通过邮箱匹配
    name:             Optional[str] = None

class UserIdentityMapper:
    """用户身份映射器"""

    def __init__(self, redis_client, db):
        self.redis = redis_client
        self.db    = db  # 持久化数据库

    async def get_unified_id(
        self, platform: Platform, platform_user_id: str
    ) -> str:
        """
        获取内部统一用户 ID
        优先从缓存查询,再查数据库,找不到则创建新用户
        """
        cache_key = f"user_mapping:{platform}:{platform_user_id}"
        cached    = await self.redis.get(cache_key)
        if cached:
            return cached.decode()

        # 从数据库查询
        mapping = await self.db.find_mapping(platform, platform_user_id)
        if mapping:
            await self.redis.setex(cache_key, 3600, mapping.internal_user_id)
            return mapping.internal_user_id

        # 创建新用户
        new_id = await self._create_user(platform, platform_user_id)
        await self.redis.setex(cache_key, 3600, new_id)
        return new_id

    async def link_accounts(
        self,
        internal_user_id: str,
        platform:         Platform,
        platform_user_id: str
    ):
        """
        关联账号:将平台账号关联到已有的内部用户
        (用于跨平台账号合并)
        """
        await self.db.update_mapping(
            internal_user_id=internal_user_id,
            platform=platform,
            platform_user_id=platform_user_id
        )
        # 更新缓存
        cache_key = f"user_mapping:{platform}:{platform_user_id}"
        await self.redis.setex(cache_key, 3600, internal_user_id)

    async def get_all_platform_ids(self, internal_user_id: str) -> dict:
        """获取用户在所有平台的 ID"""
        mapping = await self.db.get_by_internal_id(internal_user_id)
        if not mapping:
            return {}
        return {
            Platform.FEISHU:   mapping.feishu_open_id,
            Platform.WXWORK:   mapping.wxwork_userid,
            Platform.DINGTALK: mapping.dingtalk_userid,
        }

    async def _create_user(self, platform: Platform, platform_user_id: str) -> str:
        """创建新的内部用户"""
        import uuid
        new_id  = f"user_{uuid.uuid4().hex[:8]}"
        mapping = UserMapping(
            internal_user_id=new_id,
            **{f"{platform}_userid" if platform != Platform.FEISHU
               else "feishu_open_id": platform_user_id}
        )
        await self.db.save_mapping(mapping)
        return new_id

跨平台会话同步

用户可以在飞书开始一段对话,在企业微信继续,要实现这种体验,需要跨平台会话同步:

class CrossPlatformSessionManager:
    """跨平台会话管理器"""

    def __init__(self, redis_client, dify_client):
        self.redis = redis_client
        self.dify  = dify_client

    async def get_or_create_session(
        self,
        unified_user_id: str,
        platform:        Platform,
        mode:            str = "continue"  # "continue" | "new"
    ) -> str:
        """
        获取或创建 Dify 会话 ID
        mode="continue": 继续上次的会话(跨平台共享)
        mode="new":      强制开始新会话
        """
        session_key = f"session:{unified_user_id}"

        if mode == "new":
            await self.redis.delete(session_key)
            return ""

        # 获取现有会话
        session_data = await self.redis.hgetall(session_key)
        if session_data:
            conv_id = session_data.get(b"conv_id", b"").decode()
            if conv_id:
                return conv_id

        return ""  # 返回空串让 Dify 创建新会话

    async def save_session(
        self,
        unified_user_id: str,
        conv_id:         str,
        platform:        Platform,
        message_count:   int = 1
    ):
        """保存会话状态(跨平台共享)"""
        session_key = f"session:{unified_user_id}"
        await self.redis.hset(session_key, mapping={
            "conv_id":        conv_id,
            "last_platform":  platform,
            "message_count":  message_count,
            "last_active":    str(time.time()),
        })
        await self.redis.expire(session_key, 86400 * 7)  # 7 天过期

    async def get_session_summary(self, unified_user_id: str) -> dict:
        """获取会话摘要"""
        session_key = f"session:{unified_user_id}"
        data        = await self.redis.hgetall(session_key)
        if not data:
            return {}
        return {
            "conversation_id": data.get(b"conv_id", b"").decode(),
            "last_platform":   data.get(b"last_platform", b"").decode(),
            "message_count":   int(data.get(b"message_count", b"0")),
            "last_active":     float(data.get(b"last_active", b"0")),
        }

Dify 的平台集成功能(内置)

Dify 也提供了一些内置的平台集成功能,位于控制台的"渠道"(Channel)设置中:

# Dify 内置的渠道配置(通过 API 设置)
import httpx

DIFY_ADMIN_URL  = "https://your-dify.com/console/api"
ADMIN_TOKEN     = "your_admin_token"

def configure_feishu_channel(app_id: str, app_secret: str, verify_token: str) -> dict:
    """通过 Dify API 配置飞书渠道"""
    resp = httpx.post(
        f"{DIFY_ADMIN_URL}/channels",
        headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
        json={
            "type": "feishu",
            "config": {
                "app_id":       app_id,
                "app_secret":   app_secret,
                "verify_token": verify_token,
            }
        }
    )
    resp.raise_for_status()
    return resp.json()

# 注意:Dify 内置渠道功能较为基础
# 生产环境推荐使用本章介绍的自定义集成方案,灵活性更高

Level 4:生产陷阱与决策(专家视角)

陷阱一:消息重复处理

飞书、企业微信和钉钉的 Webhook 都可能重复发送消息(网络超时重试)。如果业务逻辑不幂等,会导致 AI 重复回复同一条消息:

import redis
import hashlib

class MessageDeduplicator:
    """消息去重处理器"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_duplicate(self, message_id: str, platform: str) -> bool:
        """
        检查消息是否重复
        使用 Redis SET NX 实现原子性去重
        """
        dedup_key = f"msg_dedup:{platform}:{message_id}"
        # SET key 1 EX 300 NX:如果 key 不存在则设置(原子操作)
        result = self.redis.set(dedup_key, "1", ex=300, nx=True)
        return result is None  # None 表示 key 已存在(重复消息)

@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook_deduped():
    data       = request.json
    event      = data.get("event", {})
    message_id = event.get("message", {}).get("message_id", "")

    # 去重检查
    if deduplicator.is_duplicate(message_id, "feishu"):
        return jsonify({"code": 0})  # 静默忽略重复消息

    # 正常处理消息...
    return jsonify({"code": 0})

陷阱二:飞书卡片的更新时机

在飞书实现流式输出时,不应该每收到一个字符就更新卡片(会触发 API 频率限制),而应该批量更新:

import asyncio

class FeishuStreamingUpdater:
    """飞书流式卡片更新器(批量更新)"""

    UPDATE_INTERVAL = 0.5  # 每 0.5 秒更新一次

    def __init__(self, feishu_client: FeishuClient, message_id: str, question: str):
        self.client     = feishu_client
        self.message_id = message_id
        self.question   = question
        self._buffer    = ""
        self._done      = False

    async def update_loop(self):
        """后台更新任务"""
        while not self._done or self._buffer:
            await asyncio.sleep(self.UPDATE_INTERVAL)
            if self._buffer:
                await self._push_update()

    async def _push_update(self):
        """推送当前内容到飞书"""
        card = self.client.card_builder.build_ai_response_card(
            question=self.question,
            answer=self._buffer + "⏳",  # 流式状态加省略号
        )
        try:
            await self.client.update_card(self.message_id, card)
        except Exception as e:
            if "rate limit" not in str(e).lower():
                raise

    def append(self, chunk: str):
        """追加新内容(由 Dify 流式回调调用)"""
        self._buffer += chunk

    async def finish(self):
        """完成流式输出,发送最终卡片"""
        self._done = True
        card = self.client.card_builder.build_ai_response_card(
            question=self.question,
            answer=self._buffer
        )
        await self.client.update_card(self.message_id, card)

async def handle_feishu_message_with_streaming(
    question: str, open_id: str,
    feishu_client: FeishuClient, dify_client
):
    """飞书消息处理:先发"思考中"卡片,流式更新,最后显示完整答案"""
    # 发送"加载中"卡片,获取消息 ID
    loading_card = feishu_client.card_builder.build_loading_card(question)
    msg_resp     = await feishu_client.send_card(open_id, loading_card)
    message_id   = msg_resp["data"]["message_id"]

    # 创建流式更新器
    updater = FeishuStreamingUpdater(feishu_client, message_id, question)

    # 并发:更新循环 + 流式接收
    update_task = asyncio.create_task(updater.update_loop())

    async for event in dify_client.send_message_stream_async(question, open_id):
        if event.get("event") == "message":
            updater.append(event.get("answer", ""))
        elif event.get("event") == "message_end":
            break

    await updater.finish()
    update_task.cancel()

陷阱三:敏感数据在平台日志中留存

三大平台都会在其服务器上记录消息内容。如果用户在 AI 对话中涉及敏感信息(医疗、法律、财务),这些数据会留存在第三方服务器:

# 敏感内容检测与脱敏
import re

class SensitiveDataFilter:
    """敏感数据过滤器(在推送到消息平台前脱敏)"""

    PATTERNS = {
        "id_card":    (r'\b\d{17}[\dXx]\b', "身份证号"),
        "phone":      (r'\b1[3-9]\d{9}\b', "手机号"),
        "bank_card":  (r'\b\d{16,19}\b', "银行卡号"),
        "password":   (r'密码[::]\s*\S+', "密码"),
    }

    def detect(self, text: str) -> list[str]:
        """检测敏感数据类型"""
        found = []
        for key, (pattern, name) in self.PATTERNS.items():
            if re.search(pattern, text):
                found.append(name)
        return found

    def mask(self, text: str) -> str:
        """脱敏(用于推送到消息平台)"""
        result = text
        # 手机号:保留前3后4位
        result = re.sub(r'\b(1[3-9]\d)\d{4}(\d{4})\b', r'\1****\2', result)
        # 身份证号:只显示前6后4位
        result = re.sub(r'\b(\d{6})\d{8}([\dXx]{4})\b', r'\1********\2', result)
        # 银行卡号:只显示后4位
        result = re.sub(r'\b\d{12,15}(\d{4})\b', r'****\1', result)
        return result

# 在消息路由层使用
async def route_with_privacy(message: UnifiedMessage, router: UnifiedMessageRouter):
    sens_filter = SensitiveDataFilter()
    detected    = sens_filter.detect(message.content)

    if detected:
        # 记录用户访问了敏感功能(不记录内容)
        audit_log.info(f"用户 {message.platform_user_id} 涉及敏感信息: {detected}")

    response = await router.route(message)

    # 发送前脱敏
    response.content = sens_filter.mask(response.content)
    return response

三大平台的功能对比与选型建议

消息丰富度:飞书(最高)> 钉钉(高)> 企业微信(中)

双向通信:
- 飞书:通过 Bot Event,天然支持用户主动发消息
- 企业微信:企业应用支持,群机器人仅单向
- 钉钉:企业应用支持,群机器人仅单向

API 友好度:
- 飞书:文档最完整,SDK 最丰富,Lark 国际版同一套 API
- 企业微信:文档较好,对接微信生态方便
- 钉钉:文档一般,签名机制复杂

推荐选型:
- 如果团队已在使用飞书 → 优先飞书集成(最强的消息卡片能力)
- 如果需要国际化 → 飞书(国内飞书 = 国际 Lark,一套代码两用)
- 如果需要微信生态打通 → 企业微信
- 如果行业习惯钉钉 → 钉钉
- 多平台覆盖 → 使用本章的统一消息路由层

本章小结

本章系统介绍了飞书、企业微信和钉钉的深度集成方案:

飞书集成核心:

  1. 消息卡片(Interactive Card)是最强的消息类型,支持按钮交互、动态更新
  2. 流式输出通过"先发加载卡片,再批量更新"实现(间隔 0.5s 更新,避免频率限制)
  3. 签名验证是安全必须项(timestamp + nonce + verify_token 的 SHA256)

企业微信集成核心:

  1. 群机器人仅支持单向发消息;双向对话需要企业应用
  2. 消息签名:SHA1(token, timestamp, nonce 排序后拼接)
  3. AccessToken 需要缓存(有效期约 7200s),避免频繁刷新

钉钉集成核心:

  1. 群机器人签名:HMAC-SHA256(timestamp + "\n" + secret),结果 base64 编码
  2. 工作通知(企业应用)支持 action_card 类型,可以有按钮
  3. 开放 API 接口需要配置 IP 白名单

统一路由层设计原则:

关键数字:

本章评分
4.7  / 5  (11 评分)

💬 留言讨论