第 11 章

通知推送——Slack、钉钉、微信与桌面通知

第11章:消息通知自动化——飞书、企业微信、Telegram 机器人

邮件适合正式沟通,但告警通知要快、要直达。飞书、企业微信、Telegram 的 Webhook 机器人,是程序状态推送的最佳载体——代码出错立刻推送,每日数据自动播报,全程无需人工盯屏。本章覆盖国内外主流IM平台的机器人接入,最后封装统一接口,让你一行代码切换通知渠道。

飞书机器人

创建 Webhook 机器人

发送文本消息

import httpx

FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/your-token"

def feishu_send_text(text: str, webhook: str = FEISHU_WEBHOOK) -> bool:
    """发送纯文本消息到飞书群"""
    payload = {
        "msg_type": "text",
        "content": {"text": text},
    }
    resp = httpx.post(webhook, json=payload, timeout=10)
    result = resp.json()
    if result.get("StatusCode") == 0:
        print("飞书消息发送成功")
        return True
    print(f"飞书发送失败: {result}")
    return False

feishu_send_text("服务器 CPU 使用率超过 90%,请立即检查!")

发送富文本卡片消息

def feishu_send_card(title: str, content: str, color: str = "red",
                     webhook: str = FEISHU_WEBHOOK) -> bool:
    """
    发送交互卡片消息。
    color: red(告警)/ green(正常)/ orange(警告)/ blue(通知)
    """
    payload = {
        "msg_type": "interactive",
        "card": {
            "config": {"wide_screen_mode": True},
            "header": {
                "title": {"tag": "plain_text", "content": title},
                "template": color,
            },
            "elements": [
                {
                    "tag": "div",
                    "text": {"tag": "lark_md", "content": content},
                }
            ],
        },
    }
    resp = httpx.post(webhook, json=payload, timeout=10)
    return resp.json().get("StatusCode") == 0

完整案例:服务器监控告警推送飞书

import psutil
from datetime import datetime

def check_and_alert_feishu(cpu_threshold: float = 90.0, mem_threshold: float = 85.0):
    """检查系统资源,超阈值时推送飞书告警"""
    cpu = psutil.cpu_percent(interval=1)
    mem = psutil.virtual_memory().percent
    disk = psutil.disk_usage("/").percent
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    alerts = []
    if cpu > cpu_threshold:
        alerts.append(f"**CPU使用率**: {cpu:.1f}% > {cpu_threshold}%")
    if mem > mem_threshold:
        alerts.append(f"**内存使用率**: {mem:.1f}% > {mem_threshold}%")

    if alerts:
        content = f"**时间**: {now}\n" + "\n".join(alerts)
        content += f"\n\n磁盘使用: {disk:.1f}%"
        feishu_send_card(
            title="服务器资源告警",
            content=content,
            color="red",
        )
    else:
        print(f"[{now}] 系统正常: CPU {cpu:.1f}%, 内存 {mem:.1f}%")

if __name__ == "__main__":
    check_and_alert_feishu()

企业微信机器人

配置 Webhook

企业微信群

发送文本与 Markdown 消息

import httpx

WECOM_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your-key"

def wecom_send_text(content: str, mentioned_list: list[str] = None) -> bool:
    """
    发送文本消息,可以 @ 指定成员。
    mentioned_list: 企业微信账号列表,["user1", "@all"] 表示 @所有人
    """
    payload = {
        "msgtype": "text",
        "text": {
            "content": content,
            "mentioned_list": mentioned_list or [],
        },
    }
    resp = httpx.post(WECOM_WEBHOOK, json=payload, timeout=10)
    result = resp.json()
    return result.get("errcode") == 0

def wecom_send_markdown(content: str) -> bool:
    """发送 Markdown 格式消息(支持加粗、颜色等)"""
    payload = {
        "msgtype": "markdown",
        "markdown": {"content": content},
    }
    resp = httpx.post(WECOM_WEBHOOK, json=payload, timeout=10)
    return resp.json().get("errcode") == 0

完整案例:每日数据播报

from datetime import datetime, timedelta

def daily_broadcast(stats: dict):
    """每日数据播报,使用 Markdown 格式"""
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    change_icon = "up" if stats["revenue_change"] >= 0 else "down"
    color = "info" if stats["revenue_change"] >= 0 else "warning"

    content = f"""## 每日数据播报 {yesterday}

> **营收**: ¥{stats['revenue']:,.0f}
> **订单数**: {stats['orders']}
> **新增用户**: {stats['new_users']}
> **环比变化**: <font color="{color}">{stats['revenue_change']:+.1f}%</font>

数据来源:销售系统 | 更新时间:{datetime.now():%H:%M}"""

    success = wecom_send_markdown(content)
    if success:
        print("企业微信播报发送成功")

daily_broadcast({
    "revenue": 128000, "orders": 342,
    "new_users": 56, "revenue_change": 12.5
})

Telegram Bot

创建 Bot 并获取 Token

pip install python-telegram-bot

发送消息(简单方式)

import httpx
import os

def telegram_send(text: str, chat_id: str, token: str = None) -> bool:
    """直接调用 Telegram Bot API 发送消息,无需额外依赖"""
    token = token or os.getenv("TELEGRAM_BOT_TOKEN")
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    resp = httpx.post(url, json={
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "Markdown",  # 支持 *加粗* _斜体_ `代码`
    }, timeout=10)
    return resp.json().get("ok", False)

telegram_send(
    "*服务器告警*\nCPU: 95%\n时间: 2024-01-15 14:30",
    chat_id=os.getenv("TELEGRAM_CHAT_ID"),
)

完整案例:个人数据查询 Bot(处理命令)

from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
import sqlite3, os

async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    await update.message.reply_text(
        "我是数据查询 Bot。\n"
        "/stats - 查询今日数据\n"
        "/price [商品名] - 查询商品价格"
    )

async def cmd_stats(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """查询今日统计数据"""
    conn = sqlite3.connect("sales.db")
    row = conn.execute(
        "SELECT COUNT(*), COALESCE(SUM(amount),0) FROM orders WHERE date(created_at)=date('now')"
    ).fetchone()
    conn.close()
    msg = f"*今日数据*\n订单数:{row[0]}\n营收:¥{row[1]:,.0f}"
    await update.message.reply_text(msg, parse_mode="Markdown")

async def cmd_price(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """查询指定商品最新价格"""
    if not ctx.args:
        await update.message.reply_text("用法:/price 商品名称")
        return
    name = " ".join(ctx.args)
    conn = sqlite3.connect("prices.db")
    row = conn.execute(
        "SELECT price, scraped_at FROM price_history WHERE name LIKE ? ORDER BY scraped_at DESC LIMIT 1",
        (f"%{name}%",)
    ).fetchone()
    conn.close()
    if row:
        await update.message.reply_text(f"{name}:¥{row[0]:.2f}({row[1][:10]})")
    else:
        await update.message.reply_text(f"未找到 {name} 的价格记录")

def run_bot():
    token = os.getenv("TELEGRAM_BOT_TOKEN")
    app = Application.builder().token(token).build()
    app.add_handler(CommandHandler("start", cmd_start))
    app.add_handler(CommandHandler("stats", cmd_stats))
    app.add_handler(CommandHandler("price", cmd_price))
    print("Bot 运行中...")
    app.run_polling()

if __name__ == "__main__":
    run_bot()

钉钉机器人

安全设置:加签验证

钉钉机器人有三种安全设置:IP 白名单、自定义关键词、加签。加签最灵活,推荐使用。

import hashlib, hmac, base64, time, urllib.parse
import httpx, os

def dingtalk_send(title: str, content: str,
                  webhook: str = None, secret: str = None) -> bool:
    """
    发送钉钉 Markdown 消息(带加签验证)。
    webhook: 钉钉机器人 Webhook URL
    secret: 钉钉机器人加签密钥(以 SEC 开头)
    """
    webhook = webhook or os.getenv("DINGTALK_WEBHOOK")
    secret = secret or os.getenv("DINGTALK_SECRET")

    # 生成签名
    timestamp = str(round(time.time() * 1000))
    sign_str = f"{timestamp}\n{secret}"
    hmac_code = hmac.new(
        secret.encode("utf-8"), sign_str.encode("utf-8"), digestmod=hashlib.sha256
    ).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

    url = f"{webhook}&timestamp={timestamp}&sign={sign}"

    payload = {
        "msgtype": "markdown",
        "markdown": {"title": title, "text": f"## {title}\n\n{content}"},
    }
    resp = httpx.post(url, json=payload, timeout=10)
    result = resp.json()
    return result.get("errcode") == 0

统一 Notifier 封装

当项目需要同时支持多个通知渠道,或者将来可能切换渠道时,统一接口是最优解——调用方只管调 notify(),不需要关心背后用了哪个平台。

import os
import logging
from abc import ABC, abstractmethod
from enum import Enum
import httpx

logger = logging.getLogger(__name__)

# ---- 抽象基类 ----
class BaseNotifier(ABC):
    @abstractmethod
    def send(self, title: str, content: str, level: str = "info") -> bool:
        """
        发送通知。
        level: info / warn / error(影响卡片颜色等视觉样式)
        """
        ...

# ---- 飞书实现 ----
class FeishuNotifier(BaseNotifier):
    COLOR_MAP = {"info": "blue", "warn": "orange", "error": "red"}

    def __init__(self, webhook: str):
        self.webhook = webhook

    def send(self, title: str, content: str, level: str = "info") -> bool:
        payload = {
            "msg_type": "interactive",
            "card": {
                "config": {"wide_screen_mode": True},
                "header": {
                    "title": {"tag": "plain_text", "content": title},
                    "template": self.COLOR_MAP.get(level, "blue"),
                },
                "elements": [{"tag": "div", "text": {"tag": "lark_md", "content": content}}],
            },
        }
        try:
            resp = httpx.post(self.webhook, json=payload, timeout=10)
            return resp.json().get("StatusCode") == 0
        except Exception as e:
            logger.error(f"飞书通知失败: {e}")
            return False

# ---- 企业微信实现 ----
class WeComNotifier(BaseNotifier):
    def __init__(self, webhook: str):
        self.webhook = webhook

    def send(self, title: str, content: str, level: str = "info") -> bool:
        color_map = {"info": "comment", "warn": "warning", "error": "warning"}
        color = color_map.get(level, "comment")
        md = f"## {title}\n\n> <font color=\"{color}\">{content}</font>"
        try:
            resp = httpx.post(self.webhook, json={"msgtype": "markdown", "markdown": {"content": md}}, timeout=10)
            return resp.json().get("errcode") == 0
        except Exception as e:
            logger.error(f"企业微信通知失败: {e}")
            return False

# ---- Telegram 实现 ----
class TelegramNotifier(BaseNotifier):
    def __init__(self, token: str, chat_id: str):
        self.token = token
        self.chat_id = chat_id

    def send(self, title: str, content: str, level: str = "info") -> bool:
        icon = {"info": "ℹ️", "warn": "⚠️", "error": "🚨"}.get(level, "ℹ️")
        text = f"{icon} *{title}*\n\n{content}"
        try:
            url = f"https://api.telegram.org/bot{self.token}/sendMessage"
            resp = httpx.post(url, json={"chat_id": self.chat_id, "text": text, "parse_mode": "Markdown"}, timeout=10)
            return resp.json().get("ok", False)
        except Exception as e:
            logger.error(f"Telegram通知失败: {e}")
            return False

# ---- 统一管理器 ----
class Notifier:
    """统一通知管理器:支持多渠道并行发送"""

    def __init__(self):
        self._channels: list[BaseNotifier] = []

    def add_channel(self, notifier: BaseNotifier) -> "Notifier":
        self._channels.append(notifier)
        return self  # 支持链式调用

    def notify(self, title: str, content: str, level: str = "info") -> dict:
        """向所有注册渠道发送通知,返回每个渠道的发送结果"""
        results = {}
        for ch in self._channels:
            name = type(ch).__name__
            results[name] = ch.send(title, content, level)
            status = "成功" if results[name] else "失败"
            logger.info(f"[{name}] 通知{status}: {title}")
        return results

# ---- 从环境变量初始化(约50行可复用模块)----
def build_notifier_from_env() -> Notifier:
    """根据环境变量自动配置可用的通知渠道"""
    notifier = Notifier()
    if webhook := os.getenv("FEISHU_WEBHOOK"):
        notifier.add_channel(FeishuNotifier(webhook))
    if webhook := os.getenv("WECOM_WEBHOOK"):
        notifier.add_channel(WeComNotifier(webhook))
    if (token := os.getenv("TELEGRAM_BOT_TOKEN")) and (chat_id := os.getenv("TELEGRAM_CHAT_ID")):
        notifier.add_channel(TelegramNotifier(token, chat_id))
    return notifier

实战项目:全渠道告警系统

程序出错时自动发送飞书/企业微信通知,同时记录到日志文件,不需要人工盯屏。

import logging
import traceback
import functools
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.FileHandler("app.log", encoding="utf-8"),
        logging.StreamHandler(),
    ]
)
logger = logging.getLogger("app")

# 初始化通知器(从环境变量读配置)
notifier = build_notifier_from_env()

def alert_on_error(func):
    """装饰器:自动捕获函数异常并推送告警"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            tb = traceback.format_exc()
            title = f"程序异常:{func.__name__}"
            content = (
                f"**错误类型**: {type(e).__name__}\n"
                f"**错误信息**: {str(e)}\n"
                f"**时间**: {datetime.now():%Y-%m-%d %H:%M:%S}\n"
                f"**堆栈**(最后3行):\n```\n{chr(10).join(tb.strip().splitlines()[-3:])}\n```"
            )
            logger.error(f"{title}\n{tb}")
            notifier.notify(title, content, level="error")
            raise  # 重新抛出,不吞异常
    return wrapper

# ---- 使用示例 ----
@alert_on_error
def process_daily_data():
    """模拟数据处理任务,出错时自动告警"""
    logger.info("开始处理每日数据...")
    # ... 业务逻辑 ...
    # 模拟一个可能出错的操作
    data = fetch_from_database()  # 如果这里报错,会自动推送飞书告警
    logger.info(f"处理完成,共 {len(data)} 条记录")
    return data

# ---- 主动发送业务告警 ----
def check_business_metrics():
    """业务指标异常时主动推送"""
    # 假设从数据库获取到了指标
    conversion_rate = 0.8  # 1% 以下触发告警

    if conversion_rate < 1.0:
        notifier.notify(
            title="转化率异常告警",
            content=(
                f"**当前转化率**: {conversion_rate:.1f}%\n"
                f"**阈值**: 1.0%\n"
                f"**建议**: 检查落地页或广告投放质量"
            ),
            level="warn",
        )

if __name__ == "__main__":
    check_business_metrics()
    try:
        process_daily_data()
    except Exception:
        pass  # 已在装饰器中处理并告警

**最佳实践:**把 @alert_on_error 装饰器加在所有定时任务的入口函数上,配合第15章的调度工具,就能实现"任务失败

上一章

下一章
第12章:AI API 集成
本章评分
4.8  / 5  (26 评分)

💬 留言讨论