第 10 章
邮件自动化——批量发送与智能收件处理
第10章:邮件自动化——定时发送报告、告警和通知
邮件是最通用的异步沟通渠道。无论是每日数据日报、异常告警、还是给客户发个性化报告,Python 都能全自动搞定。本章覆盖三套邮件方案(从标准库到企业级),HTML 模板渲染,附件发送,以及接收邮件自动处理——最后用一个每日8点自动发送的日报系统串起全章内容。
Python 邮件库对比
| 库 | 定位 | 优势 | 适合场景 |
|---|---|---|---|
| smtplib(标准库) | 内置,无需安装 | 零依赖,最大灵活性 | 学习原理、自定义需求 |
| yagmail | 对 Gmail 极度简化 | 3行代码发邮件,自动处理附件 | 个人项目、快速原型 |
| SendGrid SDK | 企业级邮件服务 | 高送达率、详细统计、模板管理 | 生产环境、营销邮件 |
**选型建议:**个人脚本用 yagmail 最快。生产系统用 SendGrid(或 AWS SES、Mailgun),原因是普通 SMTP 发出的邮件容易进垃圾箱,且没有送达率追踪。学习时用 smtplib 理解底层原理。
发送邮件基础
SMTP 配置:主流邮件服务商
| 邮件服务 | SMTP 地址 | 端口 | 注意 |
|---|---|---|---|
| Gmail | smtp.gmail.com | 587 | 需开启"应用专用密码" |
| QQ邮箱 | smtp.qq.com | 587 | 需在设置中开启SMTP并获取授权码 |
| 企业微信邮箱 | smtp.exmail.qq.com | 465 | 用企业邮箱账号密码 |
| Outlook/Hotmail | smtp-mail.outlook.com | 587 | 普通账号密码即可 |
用 smtplib 发送纯文本邮件
import smtplib
from email.message import EmailMessage
def send_text_email(
subject: str,
body: str,
to: str | list[str],
smtp_host: str = "smtp.gmail.com",
smtp_port: int = 587,
username: str = "",
password: str = "",
):
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = username
msg["To"] = to if isinstance(to, str) else ", ".join(to)
msg.set_content(body)
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.starttls() # 启用 TLS 加密
server.login(username, password)
server.send_message(msg)
print(f"邮件已发送至: {msg['To']}")
# 使用示例
send_text_email(
subject="每日数据摘要",
body="今日销售额:¥128,000,同比增长 12%。",
to=["[email protected]", "[email protected]"],
username="[email protected]",
password="your-app-password", # Gmail 应用专用密码,不是登录密码
)
**密码安全:**永远不要把密码硬编码在代码里。用环境变量(
os.getenv("EMAIL_PASSWORD"))或.env文件配合 python-dotenv 库管理敏感信息。
发送带附件的邮件(Excel/PDF 报告)
import smtplib
from email.message import EmailMessage
from pathlib import Path
def send_email_with_attachment(
subject: str, body: str, to: str,
attachment_path: str,
username: str, password: str,
):
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = username
msg["To"] = to
msg.set_content(body)
# 添加附件
file_path = Path(attachment_path)
with open(file_path, "rb") as f:
msg.add_attachment(
f.read(),
maintype="application",
subtype="octet-stream",
filename=file_path.name,
)
with smtplib.SMTP("smtp.gmail.com", 587) as server:
server.starttls()
server.login(username, password)
server.send_message(msg)
send_email_with_attachment(
subject="2024年1月销售报告",
body="请查收附件中的月度销售报告,如有问题请回复。",
to="[email protected]",
attachment_path="reports/sales_2024_01.xlsx",
username="[email protected]",
password="your-app-password",
)
HTML 邮件模板
用 Jinja2 渲染邮件正文
pip install jinja2
from jinja2 import Template
WEEKLY_REPORT_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<style>
body { font-family: Arial, sans-serif; color: #333; max-width: 600px; margin: 0 auto; }
h1 { color: #2563eb; font-size: 22px; border-bottom: 2px solid #2563eb; padding-bottom: 8px; }
.metric { background: #f0f4ff; border-radius: 8px; padding: 16px; margin: 12px 0; }
.metric .value { font-size: 28px; font-weight: bold; color: #1d4ed8; }
.metric .label { font-size: 13px; color: #6b7280; }
.change-up { color: #16a34a; }
.change-down { color: #dc2626; }
table { width: 100%; border-collapse: collapse; margin: 16px 0; }
th { background: #2563eb; color: white; padding: 10px; text-align: left; }
td { padding: 8px 10px; border-bottom: 1px solid #e5e7eb; }
tr:nth-child(even) td { background: #f9fafb; }
</style>
</head>
<body>
<h1>{{"{{"}}report_date{{"}}"}} 周报摘要</h1>
<div class="metric">
<div class="value">¥{{"{{"}}revenue | format_money{{"}}"}}</div>
<div class="label">本周营收
<span class="{{"{{"}}'change-up' if revenue_change >= 0 else 'change-down'{{"}}"}}">
{{"{{"}}'%+.1f%%' | format(revenue_change){{"}}"}}
</span>
</div>
</div>
<h2 style="font-size:16px;">Top 5 产品</h2>
<table>
<thead><tr><th>产品</th><th>销量</th><th>营收</th></tr></thead>
<tbody>
{% for p in top_products %}
<tr>
<td>{{"{{"}}p.name{{"}}"}}</td>
<td>{{"{{"}}p.qty{{"}}"}}</td>
<td>¥{{"{{"}}p.revenue{{"}}"}}</td>
</tr>
{% endfor %}
</tbody>
</table>
</body>
</html>
"""
def render_weekly_report(data: dict) -> str:
tpl = Template(WEEKLY_REPORT_TEMPLATE)
return tpl.render(**data)
# 渲染示例
html_body = render_weekly_report({
"report_date": "2024年第3周",
"revenue": 128000,
"revenue_change": 12.5,
"top_products": [
{"name": "蓝牙耳机 Pro", "qty": 320, "revenue": 45600},
{"name": "充电宝 10000mAh", "qty": 580, "revenue": 34800},
],
})
发送 HTML 格式邮件
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
def send_html_email(subject: str, html_body: str, to: str, username: str, password: str):
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = username
msg["To"] = to
# 同时附上纯文本版本(邮件客户端降级显示)
text_part = MIMEText("请使用支持HTML的邮件客户端查看此邮件。", "plain", "utf-8")
html_part = MIMEText(html_body, "html", "utf-8")
msg.attach(text_part)
msg.attach(html_part) # HTML版本放后面,优先显示
with smtplib.SMTP("smtp.gmail.com", 587) as server:
server.starttls()
server.login(username, password)
server.sendmail(username, to, msg.as_string())
邮件接收与解析
用 imaplib 读取邮箱
import imaplib
import email
from email.header import decode_header
from pathlib import Path
def fetch_unread_emails(
imap_host: str, username: str, password: str,
mailbox: str = "INBOX", limit: int = 10
) -> list[dict]:
"""读取未读邮件,返回结构化数据"""
results = []
with imaplib.IMAP4_SSL(imap_host) as imap:
imap.login(username, password)
imap.select(mailbox)
# 搜索未读邮件
_, message_ids = imap.search(None, "UNSEEN")
ids = message_ids[0].split()[-limit:] # 最近 N 封
for mid in ids:
_, data = imap.fetch(mid, "(RFC822)")
msg = email.message_from_bytes(data[0][1])
# 解码主题
subject, enc = decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
subject = subject.decode(enc or "utf-8", errors="replace")
results.append({
"subject": subject,
"from": msg.get("From", ""),
"date": msg.get("Date", ""),
"msg_obj": msg,
})
return results
def save_attachments(msg, save_dir: str = "attachments") -> list[str]:
"""从邮件对象中提取并保存所有附件,返回保存路径列表"""
Path(save_dir).mkdir(exist_ok=True)
saved = []
for part in msg.walk():
if part.get_content_disposition() == "attachment":
filename, enc = decode_header(part.get_filename())[0]
if isinstance(filename, bytes):
filename = filename.decode(enc or "utf-8")
save_path = Path(save_dir) / filename
with open(save_path, "wb") as f:
f.write(part.get_payload(decode=True))
saved.append(str(save_path))
print(f"附件已保存: {save_path}")
return saved
完整案例:自动处理客服邮件附件
import os
from pathlib import Path
def auto_process_customer_emails():
"""
自动检查指定邮箱,下载客服邮件中的订单附件到 downloads 目录,
然后标记邮件为已读。
"""
emails = fetch_unread_emails(
imap_host="imap.gmail.com",
username=os.getenv("EMAIL_USER"),
password=os.getenv("EMAIL_PASS"),
)
for e in emails:
subject = e["subject"]
# 只处理包含"订单"的邮件
if "订单" in subject or "order" in subject.lower():
print(f"处理邮件: {subject}")
saved = save_attachments(e["msg_obj"], save_dir="downloads/orders")
if saved:
print(f" 已下载 {len(saved)} 个附件")
else:
print(" 无附件")
if __name__ == "__main__":
auto_process_customer_emails()
企业邮件场景
批量个性化邮件
import csv
import time
from pathlib import Path
def send_personalized_emails(csv_path: str, username: str, password: str):
"""
从 CSV 读取收件人信息,每人收到不同内容的邮件。
CSV 格式:name,email,amount,due_date
"""
TEMPLATE = """尊敬的 {name},
您好!您的账单已生成。
账单金额:¥{amount}
到期日期:{due_date}
请在到期日前完成支付,如有疑问请回复此邮件。
此致
财务部"""
with open(csv_path, encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader):
body = TEMPLATE.format(**row)
send_text_email(
subject=f"账单通知 - {row['name']}",
body=body,
to=row["email"],
username=username,
password=password,
)
print(f"[{i+1}] 已发送给 {row['name']} ({row['email']})")
# 发送间隔:避免被标记为垃圾邮件
time.sleep(2)
**批量发送注意事项:**Gmail 每天免费额度约 500 封,企业 Google Workspace 每天 2000 封。发送间隔不低于 1 秒。大批量(1000封以上)建议使用 SendGrid 等专业邮件服务,有更好的送达率和退订管理。
SendGrid API 发送高可靠邮件
pip install sendgrid
import os
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail, Attachment, FileContent, FileName, FileType
import base64
def send_via_sendgrid(
subject: str, html_content: str, to_email: str,
attachment_path: str = None,
):
"""用 SendGrid API 发送邮件,支持附件"""
message = Mail(
from_email="[email protected]",
to_emails=to_email,
subject=subject,
html_content=html_content,
)
if attachment_path:
with open(attachment_path, "rb") as f:
data = base64.b64encode(f.read()).decode()
att = Attachment(
FileContent(data),
FileName(Path(attachment_path).name),
FileType("application/octet-stream"),
)
message.attachment = att
sg = SendGridAPIClient(os.getenv("SENDGRID_API_KEY"))
response = sg.send(message)
print(f"SendGrid 状态码: {response.status_code}")
return response.status_code == 202
实战项目:自动化日报系统
每天早上 8 点,自动从数据库查询昨日数据,渲染 HTML 报告并发送给相关人员。
import os
import sqlite3
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime, timedelta
from jinja2 import Template
# ---- 配置 ----
EMAIL_USER = os.getenv("EMAIL_USER", "[email protected]")
EMAIL_PASS = os.getenv("EMAIL_PASS", "")
RECIPIENTS = ["[email protected]", "[email protected]"]
DB_PATH = "sales.db"
DAILY_REPORT_HTML = """
<html><body style="font-family:Arial,sans-serif;max-width:600px;margin:0 auto;">
<h2 style="color:#2563eb;">{{"{{"}}date{{"}}"}} 日报</h2>
<p>以下是昨日({{"{{"}}yesterday{{"}}"}})关键指标:</p>
<table style="width:100%;border-collapse:collapse;">
<tr style="background:#2563eb;color:white;">
<th style="padding:10px;text-align:left;">指标</th>
<th style="padding:10px;text-align:right;">数值</th>
</tr>
<tr><td style="padding:8px;border-bottom:1px solid #e5e7eb;">总订单数</td>
<td style="padding:8px;border-bottom:1px solid #e5e7eb;text-align:right;">{{"{{"}}orders{{"}}"}}</td></tr>
<tr style="background:#f9fafb;">
<td style="padding:8px;border-bottom:1px solid #e5e7eb;">总营收</td>
<td style="padding:8px;border-bottom:1px solid #e5e7eb;text-align:right;">¥{{"{{"}}revenue{{"}}"}}</td></tr>
<tr><td style="padding:8px;">新增用户</td>
<td style="padding:8px;text-align:right;">{{"{{"}}new_users{{"}}"}}</td></tr>
</table>
<p style="color:#6b7280;font-size:12px;margin-top:24px;">此邮件由自动化系统发送,请勿直接回复。</p>
</body></html>
"""
def query_yesterday_stats(db_path: str, date_str: str) -> dict:
"""从 SQLite 查询指定日期的统计数据"""
conn = sqlite3.connect(db_path)
orders = conn.execute(
"SELECT COUNT(*), COALESCE(SUM(amount),0) FROM orders WHERE date(created_at)=?",
(date_str,)
).fetchone()
new_users = conn.execute(
"SELECT COUNT(*) FROM users WHERE date(created_at)=?", (date_str,)
).fetchone()[0]
conn.close()
return {
"orders": orders[0],
"revenue": f"{orders[1]:,.0f}",
"new_users": new_users,
}
def send_daily_report():
today = datetime.now()
yesterday = today - timedelta(days=1)
yesterday_str = yesterday.strftime("%Y-%m-%d")
# 查询数据
stats = query_yesterday_stats(DB_PATH, yesterday_str)
stats.update({
"date": today.strftime("%Y年%m月%d日"),
"yesterday": yesterday_str,
})
# 渲染 HTML
html_body = Template(DAILY_REPORT_HTML).render(**stats)
# 构建邮件
msg = MIMEMultipart("alternative")
msg["Subject"] = f"【日报】{yesterday_str} 数据摘要"
msg["From"] = EMAIL_USER
msg["To"] = ", ".join(RECIPIENTS)
msg.attach(MIMEText(html_body, "html", "utf-8"))
# 发送
with smtplib.SMTP("smtp.gmail.com", 587) as server:
server.starttls()
server.login(EMAIL_USER, EMAIL_PASS)
server.sendmail(EMAIL_USER, RECIPIENTS, msg.as_string())
print(f"[{today:%H:%M:%S}] 日报已发送给 {len(RECIPIENTS)} 位收件人")
if __name__ == "__main__":
send_daily_report()
**定时运行:**在 macOS/Linux 上,把这个脚本加入 crontab:
0 8 * * * /path/to/venv/bin/python /path/to/daily_report.py即可实现每天早上 8 点自动运行。Windows 用任务计划程序。更完整的调度方案见第15章。
上一章
下一章
第11章:消息通知自动化