第 10 章

邮件自动化——批量发送与智能收件处理

第10章:邮件自动化——定时发送报告、告警和通知

邮件是最通用的异步沟通渠道。无论是每日数据日报、异常告警、还是给客户发个性化报告,Python 都能全自动搞定。本章覆盖三套邮件方案(从标准库到企业级),HTML 模板渲染,附件发送,以及接收邮件自动处理——最后用一个每日8点自动发送的日报系统串起全章内容。

Python 邮件库对比

定位 优势 适合场景
smtplib(标准库) 内置,无需安装 零依赖,最大灵活性 学习原理、自定义需求
yagmail 对 Gmail 极度简化 3行代码发邮件,自动处理附件 个人项目、快速原型
SendGrid SDK 企业级邮件服务 高送达率、详细统计、模板管理 生产环境、营销邮件

**选型建议:**个人脚本用 yagmail 最快。生产系统用 SendGrid(或 AWS SES、Mailgun),原因是普通 SMTP 发出的邮件容易进垃圾箱,且没有送达率追踪。学习时用 smtplib 理解底层原理。

发送邮件基础

SMTP 配置:主流邮件服务商

邮件服务 SMTP 地址 端口 注意
Gmail smtp.gmail.com 587 需开启"应用专用密码"
QQ邮箱 smtp.qq.com 587 需在设置中开启SMTP并获取授权码
企业微信邮箱 smtp.exmail.qq.com 465 用企业邮箱账号密码
Outlook/Hotmail smtp-mail.outlook.com 587 普通账号密码即可

用 smtplib 发送纯文本邮件

import smtplib
from email.message import EmailMessage

def send_text_email(
    subject: str,
    body: str,
    to: str | list[str],
    smtp_host: str = "smtp.gmail.com",
    smtp_port: int = 587,
    username: str = "",
    password: str = "",
):
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = username
    msg["To"] = to if isinstance(to, str) else ", ".join(to)
    msg.set_content(body)

    with smtplib.SMTP(smtp_host, smtp_port) as server:
        server.starttls()           # 启用 TLS 加密
        server.login(username, password)
        server.send_message(msg)
    print(f"邮件已发送至: {msg['To']}")

# 使用示例
send_text_email(
    subject="每日数据摘要",
    body="今日销售额:¥128,000,同比增长 12%。",
    to=["[email protected]", "[email protected]"],
    username="[email protected]",
    password="your-app-password",  # Gmail 应用专用密码,不是登录密码
)

**密码安全:**永远不要把密码硬编码在代码里。用环境变量(os.getenv("EMAIL_PASSWORD"))或 .env 文件配合 python-dotenv 库管理敏感信息。

发送带附件的邮件(Excel/PDF 报告)

import smtplib
from email.message import EmailMessage
from pathlib import Path

def send_email_with_attachment(
    subject: str, body: str, to: str,
    attachment_path: str,
    username: str, password: str,
):
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = username
    msg["To"] = to
    msg.set_content(body)

    # 添加附件
    file_path = Path(attachment_path)
    with open(file_path, "rb") as f:
        msg.add_attachment(
            f.read(),
            maintype="application",
            subtype="octet-stream",
            filename=file_path.name,
        )

    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()
        server.login(username, password)
        server.send_message(msg)

send_email_with_attachment(
    subject="2024年1月销售报告",
    body="请查收附件中的月度销售报告,如有问题请回复。",
    to="[email protected]",
    attachment_path="reports/sales_2024_01.xlsx",
    username="[email protected]",
    password="your-app-password",
)

HTML 邮件模板

用 Jinja2 渲染邮件正文

pip install jinja2
from jinja2 import Template

WEEKLY_REPORT_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<style>
  body { font-family: Arial, sans-serif; color: #333; max-width: 600px; margin: 0 auto; }
  h1 { color: #2563eb; font-size: 22px; border-bottom: 2px solid #2563eb; padding-bottom: 8px; }
  .metric { background: #f0f4ff; border-radius: 8px; padding: 16px; margin: 12px 0; }
  .metric .value { font-size: 28px; font-weight: bold; color: #1d4ed8; }
  .metric .label { font-size: 13px; color: #6b7280; }
  .change-up { color: #16a34a; }
  .change-down { color: #dc2626; }
  table { width: 100%; border-collapse: collapse; margin: 16px 0; }
  th { background: #2563eb; color: white; padding: 10px; text-align: left; }
  td { padding: 8px 10px; border-bottom: 1px solid #e5e7eb; }
  tr:nth-child(even) td { background: #f9fafb; }
</style>
</head>
<body>
  <h1>{{"{{"}}report_date{{"}}"}} 周报摘要</h1>
  <div class="metric">
    <div class="value">¥{{"{{"}}revenue | format_money{{"}}"}}</div>
    <div class="label">本周营收
      <span class="{{"{{"}}'change-up' if revenue_change >= 0 else 'change-down'{{"}}"}}">
        {{"{{"}}'%+.1f%%' | format(revenue_change){{"}}"}}
      </span>
    </div>
  </div>
  <h2 style="font-size:16px;">Top 5 产品</h2>
  <table>
    <thead><tr><th>产品</th><th>销量</th><th>营收</th></tr></thead>
    <tbody>
      {% for p in top_products %}
      <tr>
        <td>{{"{{"}}p.name{{"}}"}}</td>
        <td>{{"{{"}}p.qty{{"}}"}}</td>
        <td>¥{{"{{"}}p.revenue{{"}}"}}</td>
      </tr>
      {% endfor %}
    </tbody>
  </table>
</body>
</html>
"""

def render_weekly_report(data: dict) -> str:
    tpl = Template(WEEKLY_REPORT_TEMPLATE)
    return tpl.render(**data)

# 渲染示例
html_body = render_weekly_report({
    "report_date": "2024年第3周",
    "revenue": 128000,
    "revenue_change": 12.5,
    "top_products": [
        {"name": "蓝牙耳机 Pro", "qty": 320, "revenue": 45600},
        {"name": "充电宝 10000mAh", "qty": 580, "revenue": 34800},
    ],
})

发送 HTML 格式邮件

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

def send_html_email(subject: str, html_body: str, to: str, username: str, password: str):
    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["From"] = username
    msg["To"] = to

    # 同时附上纯文本版本(邮件客户端降级显示)
    text_part = MIMEText("请使用支持HTML的邮件客户端查看此邮件。", "plain", "utf-8")
    html_part = MIMEText(html_body, "html", "utf-8")
    msg.attach(text_part)
    msg.attach(html_part)  # HTML版本放后面,优先显示

    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()
        server.login(username, password)
        server.sendmail(username, to, msg.as_string())

邮件接收与解析

用 imaplib 读取邮箱

import imaplib
import email
from email.header import decode_header
from pathlib import Path

def fetch_unread_emails(
    imap_host: str, username: str, password: str,
    mailbox: str = "INBOX", limit: int = 10
) -> list[dict]:
    """读取未读邮件,返回结构化数据"""
    results = []
    with imaplib.IMAP4_SSL(imap_host) as imap:
        imap.login(username, password)
        imap.select(mailbox)

        # 搜索未读邮件
        _, message_ids = imap.search(None, "UNSEEN")
        ids = message_ids[0].split()[-limit:]  # 最近 N 封

        for mid in ids:
            _, data = imap.fetch(mid, "(RFC822)")
            msg = email.message_from_bytes(data[0][1])

            # 解码主题
            subject, enc = decode_header(msg["Subject"])[0]
            if isinstance(subject, bytes):
                subject = subject.decode(enc or "utf-8", errors="replace")

            results.append({
                "subject": subject,
                "from": msg.get("From", ""),
                "date": msg.get("Date", ""),
                "msg_obj": msg,
            })
    return results

def save_attachments(msg, save_dir: str = "attachments") -> list[str]:
    """从邮件对象中提取并保存所有附件,返回保存路径列表"""
    Path(save_dir).mkdir(exist_ok=True)
    saved = []
    for part in msg.walk():
        if part.get_content_disposition() == "attachment":
            filename, enc = decode_header(part.get_filename())[0]
            if isinstance(filename, bytes):
                filename = filename.decode(enc or "utf-8")
            save_path = Path(save_dir) / filename
            with open(save_path, "wb") as f:
                f.write(part.get_payload(decode=True))
            saved.append(str(save_path))
            print(f"附件已保存: {save_path}")
    return saved

完整案例:自动处理客服邮件附件

import os
from pathlib import Path

def auto_process_customer_emails():
    """
    自动检查指定邮箱,下载客服邮件中的订单附件到 downloads 目录,
    然后标记邮件为已读。
    """
    emails = fetch_unread_emails(
        imap_host="imap.gmail.com",
        username=os.getenv("EMAIL_USER"),
        password=os.getenv("EMAIL_PASS"),
    )

    for e in emails:
        subject = e["subject"]
        # 只处理包含"订单"的邮件
        if "订单" in subject or "order" in subject.lower():
            print(f"处理邮件: {subject}")
            saved = save_attachments(e["msg_obj"], save_dir="downloads/orders")
            if saved:
                print(f"  已下载 {len(saved)} 个附件")
            else:
                print("  无附件")

if __name__ == "__main__":
    auto_process_customer_emails()

企业邮件场景

批量个性化邮件

import csv
import time
from pathlib import Path

def send_personalized_emails(csv_path: str, username: str, password: str):
    """
    从 CSV 读取收件人信息,每人收到不同内容的邮件。
    CSV 格式:name,email,amount,due_date
    """
    TEMPLATE = """尊敬的 {name},

您好!您的账单已生成。

账单金额:¥{amount}
到期日期:{due_date}

请在到期日前完成支付,如有疑问请回复此邮件。

此致
财务部"""

    with open(csv_path, encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            body = TEMPLATE.format(**row)
            send_text_email(
                subject=f"账单通知 - {row['name']}",
                body=body,
                to=row["email"],
                username=username,
                password=password,
            )
            print(f"[{i+1}] 已发送给 {row['name']} ({row['email']})")
            # 发送间隔:避免被标记为垃圾邮件
            time.sleep(2)

**批量发送注意事项:**Gmail 每天免费额度约 500 封,企业 Google Workspace 每天 2000 封。发送间隔不低于 1 秒。大批量(1000封以上)建议使用 SendGrid 等专业邮件服务,有更好的送达率和退订管理。

SendGrid API 发送高可靠邮件

pip install sendgrid
import os
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail, Attachment, FileContent, FileName, FileType
import base64

def send_via_sendgrid(
    subject: str, html_content: str, to_email: str,
    attachment_path: str = None,
):
    """用 SendGrid API 发送邮件,支持附件"""
    message = Mail(
        from_email="[email protected]",
        to_emails=to_email,
        subject=subject,
        html_content=html_content,
    )
    if attachment_path:
        with open(attachment_path, "rb") as f:
            data = base64.b64encode(f.read()).decode()
        att = Attachment(
            FileContent(data),
            FileName(Path(attachment_path).name),
            FileType("application/octet-stream"),
        )
        message.attachment = att

    sg = SendGridAPIClient(os.getenv("SENDGRID_API_KEY"))
    response = sg.send(message)
    print(f"SendGrid 状态码: {response.status_code}")
    return response.status_code == 202

实战项目:自动化日报系统

每天早上 8 点,自动从数据库查询昨日数据,渲染 HTML 报告并发送给相关人员。

import os
import sqlite3
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime, timedelta
from jinja2 import Template

# ---- 配置 ----
EMAIL_USER = os.getenv("EMAIL_USER", "[email protected]")
EMAIL_PASS = os.getenv("EMAIL_PASS", "")
RECIPIENTS = ["[email protected]", "[email protected]"]
DB_PATH = "sales.db"

DAILY_REPORT_HTML = """
<html><body style="font-family:Arial,sans-serif;max-width:600px;margin:0 auto;">
<h2 style="color:#2563eb;">{{"{{"}}date{{"}}"}} 日报</h2>
<p>以下是昨日({{"{{"}}yesterday{{"}}"}})关键指标:</p>
<table style="width:100%;border-collapse:collapse;">
  <tr style="background:#2563eb;color:white;">
    <th style="padding:10px;text-align:left;">指标</th>
    <th style="padding:10px;text-align:right;">数值</th>
  </tr>
  <tr><td style="padding:8px;border-bottom:1px solid #e5e7eb;">总订单数</td>
      <td style="padding:8px;border-bottom:1px solid #e5e7eb;text-align:right;">{{"{{"}}orders{{"}}"}}</td></tr>
  <tr style="background:#f9fafb;">
      <td style="padding:8px;border-bottom:1px solid #e5e7eb;">总营收</td>
      <td style="padding:8px;border-bottom:1px solid #e5e7eb;text-align:right;">¥{{"{{"}}revenue{{"}}"}}</td></tr>
  <tr><td style="padding:8px;">新增用户</td>
      <td style="padding:8px;text-align:right;">{{"{{"}}new_users{{"}}"}}</td></tr>
</table>
<p style="color:#6b7280;font-size:12px;margin-top:24px;">此邮件由自动化系统发送,请勿直接回复。</p>
</body></html>
"""

def query_yesterday_stats(db_path: str, date_str: str) -> dict:
    """从 SQLite 查询指定日期的统计数据"""
    conn = sqlite3.connect(db_path)
    orders = conn.execute(
        "SELECT COUNT(*), COALESCE(SUM(amount),0) FROM orders WHERE date(created_at)=?",
        (date_str,)
    ).fetchone()
    new_users = conn.execute(
        "SELECT COUNT(*) FROM users WHERE date(created_at)=?", (date_str,)
    ).fetchone()[0]
    conn.close()
    return {
        "orders": orders[0],
        "revenue": f"{orders[1]:,.0f}",
        "new_users": new_users,
    }

def send_daily_report():
    today = datetime.now()
    yesterday = today - timedelta(days=1)
    yesterday_str = yesterday.strftime("%Y-%m-%d")

    # 查询数据
    stats = query_yesterday_stats(DB_PATH, yesterday_str)
    stats.update({
        "date": today.strftime("%Y年%m月%d日"),
        "yesterday": yesterday_str,
    })

    # 渲染 HTML
    html_body = Template(DAILY_REPORT_HTML).render(**stats)

    # 构建邮件
    msg = MIMEMultipart("alternative")
    msg["Subject"] = f"【日报】{yesterday_str} 数据摘要"
    msg["From"] = EMAIL_USER
    msg["To"] = ", ".join(RECIPIENTS)
    msg.attach(MIMEText(html_body, "html", "utf-8"))

    # 发送
    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()
        server.login(EMAIL_USER, EMAIL_PASS)
        server.sendmail(EMAIL_USER, RECIPIENTS, msg.as_string())
    print(f"[{today:%H:%M:%S}] 日报已发送给 {len(RECIPIENTS)} 位收件人")

if __name__ == "__main__":
    send_daily_report()

**定时运行:**在 macOS/Linux 上,把这个脚本加入 crontab:0 8 * * * /path/to/venv/bin/python /path/to/daily_report.py 即可实现每天早上 8 点自动运行。Windows 用任务计划程序。更完整的调度方案见第15章。

上一章

下一章
第11章:消息通知自动化
本章评分
4.5  / 5  (30 评分)

💬 留言讨论