综合实战项目——构建完整的自动化系统
第16章:综合实战——打造个人 AI 自动化助理
本章是全书的终章。我们不再学习单一技术,而是把前 15 章的所有模块整合在一起,构建一个真正能在服务器上持续运行的"个人 AI 自动化助理":每天定时抓取你关注的信息,用 Claude AI 自动摘要和分类,生成可视化 HTML 报告,通过飞书和邮件推送给你。这一章的代码可以直接拿去运行。
项目目标与架构设计
这个系统能做什么
假设你每天需要关注行业动态、某几个网站的更新、以及竞争对手的价格变动。现在你手动逐一查看,每天花费 30-60 分钟。这个助理将完全接管这件事:
- 每天早上 8:00,自动抓取你订阅的 RSS 源和指定网站
- 用 Claude API 对每条新闻生成 50 字摘要,打重要性评分(1-5 分)
- 生成 HTML 报告,包含图表:今日信息量、各来源占比、重要度分布
- 同时推送到飞书和邮件,手机上直接看摘要,点击链接看完整报告
- 所有数据存入 SQLite,可追溯历史、做趋势分析
项目目录结构
auto-assistant/ ├── config/ │ ├── init.py │ └── settings.py # pydantic-settings 配置 ├── scrapers/ │ ├── init.py │ ├── rss_scraper.py # RSS 订阅抓取 │ └── web_scraper.py # 网页定向爬取 ├── processors/ │ ├── init.py │ └── ai_processor.py # Claude AI 摘要与分类 ├── reporters/ │ ├── init.py │ ├── html_reporter.py # HTML 报告生成 │ └── templates/ │ └── daily_report.html ├── notifiers/ │ ├── init.py │ ├── base.py # 抽象 Notifier 接口 │ ├── feishu.py # 飞书 Webhook │ └── email_notifier.py # SMTP 邮件 ├── database/ │ ├── init.py │ └── models.py # SQLAlchemy 模型 ├── scheduler.py # APScheduler 主调度 ├── cli.py # Click CLI 入口 ├── requirements.txt └── .env # 密钥(不提交到 git)
技术栈一览
| 层级 | 技术 | 对应章节 |
|---|---|---|
| 数据采集 | feedparser、requests、BeautifulSoup | 第9章 |
| 数据持久化 | SQLite + SQLAlchemy | 第2章(文件I/O扩展) |
| AI 处理 | Anthropic Claude API | 第12章 |
| 报告生成 | Jinja2 + matplotlib | 第13章 |
| 通知推送 | 飞书 Webhook、smtplib | 第10、11章 |
| 定时调度 | APScheduler | 第15章 |
| 配置管理 | pydantic-settings + python-dotenv | 第15章 |
| CLI 工具 | Click | 第15章 |
第一模块:配置系统
配置系统是整个项目的基础。使用 pydantic-settings 实现:所有配置从 .env 文件读取,有类型校验,IDE 有自动补全,部署时只需替换环境变量。
config/settings.py pydantic-settings 配置
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field
from typing import List
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
# AI 配置
anthropic_api_key: str = Field(..., description="Claude API Key")
ai_model: str = "claude-3-5-haiku-20241022"
ai_max_tokens: int = 300
# 数据库
db_path: str = "data/assistant.db"
# 通知渠道
feishu_webhook_url: str = ""
smtp_host: str = "smtp.gmail.com"
smtp_port: int = 587
smtp_user: str = ""
smtp_password: str = ""
report_recipients: List[str] = []
# 采集配置
rss_feeds: List[str] = [
"https://feeds.feedburner.com/PythonInsider",
"https://realpython.com/atom.xml",
]
max_items_per_feed: int = 10
# 报告配置
report_output_dir: str = "reports"
# 全局单例
settings = Settings()
**为什么用 pydantic-settings 而不是直接 os.environ:**pydantic-settings 在应用启动时就验证所有必填配置,缺少 API Key 会立刻报错而不是在任务运行到一半时崩溃。类型声明也让你的配置意图一目了然。
对应的 .env 文件模板(绝对不要提交到 Git):
.env.example 复制为 .env 并填写真实值
ANTHROPIC_API_KEY=sk-ant-xxxxxxxxxxxxx
FEISHU_WEBHOOK_URL=https://open.feishu.cn/open-apis/bot/v2/hook/xxx
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
[email protected]
SMTP_PASSWORD=your_app_password
REPORT_RECIPIENTS=["[email protected]","[email protected]"]
RSS_FEEDS=["https://feeds.feedburner.com/PythonInsider","https://hnrss.org/frontpage"]
第二模块:数据采集层
数据库模型(先定义,采集时写入)
database/models.py SQLAlchemy ORM 模型
from sqlalchemy import create_engine, Column, Integer, String, Text, Float, DateTime, Boolean
from sqlalchemy.orm import DeclarativeBase, Session
from datetime import datetime
from config.settings import settings
class Base(DeclarativeBase):
pass
class Article(Base):
__tablename__ = "articles"
id = Column(Integer, primary_key=True)
title = Column(String(500), nullable=False)
url = Column(String(1000), unique=True, nullable=False)
source = Column(String(200))
published = Column(DateTime, default=datetime.utcnow)
raw_content = Column(Text)
summary = Column(Text) # AI 生成摘要
score = Column(Float) # AI 重要性评分 1-5
category = Column(String(100)) # AI 分类标签
processed = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
engine = create_engine(f"sqlite:///{settings.db_path}", echo=False)
def init_db():
import os
os.makedirs(os.path.dirname(settings.db_path), exist_ok=True)
Base.metadata.create_all(engine)
def get_session() -> Session:
return Session(engine)
RSS 抓取器
scrapers/rss_scraper.py feedparser 多源抓取
import feedparser
import logging
from datetime import datetime
from typing import List, Dict
from config.settings import settings
from database.models import Article, get_session
logger = logging.getLogger(__name__)
def fetch_rss_feed(feed_url: str) -> List[Dict]:
"""抓取单个 RSS 源,返回文章列表"""
try:
feed = feedparser.parse(feed_url)
articles = []
for entry in feed.entries[:settings.max_items_per_feed]:
articles.append({
"title": entry.get("title", "").strip(),
"url": entry.get("link", ""),
"source": feed.feed.get("title", feed_url),
"content": entry.get("summary", entry.get("description", "")),
"published": datetime(*entry.published_parsed[:6])
if hasattr(entry, "published_parsed") and entry.published_parsed
else datetime.utcnow(),
})
logger.info(f"RSS {feed_url}: 获取 {len(articles)} 篇")
return articles
except Exception as e:
logger.error(f"RSS 抓取失败 {feed_url}: {e}")
return []
def scrape_all_feeds() -> int:
"""抓取所有配置的 RSS 源,去重后写入数据库,返回新增数量"""
session = get_session()
saved = 0
for url in settings.rss_feeds:
for art in fetch_rss_feed(url):
if not art["url"]:
continue
exists = session.query(Article).filter_by(url=art["url"]).first()
if exists:
continue
record = Article(
title=art["title"],
url=art["url"],
source=art["source"],
published=art["published"],
raw_content=art["content"],
)
session.add(record)
saved += 1
session.commit()
session.close()
logger.info(f"新增文章 {saved} 篇")
return saved
第三模块:AI 处理层
AI 处理层是整个系统的核心差异所在。我们用 Claude API 批量处理未处理的文章,要求它一次性返回摘要、分类和评分,减少 API 调用次数(从而降低成本)。
processors/ai_processor.py Claude API 批量处理
import anthropic
import json
import logging
from database.models import Article, get_session
from config.settings import settings
logger = logging.getLogger(__name__)
client = anthropic.Anthropic(api_key=settings.anthropic_api_key)
SYSTEM_PROMPT = """你是一个信息助理,专门处理科技/行业资讯。
对于每篇文章,用 JSON 格式输出:
{"summary": "50字以内的核心摘要", "category": "技术/产品/行业/政策/其他", "score": 3}
score 是 1-5 的整数,5 最重要。只输出 JSON,不加其他内容。"""
def process_article(article: Article) -> bool:
"""用 Claude 处理单篇文章,更新摘要/分类/评分"""
content = f"标题:{article.title}\n\n内容:{article.raw_content[:800]}"
try:
response = client.messages.create(
model=settings.ai_model,
max_tokens=settings.ai_max_tokens,
system=SYSTEM_PROMPT,
messages=[{"role": "user", "content": content}],
)
result = json.loads(response.content[0].text.strip())
article.summary = result.get("summary", "")
article.category = result.get("category", "其他")
article.score = float(result.get("score", 3))
article.processed = True
return True
except Exception as e:
logger.error(f"AI 处理失败 [{article.title[:30]}]: {e}")
return False
def process_pending_articles(limit: int = 30) -> int:
"""批量处理未经 AI 分析的文章,返回成功数量"""
session = get_session()
pending = (session.query(Article)
.filter_by(processed=False)
.order_by(Article.published.desc())
.limit(limit).all())
ok = 0
for art in pending:
if process_article(art):
ok += 1
session.commit()
session.close()
logger.info(f"AI 处理完成 {ok}/{len(pending)} 篇")
return ok
**成本控制建议:**每次调用 Claude claude-3-5-haiku 处理一篇 800 字文章约消耗 500 tokens,费用约 $0.0004。每天处理 30 篇,月成本约 $0.36。如果文章数量更多,可以把多篇文章拼成一个 batch 请求(用换行符分隔),让 Claude 一次返回多个 JSON,进一步节省调用次数。
第四模块:报告生成层
报告生成器从数据库读取当天的已处理文章,用 matplotlib 生成图表(以 base64 内嵌到 HTML 中,无需外部文件依赖),再用 Jinja2 渲染 HTML 报告。
reporters/html_reporter.py Jinja2 + matplotlib 报告生成
import os
import base64
import io
from datetime import datetime, date
from jinja2 import Environment, FileSystemLoader
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from collections import Counter
from database.models import Article, get_session
from config.settings import settings
def make_chart_base64(articles) -> str:
"""生成来源分布饼图,返回 base64 字符串"""
sources = Counter(a.source for a in articles)
fig, ax = plt.subplots(figsize=(5, 4), facecolor="#1a1f35")
ax.set_facecolor("#1a1f35")
ax.pie(sources.values(), labels=sources.keys(), autopct="%1.0f%%",
colors=["#6c63ff","#22c55e","#f59e0b","#3b82f6","#ef4444"],
textprops={"color": "#e2e8f0", "fontsize": 9})
ax.set_title("信息来源分布", color="#e2e8f0", fontsize=11)
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight", dpi=110)
plt.close(fig)
return base64.b64encode(buf.getvalue()).decode()
def generate_daily_report() -> str:
"""生成今日报告 HTML 文件,返回文件路径"""
session = get_session()
today = date.today()
articles = (session.query(Article)
.filter(Article.processed == True)
.filter(Article.published >= datetime.combine(today, datetime.min.time()))
.order_by(Article.score.desc())
.all())
session.close()
chart_b64 = make_chart_base64(articles) if articles else ""
high_priority = [a for a in articles if a.score >= 4]
env = Environment(loader=FileSystemLoader("reporters/templates"))
tmpl = env.get_template("daily_report.html")
html = tmpl.render(
date=today.strftime("%Y年%m月%d日"),
total=len(articles),
high_count=len(high_priority),
articles=articles,
chart_b64=chart_b64,
)
os.makedirs(settings.report_output_dir, exist_ok=True)
path = os.path.join(settings.report_output_dir, f"report_{today}.html")
with open(path, "w", encoding="utf-8") as f:
f.write(html)
return path
对应的 Jinja2 模板(核心片段,展示结构):
reporters/templates/daily_report.html 报告 HTML 模板
<!-- Jinja2 模板结构示例 -->
<h1>AI 自动化助理 · 每日简报 · {{"{{"}}date{{"}}"}}</h1>
<p>今日抓取 <strong>{{"{{"}}total{{"}}"}}</strong> 篇,其中重要 {{"{{"}}high_count{{"}}"}} 篇</p>
{% if chart_b64 %}
<img src="data:image/png;base64,{{"{{"}}chart_b64{{"}}"}}" alt="来源分布">
{% endif %}
{% for art in articles %}
<div class="article {% if art.score >= 4 %}high{% endif %}">
<span class="score">{{"{{"}}art.score{{"}}"}}</span>
<span class="tag">{{"{{"}}art.category{{"}}"}}</span>
<a href="{{"{{"}}art.url{{"}}"}}">{{"{{"}}art.title{{"}}"}}</a>
<p>{{"{{"}}art.summary{{"}}"}}</p>
</div>
{% endfor %}
第五模块:通知推送层
通知层使用策略模式:定义抽象基类,飞书和邮件各实现一个具体类。调度器只依赖抽象接口,可以随时新增 Telegram、钉钉等渠道。
notifiers/base.py + feishu.py + email_notifier.py 统一通知接口
# notifiers/base.py
from abc import ABC, abstractmethod
class BaseNotifier(ABC):
@abstractmethod
def send(self, title: str, summary: str, report_url: str) -> bool:
"""发送通知,返回是否成功"""
...
# notifiers/feishu.py
import requests
from .base import BaseNotifier
from config.settings import settings
class FeishuNotifier(BaseNotifier):
def send(self, title: str, summary: str, report_url: str) -> bool:
if not settings.feishu_webhook_url:
return False
payload = {
"msg_type": "interactive",
"card": {
"header": {"title": {"tag": "plain_text", "content": title}},
"elements": [
{"tag": "div", "text": {"tag": "lark_md", "content": summary}},
{"tag": "action", "actions": [{
"tag": "button", "text": {"tag": "plain_text", "content": "查看完整报告"},
"url": report_url, "type": "primary"
}]}
]
}
}
r = requests.post(settings.feishu_webhook_url, json=payload, timeout=10)
return r.status_code == 200
# notifiers/email_notifier.py
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from .base import BaseNotifier
from config.settings import settings
class EmailNotifier(BaseNotifier):
def send(self, title: str, summary: str, report_url: str) -> bool:
if not settings.smtp_user or not settings.report_recipients:
return False
msg = MIMEMultipart("alternative")
msg["Subject"] = title
msg["From"] = settings.smtp_user
msg["To"] = ", ".join(settings.report_recipients)
html = f"<h2>{title}</h2><pre>{summary}</pre><a href='{report_url}'>查看完整报告</a>"
msg.attach(MIMEText(html, "html", "utf-8"))
try:
with smtplib.SMTP(settings.smtp_host, settings.smtp_port) as s:
s.starttls()
s.login(settings.smtp_user, settings.smtp_password)
s.sendmail(settings.smtp_user, settings.report_recipients, msg.as_string())
return True
except Exception as e:
return False
第六模块:定时调度与主入口
调度器是整个系统的指挥中心,负责按时序串联所有模块,并在任何模块失败时发送告警。
scheduler.py APScheduler 主调度
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from database.models import init_db
from scrapers.rss_scraper import scrape_all_feeds
from processors.ai_processor import process_pending_articles
from reporters.html_reporter import generate_daily_report
from notifiers.feishu import FeishuNotifier
from notifiers.email_notifier import EmailNotifier
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler("logs/assistant.log"),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
def run_daily_pipeline():
"""完整日报流程:采集 logger.info("=== 日报流程开始 ===")
try:
new_count = scrape_all_feeds()
logger.info(f"采集完成,新增 {new_count} 篇")
ok_count = process_pending_articles(limit=30)
logger.info(f"AI 处理完成,{ok_count} 篇")
report_path = generate_daily_report()
logger.info(f"报告已生成:{report_path}")
# 汇总推送
title = f"AI 助理日报 · 今日 {new_count} 条资讯"
summary = f"AI 处理 {ok_count} 篇,报告已就绪。"
url = f"https://your-server.com/{report_path}"
for notifier in [FeishuNotifier(), EmailNotifier()]:
notifier.send(title, summary, url)
logger.info("=== 日报流程完成 ===")
except Exception as e:
logger.exception(f"日报流程异常: {e}")
FeishuNotifier().send("助理系统告警", f"日报流程异常:{e}", "")
if __name__ == "__main__":
init_db()
scheduler = BlockingScheduler(timezone="Asia/Shanghai")
# 每天 08:00 运行完整流程
scheduler.add_job(run_daily_pipeline, CronTrigger(hour=8, minute=0))
# 每天 20:00 补充一次下午的新内容
scheduler.add_job(run_daily_pipeline, CronTrigger(hour=20, minute=0))
logger.info("调度器启动,等待执行任务...")
scheduler.start()
cli.py Click CLI 手动触发与管理
import click
from database.models import init_db, get_session, Article
from scrapers.rss_scraper import scrape_all_feeds
from processors.ai_processor import process_pending_articles
from reporters.html_reporter import generate_daily_report
@click.group()
def cli():
"""个人 AI 自动化助理 CLI"""
pass
@cli.command()
def setup():
"""初始化数据库"""
init_db()
click.echo("数据库初始化完成")
@cli.command()
def run():
"""手动触发一次完整日报流程"""
from scheduler import run_daily_pipeline
run_daily_pipeline()
@cli.command()
@click.option("--limit", default=10, help="显示最近 N 篇文章")
def status(limit):
"""查看数据库中最新文章状态"""
session = get_session()
articles = (session.query(Article)
.order_by(Article.created_at.desc())
.limit(limit).all())
for a in articles:
flag = "[AI]" if a.processed else "[ ]"
score = f"★{a.score:.0f}" if a.score else " "
click.echo(f"{flag} {score} {a.title[:60]}")
session.close()
if __name__ == "__main__":
cli()
部署指南
方式一:本地运行(开发调试)
终端命令 本地快速启动
# 创建虚拟环境
python -m venv .venv && source .venv/bin/activate # Windows: .venv\Scripts\activate
# 安装依赖
pip install feedparser requests beautifulsoup4 anthropic \
sqlalchemy jinja2 matplotlib apscheduler click \
pydantic-settings python-dotenv
# 复制并填写配置
cp .env.example .env # 编辑填入真实 API Key
# 初始化数据库
python cli.py setup
# 手动运行一次测试
python cli.py run
# 启动定时调度(长期运行)
python scheduler.py
方式二:VPS 部署(systemd 服务)
推荐用 systemd 在 Linux VPS 上管理进程,系统重启后自动恢复:
/etc/systemd/system/ai-assistant.service systemd 服务配置
[Unit]
Description=Personal AI Automation Assistant
After=network.target
[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/auto-assistant
ExecStart=/home/ubuntu/auto-assistant/.venv/bin/python scheduler.py
Restart=always
RestartSec=10
Environment="PYTHONUNBUFFERED=1"
EnvironmentFile=/home/ubuntu/auto-assistant/.env
[Install]
WantedBy=multi-user.target
终端命令 启用并管理服务
sudo systemctl daemon-reload
sudo systemctl enable ai-assistant # 开机自启
sudo systemctl start ai-assistant # 立即启动
sudo systemctl status ai-assistant # 查看状态
journalctl -u ai-assistant -f # 实时查看日志
方式三:Docker 容器化
Dockerfile 容器化配置
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 安装中文字体(matplotlib 需要)
RUN apt-get update && apt-get install -y fonts-wqy-zenhei && rm -rf /var/lib/apt/lists/*
COPY . .
RUN mkdir -p data logs reports
CMD ["python", "scheduler.py"]
终端命令 Docker 运行
# 构建镜像
docker build -t ai-assistant .
# 运行(挂载数据目录,注入环境变量)
docker run -d \
--name ai-assistant \
--restart unless-stopped \
--env-file .env \
-v $(pwd)/data:/app/data \
-v $(pwd)/reports:/app/reports \
-v $(pwd)/logs:/app/logs \
ai-assistant
**VPS 选择建议:**这个助理对计算资源要求很低(主要是网络请求和 API 调用),2核1GB 内存的入门 VPS 完全够用。Hetzner、Vultr、DigitalOcean 都有月费 $4-6 的入门机型。国内用户也可以选择腾讯云轻量应用服务器。
全书总结与延伸学习
本书技术体系一览
📁
文件自动化
pathlib · shutil · watchdog
📊
数据处理
openpyxl · pandas · python-docx
📷
PDF 处理
PyMuPDF · pdfplumber
🌐
网络采集
requests · Playwright · feedparser
💌
通知推送
smtplib · Webhook · Telegram
🤖
AI 集成
Anthropic · OpenAI · API
📈
数据可视化
matplotlib · plotly · Jinja2
🕐
工程化
APScheduler · Click · Docker
下一步学习路径
完成本书后,你已经掌握了 Python 自动化的完整工具链。如果想继续深入,这里是三条清晰的进阶路径:
| 方向 | 学什么 | 能做什么 |
|---|---|---|
| 后端开发 | FastAPI、PostgreSQL、Redis、JWT 认证 | 把脚本变成 Web API,供前端或移动端调用;构建 SaaS 工具 |
| 数据工程 | Airflow/Prefect、Spark、数据仓库(BigQuery/Snowflake) | 处理亿级数据量;构建企业级数据管道 |
| AI 工程 | LangChain/LlamaIndex、RAG、向量数据库、Fine-tuning | 构建知识库问答系统;定制专属 AI 助手;Agent 系统 |
推荐资源
- **官方文档优先:**Python 标准库文档(docs.python.org)写得极好,遇到陌生模块直接看官方示例
- **Real Python(realpython.com):**高质量教程,覆盖本书所有相关主题的进阶内容
- **Anthropic Cookbook(github.com/anthropics/anthropic-cookbook):**Claude API 最佳实践,含 RAG、结构化输出等高级用法
- **测试习惯:**学完本书后,建议系统学习 pytest——为你的自动化脚本写测试是走向工程化的关键一步
全书完结
从第1章配置环境,到第16章部署完整 AI 助理——你已经走完了 Python 自动化的完整旅程。 代码是工具,问题意识才是核心竞争力。带着这套工具集,去解决真实世界里让你头疼的重复劳动吧。
上一章