第 11 章
通知推送——Slack、钉钉、微信与桌面通知
第11章:消息通知自动化——飞书、企业微信、Telegram 机器人
邮件适合正式沟通,但告警通知要快、要直达。飞书、企业微信、Telegram 的 Webhook 机器人,是程序状态推送的最佳载体——代码出错立刻推送,每日数据自动播报,全程无需人工盯屏。本章覆盖国内外主流IM平台的机器人接入,最后封装统一接口,让你一行代码切换通知渠道。
飞书机器人
创建 Webhook 机器人
- 打开飞书群 - 设置机器人名称,选择安全设置(推荐"自定义关键词"或"加签")
- 复制生成的 Webhook URL(格式:
https://open.feishu.cn/open-apis/bot/v2/hook/xxx)
发送文本消息
import httpx
FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/your-token"
def feishu_send_text(text: str, webhook: str = FEISHU_WEBHOOK) -> bool:
"""发送纯文本消息到飞书群"""
payload = {
"msg_type": "text",
"content": {"text": text},
}
resp = httpx.post(webhook, json=payload, timeout=10)
result = resp.json()
if result.get("StatusCode") == 0:
print("飞书消息发送成功")
return True
print(f"飞书发送失败: {result}")
return False
feishu_send_text("服务器 CPU 使用率超过 90%,请立即检查!")
发送富文本卡片消息
def feishu_send_card(title: str, content: str, color: str = "red",
webhook: str = FEISHU_WEBHOOK) -> bool:
"""
发送交互卡片消息。
color: red(告警)/ green(正常)/ orange(警告)/ blue(通知)
"""
payload = {
"msg_type": "interactive",
"card": {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": title},
"template": color,
},
"elements": [
{
"tag": "div",
"text": {"tag": "lark_md", "content": content},
}
],
},
}
resp = httpx.post(webhook, json=payload, timeout=10)
return resp.json().get("StatusCode") == 0
完整案例:服务器监控告警推送飞书
import psutil
from datetime import datetime
def check_and_alert_feishu(cpu_threshold: float = 90.0, mem_threshold: float = 85.0):
"""检查系统资源,超阈值时推送飞书告警"""
cpu = psutil.cpu_percent(interval=1)
mem = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
alerts = []
if cpu > cpu_threshold:
alerts.append(f"**CPU使用率**: {cpu:.1f}% > {cpu_threshold}%")
if mem > mem_threshold:
alerts.append(f"**内存使用率**: {mem:.1f}% > {mem_threshold}%")
if alerts:
content = f"**时间**: {now}\n" + "\n".join(alerts)
content += f"\n\n磁盘使用: {disk:.1f}%"
feishu_send_card(
title="服务器资源告警",
content=content,
color="red",
)
else:
print(f"[{now}] 系统正常: CPU {cpu:.1f}%, 内存 {mem:.1f}%")
if __name__ == "__main__":
check_and_alert_feishu()
企业微信机器人
配置 Webhook
企业微信群
发送文本与 Markdown 消息
import httpx
WECOM_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your-key"
def wecom_send_text(content: str, mentioned_list: list[str] = None) -> bool:
"""
发送文本消息,可以 @ 指定成员。
mentioned_list: 企业微信账号列表,["user1", "@all"] 表示 @所有人
"""
payload = {
"msgtype": "text",
"text": {
"content": content,
"mentioned_list": mentioned_list or [],
},
}
resp = httpx.post(WECOM_WEBHOOK, json=payload, timeout=10)
result = resp.json()
return result.get("errcode") == 0
def wecom_send_markdown(content: str) -> bool:
"""发送 Markdown 格式消息(支持加粗、颜色等)"""
payload = {
"msgtype": "markdown",
"markdown": {"content": content},
}
resp = httpx.post(WECOM_WEBHOOK, json=payload, timeout=10)
return resp.json().get("errcode") == 0
完整案例:每日数据播报
from datetime import datetime, timedelta
def daily_broadcast(stats: dict):
"""每日数据播报,使用 Markdown 格式"""
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
change_icon = "up" if stats["revenue_change"] >= 0 else "down"
color = "info" if stats["revenue_change"] >= 0 else "warning"
content = f"""## 每日数据播报 {yesterday}
> **营收**: ¥{stats['revenue']:,.0f}
> **订单数**: {stats['orders']}
> **新增用户**: {stats['new_users']}
> **环比变化**: <font color="{color}">{stats['revenue_change']:+.1f}%</font>
数据来源:销售系统 | 更新时间:{datetime.now():%H:%M}"""
success = wecom_send_markdown(content)
if success:
print("企业微信播报发送成功")
daily_broadcast({
"revenue": 128000, "orders": 342,
"new_users": 56, "revenue_change": 12.5
})
Telegram Bot
创建 Bot 并获取 Token
- 在 Telegram 中搜索 @BotFather
- 发送
/newbot,按提示输入 Bot 名称和用户名 - 获得 API Token(格式:
123456789:AAHxxxxx) - 用 @userinfobot 获取你的 chat_id
pip install python-telegram-bot
发送消息(简单方式)
import httpx
import os
def telegram_send(text: str, chat_id: str, token: str = None) -> bool:
"""直接调用 Telegram Bot API 发送消息,无需额外依赖"""
token = token or os.getenv("TELEGRAM_BOT_TOKEN")
url = f"https://api.telegram.org/bot{token}/sendMessage"
resp = httpx.post(url, json={
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown", # 支持 *加粗* _斜体_ `代码`
}, timeout=10)
return resp.json().get("ok", False)
telegram_send(
"*服务器告警*\nCPU: 95%\n时间: 2024-01-15 14:30",
chat_id=os.getenv("TELEGRAM_CHAT_ID"),
)
完整案例:个人数据查询 Bot(处理命令)
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
import sqlite3, os
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(
"我是数据查询 Bot。\n"
"/stats - 查询今日数据\n"
"/price [商品名] - 查询商品价格"
)
async def cmd_stats(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
"""查询今日统计数据"""
conn = sqlite3.connect("sales.db")
row = conn.execute(
"SELECT COUNT(*), COALESCE(SUM(amount),0) FROM orders WHERE date(created_at)=date('now')"
).fetchone()
conn.close()
msg = f"*今日数据*\n订单数:{row[0]}\n营收:¥{row[1]:,.0f}"
await update.message.reply_text(msg, parse_mode="Markdown")
async def cmd_price(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
"""查询指定商品最新价格"""
if not ctx.args:
await update.message.reply_text("用法:/price 商品名称")
return
name = " ".join(ctx.args)
conn = sqlite3.connect("prices.db")
row = conn.execute(
"SELECT price, scraped_at FROM price_history WHERE name LIKE ? ORDER BY scraped_at DESC LIMIT 1",
(f"%{name}%",)
).fetchone()
conn.close()
if row:
await update.message.reply_text(f"{name}:¥{row[0]:.2f}({row[1][:10]})")
else:
await update.message.reply_text(f"未找到 {name} 的价格记录")
def run_bot():
token = os.getenv("TELEGRAM_BOT_TOKEN")
app = Application.builder().token(token).build()
app.add_handler(CommandHandler("start", cmd_start))
app.add_handler(CommandHandler("stats", cmd_stats))
app.add_handler(CommandHandler("price", cmd_price))
print("Bot 运行中...")
app.run_polling()
if __name__ == "__main__":
run_bot()
钉钉机器人
安全设置:加签验证
钉钉机器人有三种安全设置:IP 白名单、自定义关键词、加签。加签最灵活,推荐使用。
import hashlib, hmac, base64, time, urllib.parse
import httpx, os
def dingtalk_send(title: str, content: str,
webhook: str = None, secret: str = None) -> bool:
"""
发送钉钉 Markdown 消息(带加签验证)。
webhook: 钉钉机器人 Webhook URL
secret: 钉钉机器人加签密钥(以 SEC 开头)
"""
webhook = webhook or os.getenv("DINGTALK_WEBHOOK")
secret = secret or os.getenv("DINGTALK_SECRET")
# 生成签名
timestamp = str(round(time.time() * 1000))
sign_str = f"{timestamp}\n{secret}"
hmac_code = hmac.new(
secret.encode("utf-8"), sign_str.encode("utf-8"), digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
url = f"{webhook}×tamp={timestamp}&sign={sign}"
payload = {
"msgtype": "markdown",
"markdown": {"title": title, "text": f"## {title}\n\n{content}"},
}
resp = httpx.post(url, json=payload, timeout=10)
result = resp.json()
return result.get("errcode") == 0
统一 Notifier 封装
当项目需要同时支持多个通知渠道,或者将来可能切换渠道时,统一接口是最优解——调用方只管调 notify(),不需要关心背后用了哪个平台。
import os
import logging
from abc import ABC, abstractmethod
from enum import Enum
import httpx
logger = logging.getLogger(__name__)
# ---- 抽象基类 ----
class BaseNotifier(ABC):
@abstractmethod
def send(self, title: str, content: str, level: str = "info") -> bool:
"""
发送通知。
level: info / warn / error(影响卡片颜色等视觉样式)
"""
...
# ---- 飞书实现 ----
class FeishuNotifier(BaseNotifier):
COLOR_MAP = {"info": "blue", "warn": "orange", "error": "red"}
def __init__(self, webhook: str):
self.webhook = webhook
def send(self, title: str, content: str, level: str = "info") -> bool:
payload = {
"msg_type": "interactive",
"card": {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": title},
"template": self.COLOR_MAP.get(level, "blue"),
},
"elements": [{"tag": "div", "text": {"tag": "lark_md", "content": content}}],
},
}
try:
resp = httpx.post(self.webhook, json=payload, timeout=10)
return resp.json().get("StatusCode") == 0
except Exception as e:
logger.error(f"飞书通知失败: {e}")
return False
# ---- 企业微信实现 ----
class WeComNotifier(BaseNotifier):
def __init__(self, webhook: str):
self.webhook = webhook
def send(self, title: str, content: str, level: str = "info") -> bool:
color_map = {"info": "comment", "warn": "warning", "error": "warning"}
color = color_map.get(level, "comment")
md = f"## {title}\n\n> <font color=\"{color}\">{content}</font>"
try:
resp = httpx.post(self.webhook, json={"msgtype": "markdown", "markdown": {"content": md}}, timeout=10)
return resp.json().get("errcode") == 0
except Exception as e:
logger.error(f"企业微信通知失败: {e}")
return False
# ---- Telegram 实现 ----
class TelegramNotifier(BaseNotifier):
def __init__(self, token: str, chat_id: str):
self.token = token
self.chat_id = chat_id
def send(self, title: str, content: str, level: str = "info") -> bool:
icon = {"info": "ℹ️", "warn": "⚠️", "error": "🚨"}.get(level, "ℹ️")
text = f"{icon} *{title}*\n\n{content}"
try:
url = f"https://api.telegram.org/bot{self.token}/sendMessage"
resp = httpx.post(url, json={"chat_id": self.chat_id, "text": text, "parse_mode": "Markdown"}, timeout=10)
return resp.json().get("ok", False)
except Exception as e:
logger.error(f"Telegram通知失败: {e}")
return False
# ---- 统一管理器 ----
class Notifier:
"""统一通知管理器:支持多渠道并行发送"""
def __init__(self):
self._channels: list[BaseNotifier] = []
def add_channel(self, notifier: BaseNotifier) -> "Notifier":
self._channels.append(notifier)
return self # 支持链式调用
def notify(self, title: str, content: str, level: str = "info") -> dict:
"""向所有注册渠道发送通知,返回每个渠道的发送结果"""
results = {}
for ch in self._channels:
name = type(ch).__name__
results[name] = ch.send(title, content, level)
status = "成功" if results[name] else "失败"
logger.info(f"[{name}] 通知{status}: {title}")
return results
# ---- 从环境变量初始化(约50行可复用模块)----
def build_notifier_from_env() -> Notifier:
"""根据环境变量自动配置可用的通知渠道"""
notifier = Notifier()
if webhook := os.getenv("FEISHU_WEBHOOK"):
notifier.add_channel(FeishuNotifier(webhook))
if webhook := os.getenv("WECOM_WEBHOOK"):
notifier.add_channel(WeComNotifier(webhook))
if (token := os.getenv("TELEGRAM_BOT_TOKEN")) and (chat_id := os.getenv("TELEGRAM_CHAT_ID")):
notifier.add_channel(TelegramNotifier(token, chat_id))
return notifier
实战项目:全渠道告警系统
程序出错时自动发送飞书/企业微信通知,同时记录到日志文件,不需要人工盯屏。
import logging
import traceback
import functools
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler("app.log", encoding="utf-8"),
logging.StreamHandler(),
]
)
logger = logging.getLogger("app")
# 初始化通知器(从环境变量读配置)
notifier = build_notifier_from_env()
def alert_on_error(func):
"""装饰器:自动捕获函数异常并推送告警"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
tb = traceback.format_exc()
title = f"程序异常:{func.__name__}"
content = (
f"**错误类型**: {type(e).__name__}\n"
f"**错误信息**: {str(e)}\n"
f"**时间**: {datetime.now():%Y-%m-%d %H:%M:%S}\n"
f"**堆栈**(最后3行):\n```\n{chr(10).join(tb.strip().splitlines()[-3:])}\n```"
)
logger.error(f"{title}\n{tb}")
notifier.notify(title, content, level="error")
raise # 重新抛出,不吞异常
return wrapper
# ---- 使用示例 ----
@alert_on_error
def process_daily_data():
"""模拟数据处理任务,出错时自动告警"""
logger.info("开始处理每日数据...")
# ... 业务逻辑 ...
# 模拟一个可能出错的操作
data = fetch_from_database() # 如果这里报错,会自动推送飞书告警
logger.info(f"处理完成,共 {len(data)} 条记录")
return data
# ---- 主动发送业务告警 ----
def check_business_metrics():
"""业务指标异常时主动推送"""
# 假设从数据库获取到了指标
conversion_rate = 0.8 # 1% 以下触发告警
if conversion_rate < 1.0:
notifier.notify(
title="转化率异常告警",
content=(
f"**当前转化率**: {conversion_rate:.1f}%\n"
f"**阈值**: 1.0%\n"
f"**建议**: 检查落地页或广告投放质量"
),
level="warn",
)
if __name__ == "__main__":
check_business_metrics()
try:
process_daily_data()
except Exception:
pass # 已在装饰器中处理并告警
**最佳实践:**把
@alert_on_error装饰器加在所有定时任务的入口函数上,配合第15章的调度工具,就能实现"任务失败
上一章
下一章
第12章:AI API 集成