Chapter 18

Deep Integration with Feishu, WeCom and DingTalk

Chapter 18: Deep Integration with Feishu, WeChat Work, and DingTalk โ€” Message Routing and Unified Management

Connect your Dify Agent to China's three dominant enterprise collaboration platforms โ€” Feishu, WeChat Work, and DingTalk โ€” with unified message routing, user permission mapping, and cross-platform session management, so your AI assistant fits seamlessly into daily enterprise workflows.

Chapter Overview

Deploying an AI assistant inside a Chinese enterprise involves three unavoidable questions: where employees work (Feishu / WeChat Work / DingTalk), how AI answers reach them (push notifications / chat bots), and how to control who can use which features (permissions).

This chapter delivers production-grade integration โ€” not just "Hello World bots":

After reading this chapter, you will be able to:


Level 1: Foundational Knowledge (1โ€“3 Years Experience)

Platform Comparison

Feature Feishu (Lark) WeChat Work DingTalk
Bot integration Custom app / ISV app Group robot / Enterprise app Group robot / Enterprise app
Message richness Very high (cards, interactions) Medium (Markdown/image) High (cards/forms)
API documentation Excellent Good Good
International support Strong (Lark = Feishu international) Weak Weak
Webhook push Supported Supported (enterprise) Supported

Feishu Bot Quick Start

Step 1: Create an app on Feishu Open Platform

  1. Visit https://open.feishu.cn/app
  2. Click "Create Internal Enterprise App"
  3. Fill in the app name (e.g., "AI Smart Assistant")
  4. Note the App ID and App Secret

Step 2: Enable Bot capability and subscribe to events

Step 3: Minimal Feishu bot

from flask import Flask, request, jsonify
import httpx, json, os

app = Flask(__name__)

FEISHU_APP_ID     = "cli_your_app_id"
FEISHU_APP_SECRET = "your_app_secret"
DIFY_API_KEY      = "app-your_dify_key"

def get_feishu_token() -> str:
    resp = httpx.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET}
    )
    return resp.json()["tenant_access_token"]

def send_feishu_message(open_id: str, text: str):
    token = get_feishu_token()
    httpx.post(
        "https://open.feishu.cn/open-apis/im/v1/messages",
        headers={"Authorization": f"Bearer {token}"},
        params={"receive_id_type": "open_id"},
        json={"receive_id": open_id, "msg_type": "text",
              "content": json.dumps({"text": text})}
    )

def call_dify(query: str, user_id: str) -> str:
    resp = httpx.post(
        "https://api.dify.ai/v1/chat-messages",
        headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
        json={"query": query, "user": user_id,
              "response_mode": "blocking", "inputs": {}},
        timeout=60.0
    )
    return resp.json().get("answer", "Sorry, unable to get an answer.")

@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook():
    data = request.json

    # URL verification (first-time setup)
    if data.get("type") == "url_verification":
        return jsonify({"challenge": data.get("challenge")})

    event = data.get("event", {})
    if event.get("type") == "message":
        msg  = event.get("message", {})
        if msg.get("message_type") == "text":
            content = json.loads(msg["content"])
            query   = content.get("text", "").strip()
            open_id = event.get("sender", {}).get("sender_id", {}).get("open_id")
            if query and open_id:
                answer = call_dify(query, open_id)
                send_feishu_message(open_id, answer)

    return jsonify({"code": 0})

WeChat Work Group Robot (Simplest Integration)

import httpx

WXWORK_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key"

def send_wxwork_text(text: str):
    httpx.post(WXWORK_WEBHOOK, json={"msgtype": "markdown", "markdown": {"content": text}})

def send_wxwork_card(title: str, description: str, url: str = None):
    article = {"title": title, "description": description}
    if url:
        article["url"] = url
    httpx.post(WXWORK_WEBHOOK, json={"msgtype": "news", "news": {"articles": [article]}})

DingTalk Group Robot (Simplest Integration)

import httpx, hmac, hashlib, base64, time, urllib.parse

DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=your_token"
DINGTALK_SECRET  = "your_secret"

def get_dingtalk_sign() -> tuple[str, str]:
    timestamp = str(round(time.time() * 1000))
    sign_str  = f"{timestamp}\n{DINGTALK_SECRET}"
    sign      = base64.b64encode(
        hmac.new(DINGTALK_SECRET.encode(), sign_str.encode(), hashlib.sha256).digest()
    ).decode()
    return timestamp, urllib.parse.quote_plus(sign)

def send_dingtalk_message(text: str):
    timestamp, sign = get_dingtalk_sign()
    url = f"{DINGTALK_WEBHOOK}&timestamp={timestamp}&sign={sign}"
    httpx.post(url, json={"msgtype": "markdown",
                          "markdown": {"title": "AI Assistant", "text": text}})

Level 2: Mechanism Deep Dive (3โ€“5 Years Experience)

Feishu: Interactive Message Cards

class FeishuCardBuilder:

    def build_ai_response_card(
        self, question: str, answer: str, sources: list[str] = None
    ) -> dict:
        elements = [
            {"tag": "div", "text": {"tag": "lark_md", "content": f"**Q:** {question}"}},
            {"tag": "hr"},
            {"tag": "div", "text": {"tag": "lark_md", "content": f"**A:**\n{answer}"}},
        ]
        if sources:
            src_text = "\n".join(f"- {s}" for s in sources[:3])
            elements.append({"tag": "note",
                             "elements": [{"tag": "plain_text",
                                           "content": f"Sources:\n{src_text}"}]})
        elements += [
            {"tag": "hr"},
            {"tag": "action", "actions": [
                {"tag": "button", "text": {"tag": "plain_text", "content": "๐Ÿ‘ Helpful"},
                 "type": "primary", "value": {"action": "like"}},
                {"tag": "button", "text": {"tag": "plain_text", "content": "๐Ÿ‘Ž Not helpful"},
                 "type": "danger",  "value": {"action": "dislike"}},
                {"tag": "button", "text": {"tag": "plain_text", "content": "๐Ÿ”„ Regenerate"},
                 "type": "default", "value": {"action": "regenerate", "question": question}},
            ]}
        ]
        return {
            "config":   {"wide_screen_mode": True},
            "header":   {"title": {"tag": "plain_text", "content": "AI Assistant"},
                         "template": "blue"},
            "elements": elements
        }

    def build_loading_card(self, question: str) -> dict:
        return {
            "config":   {"wide_screen_mode": True},
            "header":   {"title": {"tag": "plain_text", "content": "AI is thinking..."},
                         "template": "grey"},
            "elements": [{"tag": "div", "text": {"tag": "lark_md",
                          "content": f"**Q:** {question}\n\nโณ Generating answer..."}}]
        }

class FeishuClient:
    BASE_URL = "https://open.feishu.cn/open-apis"

    def __init__(self, app_id: str, app_secret: str):
        self.app_id      = app_id
        self.app_secret  = app_secret
        self._token      = None
        self._token_exp  = 0
        self.client      = httpx.AsyncClient(timeout=30.0)
        self.card_builder = FeishuCardBuilder()

    async def get_token(self) -> str:
        import time
        if self._token and time.time() < self._token_exp - 60:
            return self._token
        resp = await self.client.post(
            f"{self.BASE_URL}/auth/v3/tenant_access_token/internal",
            json={"app_id": self.app_id, "app_secret": self.app_secret}
        )
        data = resp.json()
        self._token    = data["tenant_access_token"]
        self._token_exp = time.time() + data.get("expire", 7200)
        return self._token

    async def send_card(self, receive_id: str, card: dict,
                        id_type: str = "open_id") -> dict:
        token = await self.get_token()
        resp  = await self.client.post(
            f"{self.BASE_URL}/im/v1/messages",
            headers={"Authorization": f"Bearer {token}"},
            params={"receive_id_type": id_type},
            json={"receive_id": receive_id, "msg_type": "interactive",
                  "content": json.dumps(card)}
        )
        resp.raise_for_status()
        return resp.json()

    async def update_card(self, message_id: str, card: dict) -> dict:
        token = await self.get_token()
        resp  = await self.client.patch(
            f"{self.BASE_URL}/im/v1/messages/{message_id}",
            headers={"Authorization": f"Bearer {token}"},
            json={"msg_type": "interactive", "content": json.dumps(card)}
        )
        resp.raise_for_status()
        return resp.json()

Feishu signature verification (security-critical):

import hashlib, hmac

def verify_feishu_signature(timestamp: str, nonce: str, body: str, verify_token: str) -> bool:
    sign_str  = timestamp + nonce + verify_token + body
    signature = hashlib.sha256(sign_str.encode()).hexdigest()
    received  = request.headers.get("X-Lark-Signature", "")
    return hmac.compare_digest(signature, received)

@app.route("/webhook/feishu", methods=["POST"])
def feishu_webhook_secure():
    ts, nonce  = request.headers.get("X-Lark-Request-Timestamp", ""), \
                 request.headers.get("X-Lark-Request-Nonce", "")
    body       = request.get_data(as_text=True)

    if not verify_feishu_signature(ts, nonce, body, os.getenv("FEISHU_VERIFY_TOKEN")):
        return jsonify({"code": 403, "msg": "signature mismatch"}), 403

    import time
    if abs(time.time() - int(ts)) > 300:
        return jsonify({"code": 403, "msg": "timestamp expired"}), 403

    return jsonify({"code": 0})

WeChat Work Enterprise Application

import httpx, hashlib, xml.etree.ElementTree as ET

class WxWorkClient:

    def __init__(self, corp_id: str, corp_secret: str, agent_id: int):
        self.corp_id    = corp_id
        self.corp_secret = corp_secret
        self.agent_id   = agent_id
        self.client     = httpx.AsyncClient(timeout=30.0)
        self._token     = None
        self._token_exp = 0

    async def get_access_token(self) -> str:
        import time
        if self._token and time.time() < self._token_exp - 60:
            return self._token
        resp = await self.client.get(
            "https://qyapi.weixin.qq.com/cgi-bin/gettoken",
            params={"corpid": self.corp_id, "corpsecret": self.corp_secret}
        )
        data = resp.json()
        self._token    = data["access_token"]
        self._token_exp = time.time() + data.get("expires_in", 7200)
        return self._token

    async def send_to_user(self, user_id: str, content: str, msg_type: str = "text") -> dict:
        token   = await self.get_access_token()
        payload = {"touser": user_id, "msgtype": msg_type, "agentid": self.agent_id}
        if msg_type == "text":
            payload["text"]     = {"content": content}
        elif msg_type == "markdown":
            payload["markdown"] = {"content": content}
        resp = await self.client.post(
            "https://qyapi.weixin.qq.com/cgi-bin/message/send",
            params={"access_token": token}, json=payload
        )
        resp.raise_for_status()
        return resp.json()

def verify_wxwork_signature(token: str, timestamp: str, nonce: str) -> str:
    return hashlib.sha1("".join(sorted([token, timestamp, nonce])).encode()).hexdigest()

Unified Message Routing Layer

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional

class Platform(str, Enum):
    FEISHU   = "feishu"
    WXWORK   = "wxwork"
    DINGTALK = "dingtalk"
    WEB      = "web"

@dataclass
class UnifiedMessage:
    platform:         Platform
    platform_user_id: str
    content:          str
    message_id:       str
    session_id:       Optional[str] = None
    metadata:         dict          = field(default_factory=dict)

@dataclass
class UnifiedResponse:
    content:      str
    content_type: str  = "text"   # "text" | "markdown" | "card"
    attachments:  list = field(default_factory=list)
    metadata:     dict = field(default_factory=dict)

class UnifiedMessageRouter:

    def __init__(self, feishu, wxwork, dingtalk, dify, user_mapper):
        self.clients     = {Platform.FEISHU: feishu,
                            Platform.WXWORK: wxwork,
                            Platform.DINGTALK: dingtalk}
        self.dify        = dify
        self.user_mapper = user_mapper

    async def route(self, message: UnifiedMessage) -> UnifiedResponse:
        unified_id = await self.user_mapper.get_unified_id(
            message.platform, message.platform_user_id
        )
        if not await self._check_permission(unified_id, message.content):
            return UnifiedResponse(
                content="Sorry, you don't have permission to use this feature."
            )

        conv_id = await self._get_conversation_id(unified_id)
        try:
            resp    = await self.dify.send_message_async(
                query=message.content, user=unified_id, conversation_id=conv_id
            )
            answer  = resp["answer"]
            new_cid = resp.get("conversation_id", conv_id)
            await self._save_conversation_id(unified_id, new_cid)
        except Exception as e:
            answer = f"AI service temporarily unavailable: {e}\nPlease try again later."

        return UnifiedResponse(
            content=answer,
            content_type=self._best_format(answer, message.platform)
        )

    async def send_response(self, platform: Platform, platform_user_id: str,
                            response: UnifiedResponse):
        client = self.clients[platform]
        if platform == Platform.FEISHU:
            card = client.card_builder.build_ai_response_card("", response.content)
            await client.send_card(platform_user_id, card)
        elif platform == Platform.WXWORK:
            await client.send_to_user(platform_user_id, response.content, "markdown")
        elif platform == Platform.DINGTALK:
            await client.send_work_notification(
                platform_user_id, "AI Assistant", response.content
            )

    def _best_format(self, content: str, platform: Platform) -> str:
        has_md = any(m in content for m in ["**", "##", "- ", "```"])
        if has_md:
            return "card" if platform == Platform.FEISHU else "markdown"
        return "text"

Level 3: Source Code and Principles (5+ Years Experience)

User Identity Mapping

from pydantic import BaseModel
from typing import Optional

class UserMapping(BaseModel):
    internal_user_id: str
    feishu_open_id:   Optional[str] = None
    wxwork_userid:    Optional[str] = None
    dingtalk_userid:  Optional[str] = None
    email:            Optional[str] = None

class UserIdentityMapper:

    def __init__(self, redis, db):
        self.redis = redis
        self.db    = db

    async def get_unified_id(self, platform: Platform, platform_user_id: str) -> str:
        cache_key = f"user_mapping:{platform}:{platform_user_id}"
        cached    = await self.redis.get(cache_key)
        if cached:
            return cached.decode()

        mapping = await self.db.find_mapping(platform, platform_user_id)
        if mapping:
            await self.redis.setex(cache_key, 3600, mapping.internal_user_id)
            return mapping.internal_user_id

        new_id = await self._create_user(platform, platform_user_id)
        await self.redis.setex(cache_key, 3600, new_id)
        return new_id

    async def link_accounts(self, internal_id: str, platform: Platform, platform_id: str):
        await self.db.update_mapping(internal_id, platform, platform_id)
        await self.redis.setex(f"user_mapping:{platform}:{platform_id}", 3600, internal_id)

    async def _create_user(self, platform: Platform, platform_user_id: str) -> str:
        import uuid
        new_id = f"user_{uuid.uuid4().hex[:8]}"
        await self.db.save_mapping(UserMapping(
            internal_user_id=new_id,
            **{f"{platform}_userid" if platform != Platform.FEISHU
               else "feishu_open_id": platform_user_id}
        ))
        return new_id

Cross-Platform Session Synchronization

import time

class CrossPlatformSessionManager:

    def __init__(self, redis, dify):
        self.redis = redis
        self.dify  = dify

    async def get_session(self, unified_user_id: str, mode: str = "continue") -> str:
        key = f"session:{unified_user_id}"
        if mode == "new":
            await self.redis.delete(key)
            return ""
        data = await self.redis.hgetall(key)
        if data:
            return data.get(b"conv_id", b"").decode()
        return ""

    async def save_session(self, unified_id: str, conv_id: str, platform: Platform):
        key = f"session:{unified_id}"
        await self.redis.hset(key, mapping={
            "conv_id":       conv_id,
            "last_platform": platform,
            "last_active":   str(time.time()),
        })
        await self.redis.expire(key, 86400 * 7)  # 7 days

Level 4: Production Pitfalls and Decision-Making (Expert Perspective)

Pitfall 1: Duplicate Message Processing

All three platforms may deliver the same webhook event more than once (due to their own retries). Without deduplication, your bot replies twice:

class MessageDeduplicator:

    def __init__(self, redis):
        self.redis = redis

    def is_duplicate(self, message_id: str, platform: str) -> bool:
        key    = f"msg_dedup:{platform}:{message_id}"
        result = self.redis.set(key, "1", ex=300, nx=True)  # Atomic SET NX
        return result is None  # None = key already existed = duplicate

@app.route("/webhook/feishu", methods=["POST"])
def feishu_deduped():
    data       = request.json
    message_id = data.get("event", {}).get("message", {}).get("message_id", "")
    if deduplicator.is_duplicate(message_id, "feishu"):
        return jsonify({"code": 0})   # Silently ignore duplicate
    # ... normal processing
    return jsonify({"code": 0})

Pitfall 2: Feishu Card Update Rate Limits

Updating the card for every streamed character triggers rate limits. Batch updates instead:

import asyncio

class FeishuStreamingUpdater:
    UPDATE_INTERVAL = 0.5   # Update every 500ms

    def __init__(self, feishu: FeishuClient, message_id: str, question: str):
        self.feishu     = feishu
        self.message_id = message_id
        self.question   = question
        self._buffer    = ""
        self._done      = False

    async def update_loop(self):
        while not self._done or self._buffer:
            await asyncio.sleep(self.UPDATE_INTERVAL)
            if self._buffer:
                await self._push()

    async def _push(self):
        card = self.feishu.card_builder.build_ai_response_card(
            self.question, self._buffer + "โณ"
        )
        try:
            await self.feishu.update_card(self.message_id, card)
        except Exception as e:
            if "rate limit" not in str(e).lower():
                raise

    def append(self, chunk: str):
        self._buffer += chunk

    async def finish(self):
        self._done = True
        card = self.feishu.card_builder.build_ai_response_card(
            self.question, self._buffer
        )
        await self.feishu.update_card(self.message_id, card)

async def handle_feishu_stream(
    question: str, open_id: str, feishu: FeishuClient, dify
):
    loading_card = feishu.card_builder.build_loading_card(question)
    msg_resp     = await feishu.send_card(open_id, loading_card)
    message_id   = msg_resp["data"]["message_id"]

    updater     = FeishuStreamingUpdater(feishu, message_id, question)
    update_task = asyncio.create_task(updater.update_loop())

    async for event in dify.send_message_stream_async(question, open_id):
        if event.get("event") == "message":
            updater.append(event.get("answer", ""))
        elif event.get("event") == "message_end":
            break

    await updater.finish()
    update_task.cancel()

Pitfall 3: Sensitive Data in Platform Logs

All three platforms log message content on their servers. Before pushing AI responses, mask PII:

import re

class SensitiveDataFilter:

    PATTERNS = {
        "phone":     r'\b(1[3-9]\d)\d{4}(\d{4})\b',
        "id_card":   r'\b(\d{6})\d{8}([\dXx]{4})\b',
        "bank_card": r'\b\d{12,15}(\d{4})\b',
    }

    def mask(self, text: str) -> str:
        result = re.sub(self.PATTERNS["phone"],     r'\1****\2', text)
        result = re.sub(self.PATTERNS["id_card"],   r'\1********\2', result)
        result = re.sub(self.PATTERNS["bank_card"], r'****\1', result)
        return result

    def detect(self, text: str) -> list[str]:
        return [k for k, p in self.PATTERNS.items() if re.search(p, text)]

Platform Selection Guide

Message richness:       Feishu (highest) > DingTalk (high) > WeChat Work (medium)

Bidirectional chat:
- Feishu:        Supported natively via Bot Events
- WeChat Work:   Enterprise App supports bidirectional; Group Robot = one-way only
- DingTalk:      Enterprise App supports bidirectional; Group Robot = one-way only

API quality:
- Feishu:        Best documentation, richest SDK, Feishu/Lark are the same API
- WeChat Work:   Good docs, easy WeChat ecosystem integration
- DingTalk:      Average docs, complex signing mechanism

Recommendations:
- Team already uses Feishu โ†’ Feishu first (best card capabilities)
- Needs internationalization โ†’ Feishu (domestic Feishu = international Lark, one codebase)
- Needs WeChat ecosystem connection โ†’ WeChat Work
- Industry uses DingTalk โ†’ DingTalk
- Multi-platform coverage โ†’ use the unified routing layer from this chapter

Chapter Summary

This chapter delivered production-grade integration guides for Feishu, WeChat Work, and DingTalk:

Feishu integration essentials:

  1. Interactive Message Cards are the most powerful message type โ€” support button actions and dynamic updates
  2. Streaming output uses "send loading card first, then batch-update every 500ms" (respect rate limits)
  3. Signature verification is security-critical: SHA256(timestamp + nonce + verify_token + body)

WeChat Work integration essentials:

  1. Group Robots support one-way messaging only; bidirectional chat requires an Enterprise Application
  2. Message signing: SHA1 of sorted(token, timestamp, nonce) concatenated
  3. AccessToken must be cached (~7200s valid); excessive refreshes cause errors

DingTalk integration essentials:

  1. Group robot signature: HMAC-SHA256(timestamp + "\n" + secret), base64-encoded
  2. Work notifications (enterprise app) support action_card type with buttons
  3. Open API requires configuring IP whitelist

Unified routing layer design principles:

Key numbers:

Rate this chapter
4.7  / 5  (11 ratings)

๐Ÿ’ฌ Comments