Chapter 16

Final Project — Building a Complete Automation System

Chapter 16: Final Project — Build Your Personal AI Automation Hub

This is the final chapter of the book. Rather than learning a single technique, we integrate everything from the previous 15 chapters into a real, server-deployable Personal AI Automation Hub: it scrapes information you care about on a schedule, summarizes and classifies it using Claude AI, generates a visual HTML report, and pushes it to Feishu and email. The code here is ready to run.

Project Goals and Architecture

What This System Does

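Suppose you need to track industry news, updates from specific websites, and competitor activity. Right now you check them manually, which takes 30 to 60 minutes every day. This assistant takes over completely: it collects new items on a schedule, summarizes and classifies them with Claude, builds an HTML report, and pushes the result to Feishu and email.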

Project Directory Structure

auto-assistant/
├── config/
│   ├── __init__.py
│   └── settings.py          # pydantic-settings configuration
├── scrapers/
│   ├── __init__.py
│   ├── rss_scraper.py       # RSS feed scraper
│   └── web_scraper.py       # targeted web scraper
├── processors/
│   ├── __init__.py
│   └── ai_processor.py      # Claude AI summarization and tagging
├── reporters/
│   ├── __init__.py
│   ├── html_reporter.py     # HTML report generator
│   └── templates/
│       └── daily_report.html
├── notifiers/
│   ├── __init__.py
│   ├── base.py              # abstract Notifier interface
│   ├── feishu.py            # Feishu Webhook
│   └── email_notifier.py    # SMTP email
├── database/
│   ├── __init__.py
│   └── models.py            # SQLAlchemy models
├── scheduler.py             # APScheduler main scheduler
├── cli.py                   # Click CLI entry point
├── requirements.txt
└── .env                     # secrets (never commit to git)

Technology Stack

Layer | Technology | Chapter
Data collection | feedparser, requests, BeautifulSoup | Ch. 9
Persistence | SQLite + SQLAlchemy | Ch. 2 (extended)
AI processing | Anthropic Claude API | Ch. 12
Report generation | Jinja2 + matplotlib | Ch. 13
Notifications | Feishu Webhook, smtplib | Ch. 10, 11
Scheduling | APScheduler | Ch. 15
Configuration | pydantic-settings + python-dotenv | Ch. 15
CLI tooling | Click | Ch. 15

Module 1: Configuration System

The configuration system is the foundation of the entire project. With pydantic-settings, every setting is read from a .env file and type-validated when the program starts, your IDE can auto-complete field names, and deployment is easy because real environment variables override the values in .env.

config/settings.py: pydantic-settings configuration

from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field
from typing import List

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # AI
    anthropic_api_key: str = Field(..., description="Claude API Key")
    ai_model: str = "claude-3-5-haiku-20241022"
    ai_max_tokens: int = 300

    # Database
    db_path: str = "data/assistant.db"

    # Notifications
    feishu_webhook_url: str = ""
    smtp_host: str = "smtp.gmail.com"
    smtp_port: int = 587
    smtp_user: str = ""
    smtp_password: str = ""
    report_recipients: List[str] = []

    # Scraping
    rss_feeds: List[str] = [
        "https://feeds.feedburner.com/PythonInsider",
        "https://realpython.com/atom.xml",
    ]
    max_items_per_feed: int = 10

    # Reports
    report_output_dir: str = "reports"

settings = Settings()

Why pydantic-settings over os.environ: pydantic-settings validates all required configuration at startup. A missing API key raises an error immediately rather than crashing halfway through a scheduled task. Type annotations also make your configuration intent explicit.
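One detail is easy to trip over when you fill in .env: by default, pydantic-settings decodes list-typed fields such as rss_feeds and report_recipients from JSON-encoded strings. The sketch below uses only the standard json module to mimic that decoding so you can see the expected format. It is an illustration, not how pydantic-settings is implemented, and the addresses and the helper name parse_list_setting are hypothetical.

```python
import json

# Hypothetical .env lines; the e-mail addresses are placeholders.
env_lines = [
    'REPORT_RECIPIENTS=["alice@example.com","bob@example.com"]',
    'RSS_FEEDS=["https://realpython.com/atom.xml"]',
]

def parse_list_setting(line: str) -> tuple[str, list[str]]:
    """Split KEY=VALUE and decode the value as a JSON array."""
    key, _, raw = line.partition("=")
    value = json.loads(raw)
    if not isinstance(value, list):
        raise ValueError(f"{key} must be a JSON array")
    # Settings uses case_sensitive=False, so keys are compared in lower case
    return key.lower(), value

parsed = dict(parse_list_setting(line) for line in env_lines)
print(parsed["report_recipients"])
```

A plain comma-separated value such as a@example.com,b@example.com is not valid JSON, so it would fail this decoding step.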

Module 2: Data Collection Layer

database/models.py: SQLAlchemy ORM models

from sqlalchemy import create_engine, Column, Integer, String, Text, Float, DateTime, Boolean
from sqlalchemy.orm import DeclarativeBase, Session
from datetime import datetime
from config.settings import settings

class Base(DeclarativeBase):
    pass

class Article(Base):
    __tablename__ = "articles"

    id          = Column(Integer, primary_key=True)
    title       = Column(String(500), nullable=False)
    url         = Column(String(1000), unique=True, nullable=False)
    source      = Column(String(200))
    published   = Column(DateTime, default=datetime.utcnow)
    raw_content = Column(Text)
    summary     = Column(Text)
    score       = Column(Float)
    category    = Column(String(100))
    processed   = Column(Boolean, default=False)
    created_at  = Column(DateTime, default=datetime.utcnow)

engine = create_engine(f"sqlite:///{settings.db_path}", echo=False)

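def init_db():
    import os
    # dirname() is "" when db_path has no folder part, and makedirs("") raises
    db_dir = os.path.dirname(settings.db_path)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    Base.metadata.create_all(engine)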

def get_session() -> Session:
    return Session(engine)
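The unique=True flag on the url column is what ultimately keeps duplicates out of the table, even if a scraper forgets to check first. Here is a minimal sketch of that behaviour using the standard sqlite3 module instead of SQLAlchemy, with an in-memory database and a simplified two-column table. The helper name insert_article is hypothetical.

```python
import sqlite3

def insert_article(conn: sqlite3.Connection, title: str, url: str) -> bool:
    """Insert one row; return False if the URL is already stored."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO articles (title, url) VALUES (?, ?)", (title, url)
            )
        return True
    except sqlite3.IntegrityError:
        # Raised by the UNIQUE constraint on url
        return False

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE articles ("
    "id INTEGER PRIMARY KEY, title TEXT NOT NULL, url TEXT NOT NULL UNIQUE)"
)

first = insert_article(conn, "Hello", "https://example.com/a")
second = insert_article(conn, "Hello again", "https://example.com/a")
print(first, second)
```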

scrapers/rss_scraper.py: feedparser multi-source scraper

import feedparser
import logging
from datetime import datetime
from typing import List, Dict
from config.settings import settings
from database.models import Article, get_session

logger = logging.getLogger(__name__)

def fetch_rss_feed(feed_url: str) -> List[Dict]:
    try:
        feed = feedparser.parse(feed_url)
        articles = []
        for entry in feed.entries[:settings.max_items_per_feed]:
            articles.append({
                "title":   entry.get("title", "").strip(),
                "url":     entry.get("link", ""),
                "source":  feed.feed.get("title", feed_url),
                "content": entry.get("summary", entry.get("description", "")),
                "published": datetime(*entry.published_parsed[:6])
                             if hasattr(entry, "published_parsed") and entry.published_parsed
                             else datetime.utcnow(),
            })
        logger.info(f"RSS {feed_url}: fetched {len(articles)} items")
        return articles
    except Exception as e:
        logger.error(f"RSS fetch failed {feed_url}: {e}")
        return []

def scrape_all_feeds() -> int:
    session = get_session()
    saved = 0
    for url in settings.rss_feeds:
        for art in fetch_rss_feed(url):
            if not art["url"]:
                continue
            if session.query(Article).filter_by(url=art["url"]).first():
                continue
            session.add(Article(
                title=art["title"], url=art["url"], source=art["source"],
                published=art["published"], raw_content=art["content"],
            ))
            saved += 1
    session.commit()
    session.close()
    return saved
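The published expression in fetch_rss_feed is dense, so here it is pulled out into a small helper. feedparser exposes published_parsed as a time.struct_time in UTC, and the first six fields are exactly the arguments datetime expects. This sketch simulates the struct with time.gmtime so it runs without feedparser. The helper name entry_published and its now parameter are hypothetical.

```python
import time
from datetime import datetime, timezone
from typing import Optional

def entry_published(published_parsed, now: Optional[datetime] = None) -> datetime:
    """Convert a UTC struct_time to a naive UTC datetime, with a fallback."""
    if published_parsed:
        # Fields 0-5 are year, month, day, hour, minute, second
        return datetime(*published_parsed[:6])
    # Feed gave no date: fall back to the current UTC time (naive, like the model)
    return now or datetime.now(timezone.utc).replace(tzinfo=None)

stamp = time.gmtime(0)  # stands in for entry.published_parsed
print(entry_published(stamp))
```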

Module 3: AI Processing Layer

processors/ai_processor.py: Claude API batch processing

import anthropic
import json
import logging
from database.models import Article, get_session
from config.settings import settings

logger = logging.getLogger(__name__)
client = anthropic.Anthropic(api_key=settings.anthropic_api_key)

SYSTEM_PROMPT = """You are an information assistant processing tech/industry news.
For each article, output JSON only:
{"summary": "core summary under 60 words", "category": "Tech/Product/Industry/Policy/Other", "score": 3}
score is an integer 1-5, where 5 is most important. Output JSON only, no other text."""

def process_article(article: Article) -> bool:
    # raw_content is nullable, so fall back to "" before slicing
    content = f"Title: {article.title}\n\nContent: {(article.raw_content or '')[:800]}"
    try:
        response = client.messages.create(
            model=settings.ai_model,
            max_tokens=settings.ai_max_tokens,
            system=SYSTEM_PROMPT,
            messages=[{"role": "user", "content": content}],
        )
        result = json.loads(response.content[0].text.strip())
        article.summary   = result.get("summary", "")
        article.category  = result.get("category", "Other")
        article.score     = float(result.get("score", 3))
        article.processed = True
        return True
    except Exception as e:
        logger.error(f"AI processing failed [{article.title[:30]}]: {e}")
        return False

def process_pending_articles(limit: int = 30) -> int:
    session = get_session()
    pending = (session.query(Article)
               .filter_by(processed=False)
               .order_by(Article.published.desc())
               .limit(limit).all())
    ok = sum(1 for art in pending if process_article(art))
    session.commit()
    session.close()
    return ok
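process_article passes the reply straight to json.loads, which fails if the model adds any text around the JSON or returns a score outside 1 to 5. A more tolerant parser might look like the sketch below. It is one possible approach, not part of the original module, and the names parse_ai_reply and ALLOWED_CATEGORIES are hypothetical.

```python
import json
import re

ALLOWED_CATEGORIES = {"Tech", "Product", "Industry", "Policy", "Other"}

def parse_ai_reply(text: str) -> dict:
    """Extract the outermost {...} span from a reply and normalize its fields."""
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in reply")
    data = json.loads(match.group(0))

    category = data.get("category", "Other")
    try:
        score = float(data.get("score", 3))
    except (TypeError, ValueError):
        score = 3.0  # same default the original code uses

    return {
        "summary": str(data.get("summary", "")),
        "category": category if category in ALLOWED_CATEGORIES else "Other",
        "score": min(5.0, max(1.0, score)),  # clamp to the 1-5 range
    }

reply = 'Here is the JSON:\n{"summary": "New release", "category": "Tech", "score": 9}'
print(parse_ai_reply(reply))
```

In process_article you could then call parse_ai_reply(response.content[0].text) in place of the json.loads line.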

Cost control: Processing one article (the title plus the first 800 characters of content) with claude-3-5-haiku costs roughly $0.0004. At 30 articles per day, that is about $0.36 per month. To reduce calls further, batch several articles into a single request, separated by newlines, and have Claude return an array of JSON objects.
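The batching idea and the cost arithmetic can be sketched with plain Python. Nothing here calls the API: build_batch_prompt only assembles the user message and parse_batch_reply only decodes the array you would ask Claude to return. All three function names are hypothetical, and the per-article price is the rough figure quoted above, not an official rate.

```python
import json

def build_batch_prompt(articles: list[dict], max_chars: int = 800) -> str:
    """Number each article so results can be matched back by position."""
    parts = []
    for i, art in enumerate(articles):
        body = (art.get("content") or "")[:max_chars]
        parts.append(f"[{i}] Title: {art['title']}\nContent: {body}")
    return "\n\n".join(parts)

def parse_batch_reply(text: str, expected: int) -> list[dict]:
    """Decode a JSON array and check it has one object per article."""
    results = json.loads(text)
    if not isinstance(results, list) or len(results) != expected:
        raise ValueError(f"expected a list of {expected} results")
    return results

def monthly_cost_usd(per_article: float, per_day: int, days: int = 30) -> float:
    """Cost per article x articles per day x days per month."""
    return round(per_article * per_day * days, 2)

print(monthly_cost_usd(0.0004, 30))
```

If you adopt batching, remember that ai_max_tokens defaults to 300 in settings.py, which is likely too small for a reply covering several articles.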

Module 4: Report Generation

reporters/html_reporter.py: Jinja2 + matplotlib report

import os, base64, io
from datetime import datetime, date
from jinja2 import Environment, FileSystemLoader
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from collections import Counter
from database.models import Article, get_session
from config.settings import settings

def make_chart_base64(articles) -> str:
    sources = Counter(a.source for a in articles)
    fig, ax = plt.subplots(figsize=(5, 4), facecolor="#1a1f35")
    ax.set_facecolor("#1a1f35")
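    # Convert the dict views to lists so matplotlib can build an array from them
    ax.pie(list(sources.values()), labels=list(sources.keys()), autopct="%1.0f%%",
           colors=["#6c63ff","#22c55e","#f59e0b","#3b82f6","#ef4444"],
           textprops={"color": "#e2e8f0", "fontsize": 9})
    ax.set_title("Source Distribution", color="#e2e8f0", fontsize=11)
    buf = io.BytesIO()
    # Save this figure explicitly rather than whichever figure is "current"
    fig.savefig(buf, format="png", bbox_inches="tight", dpi=110)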
    plt.close(fig)
    return base64.b64encode(buf.getvalue()).decode()

def generate_daily_report() -> str:
    session = get_session()
    today = date.today()
    articles = (session.query(Article)
                .filter(Article.processed == True)
                .filter(Article.published >= datetime.combine(today, datetime.min.time()))
                .order_by(Article.score.desc()).all())
    session.close()
    chart_b64 = make_chart_base64(articles) if articles else ""
    env = Environment(loader=FileSystemLoader("reporters/templates"))
    html = env.get_template("daily_report.html").render(
        date=today.isoformat(), total=len(articles),
        high_count=sum(1 for a in articles if a.score >= 4),
        articles=articles, chart_b64=chart_b64,
    )
    os.makedirs(settings.report_output_dir, exist_ok=True)
    path = os.path.join(settings.report_output_dir, f"report_{today}.html")
    with open(path, "w", encoding="utf-8") as f:
        f.write(html)
    return path
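The daily_report.html template is not listed in this chapter, so how chart_b64 ends up on the page is left open. A common way to embed a base64 PNG without hosting a separate image file is a data URI. The sketch below builds the tag in Python to show the format; it assumes the template does something equivalent, and the helper name chart_img_tag is hypothetical.

```python
import html

def chart_img_tag(chart_b64: str, alt: str = "Source Distribution") -> str:
    """Return an <img> tag for an already-encoded PNG, or "" when there is none."""
    if not chart_b64:
        # generate_daily_report passes "" when there are no articles
        return ""
    safe_alt = html.escape(alt, quote=True)
    return f'<img src="data:image/png;base64,{chart_b64}" alt="{safe_alt}">'

print(chart_img_tag("iVBORw=="))
```

Because the image travels inside the HTML, the report stays a single self-contained file that can be opened locally or served as-is.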

Module 5: Notification Layer

notifiers/: Strategy pattern (abstract base + Feishu + Email)

# notifiers/base.py
from abc import ABC, abstractmethod

class BaseNotifier(ABC):
    @abstractmethod
    def send(self, title: str, summary: str, report_url: str) -> bool: ...

# notifiers/feishu.py
import requests
from .base import BaseNotifier
from config.settings import settings

class FeishuNotifier(BaseNotifier):
    def send(self, title: str, summary: str, report_url: str) -> bool:
        if not settings.feishu_webhook_url:
            return False
        payload = {
            "msg_type": "interactive",
            "card": {
                "header": {"title": {"tag": "plain_text", "content": title}},
                "elements": [
                    {"tag": "div", "text": {"tag": "lark_md", "content": summary}},
                    {"tag": "action", "actions": [{"tag": "button",
                        "text": {"tag": "plain_text", "content": "View Full Report"},
                        "url": report_url, "type": "primary"}]}
                ]
            }
        }
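        try:
            r = requests.post(settings.feishu_webhook_url, json=payload, timeout=10)
        except requests.RequestException:
            # A network error here should not stop the other notifiers
            return False
        return r.status_code == 200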

# notifiers/email_notifier.py
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from .base import BaseNotifier
from config.settings import settings

class EmailNotifier(BaseNotifier):
    def send(self, title: str, summary: str, report_url: str) -> bool:
        if not settings.smtp_user or not settings.report_recipients:
            return False
        msg = MIMEMultipart("alternative")
        msg["Subject"] = title
        msg["From"] = settings.smtp_user
        msg["To"] = ", ".join(settings.report_recipients)
        html = f"<h2>{title}</h2><p>{summary}</p><a href='{report_url}'>View Report</a>"
        msg.attach(MIMEText(html, "html", "utf-8"))
        try:
            with smtplib.SMTP(settings.smtp_host, settings.smtp_port) as s:
                s.starttls()
                s.login(settings.smtp_user, settings.smtp_password)
                s.sendmail(settings.smtp_user, settings.report_recipients, msg.as_string())
            return True
        except Exception:
            return False
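Because every notifier shares the same send signature, the caller can treat them interchangeably, which is the point of the Strategy pattern. The sketch below shows one way to fan out to several notifiers so that a failure in one does not prevent the others from running. BaseNotifier is repeated only to keep the example self-contained; OkNotifier, BrokenNotifier, and notify_all are hypothetical stand-ins, not part of the project.

```python
import logging
from abc import ABC, abstractmethod

logger = logging.getLogger(__name__)

class BaseNotifier(ABC):
    @abstractmethod
    def send(self, title: str, summary: str, report_url: str) -> bool: ...

class OkNotifier(BaseNotifier):
    """Stand-in for a channel that delivers successfully."""
    def send(self, title: str, summary: str, report_url: str) -> bool:
        return True

class BrokenNotifier(BaseNotifier):
    """Stand-in for a channel whose network call blows up."""
    def send(self, title: str, summary: str, report_url: str) -> bool:
        raise ConnectionError("webhook unreachable")

def notify_all(notifiers, title: str, summary: str, report_url: str) -> dict[str, bool]:
    """Call every notifier; record False for any that fails or raises."""
    results = {}
    for notifier in notifiers:
        name = type(notifier).__name__
        try:
            results[name] = bool(notifier.send(title, summary, report_url))
        except Exception:
            logger.exception("Notifier %s raised", name)
            results[name] = False
    return results

outcome = notify_all([BrokenNotifier(), OkNotifier()], "Report", "3 new items", "")
print(outcome)
```

Adding a new channel, such as Telegram, then means writing one more subclass and appending it to the list.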

Module 6: Scheduler and CLI

scheduler.py: APScheduler main orchestrator

import logging
import os
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from database.models import init_db
from scrapers.rss_scraper import scrape_all_feeds
from processors.ai_processor import process_pending_articles
from reporters.html_reporter import generate_daily_report
from notifiers.feishu import FeishuNotifier
from notifiers.email_notifier import EmailNotifier

# FileHandler raises FileNotFoundError if the logs/ folder does not exist yet
os.makedirs("logs", exist_ok=True)
logging.basicConfig(level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.FileHandler("logs/assistant.log"), logging.StreamHandler()])
logger = logging.getLogger(__name__)

def run_daily_pipeline():
    logger.info("=== Daily pipeline start ===")
    try:
        new_count = scrape_all_feeds()
        ok_count  = process_pending_articles(limit=30)
        path      = generate_daily_report()
        title     = f"AI Hub Report · {new_count} new items today"
        summary   = f"AI processed {ok_count} articles. Report ready."
        url       = f"https://your-server.com/{path}"
        for notifier in [FeishuNotifier(), EmailNotifier()]:
            notifier.send(title, summary, url)
        logger.info("=== Daily pipeline complete ===")
    except Exception as e:
        logger.exception(f"Pipeline error: {e}")
        FeishuNotifier().send("Assistant Alert", f"Pipeline failed: {e}", "")

if __name__ == "__main__":
    init_db()
    scheduler = BlockingScheduler(timezone="UTC")
    scheduler.add_job(run_daily_pipeline, CronTrigger(hour=8, minute=0))
    scheduler.add_job(run_daily_pipeline, CronTrigger(hour=20, minute=0))
    logger.info("Scheduler started. Waiting for jobs...")
    scheduler.start()

Deployment Guide

Option 1: Local Development

Terminal: Local quickstart

python -m venv .venv && source .venv/bin/activate  # Windows: .venv\Scripts\activate

pip install feedparser requests beautifulsoup4 anthropic \
            sqlalchemy jinja2 matplotlib apscheduler click \
            pydantic-settings python-dotenv

cp .env.example .env  # fill in real API keys
python cli.py setup   # initialize database
python cli.py run     # test run once
python scheduler.py   # start long-running scheduler

Option 2: VPS with systemd

/etc/systemd/system/ai-assistant.service

[Unit]
Description=Personal AI Automation Hub
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/auto-assistant
ExecStart=/home/ubuntu/auto-assistant/.venv/bin/python scheduler.py
Restart=always
RestartSec=10
Environment="PYTHONUNBUFFERED=1"
EnvironmentFile=/home/ubuntu/auto-assistant/.env

[Install]
WantedBy=multi-user.target

Terminal: Enable and manage the service

sudo systemctl daemon-reload
sudo systemctl enable ai-assistant
sudo systemctl start ai-assistant
sudo systemctl status ai-assistant
journalctl -u ai-assistant -f    # follow live logs

Option 3: Docker

Dockerfile + docker run

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN mkdir -p data logs reports
CMD ["python", "scheduler.py"]

# Build and run:
# docker build -t ai-assistant .
# docker run -d --name ai-assistant --restart unless-stopped \
#   --env-file .env \
#   -v $(pwd)/data:/app/data \
#   -v $(pwd)/reports:/app/reports \
#   ai-assistant

Book Summary and Next Steps

Complete Technology Map

File Automation: pathlib · shutil · watchdog
Data Processing: openpyxl · pandas · python-docx
PDF Handling: PyMuPDF · pdfplumber
Web Scraping: requests · Playwright · feedparser
Notifications: smtplib · Webhook · Telegram
AI Integration: Anthropic · OpenAI · API
Visualization: matplotlib · plotly · Jinja2
Engineering: APScheduler · Click · Docker

Where to Go Next

Direction | What to Learn | What You Can Build
Backend Development | FastAPI, PostgreSQL, Redis, JWT auth | Turn scripts into Web APIs; build SaaS tools
Data Engineering | Airflow/Prefect, Spark, data warehouses | Handle billion-row datasets; enterprise data pipelines
AI Engineering | LangChain/LlamaIndex, RAG, vector databases, fine-tuning | Knowledge-base Q&A; custom AI assistants; agent systems

The End

From configuring your environment in Chapter 1 to deploying a complete AI assistant in Chapter 16 — you have completed the full Python automation journey. Code is a tool. Problem awareness is the core skill. Take this toolkit and go eliminate the repetitive work that actually matters to you.
