第 15 章

定时任务与 CLI 工具——让脚本成为专业工具

第15章:定时任务与 CLI 工具——让脚本成为真正的生产工具

写好了自动化脚本只是第一步。让它在正确的时间自动运行、以专业的命令行界面供他人使用、打包成可分发的可执行文件——这才是把脚本从"个人小工具"变成"团队生产工具"的关键。本章覆盖四个层次:定时调度(APScheduler / cron)、CLI 框架(Click / Typer)、脚本部署(PyInstaller / Docker),以及一个整合所有技术的完整 CLI 项目。

定时任务方案对比与选型

方案 适用平台 适用场景 持久化 上手难度
cron Linux / macOS 服务器定时脚本,长期稳定运行 系统级
Windows 任务计划 Windows Windows 服务器或开发机上的定时任务 系统级
schedule 库 全平台 简单场景,脚本内嵌调度,不需要持久化 极低
APScheduler 全平台 复杂调度逻辑,需要持久化、错误处理、监控 可选

选型建议:

APScheduler 精讲

APScheduler(Advanced Python Scheduler)是功能最完整的 Python 调度库,支持多种触发器、持久化存储和完善的错误处理机制。

pip install apscheduler sqlalchemy

三种触发器

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.INFO)

def sync_data():
    print(f"[{datetime.now():%H:%M:%S}] 同步数据中...")

def generate_report():
    print(f"[{datetime.now():%H:%M:%S}] 生成月报...")

def send_one_time_alert():
    print("一次性告警已发送")

scheduler = BlockingScheduler(timezone='Asia/Shanghai')

# IntervalTrigger:固定间隔执行
scheduler.add_job(
    sync_data,
    trigger=IntervalTrigger(minutes=30),   # 每30分钟
    id='sync_job',
    name='数据同步',
    replace_existing=True
)

# CronTrigger:类 cron 表达式,精确控制执行时间
scheduler.add_job(
    generate_report,
    trigger=CronTrigger(
        day_of_week='mon',   # 每周一
        hour=8,              # 早上8点
        minute=0,
        timezone='Asia/Shanghai'
    ),
    id='report_job',
    name='周报生成'
)

# DateTrigger:在指定时刻执行一次
run_at = datetime.now() + timedelta(minutes=10)
scheduler.add_job(
    send_one_time_alert,
    trigger=DateTrigger(run_date=run_at),
    id='alert_job'
)

print("调度器已启动,按 Ctrl+C 停止")
scheduler.start()

持久化存储(SQLAlchemy 后端)

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

# 配置持久化:任务状态保存到 SQLite(或 PostgreSQL / MySQL)
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')
}
executors = {
    'default': ThreadPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': True,       # 如果积压了多次,合并为一次执行
    'max_instances': 1,     # 同一任务同时只能有1个实例运行
    'misfire_grace_time': 300  # 错过执行时间后,5分钟内仍可补执行
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Shanghai'
)
scheduler.start()

# 任务会被持久化到 jobs.db
# 下次程序重启后,已注册的任务自动恢复,不需要重新 add_job

错误处理与任务监听

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED

def job_listener(event):
    """监听所有任务事件"""
    if event.exception:
        print(f"任务 {event.job_id} 执行失败:{event.exception}")
        # 接入告警通知(见第11章)
    elif hasattr(event, 'retval'):
        print(f"任务 {event.job_id} 执行成功")
    else:
        # EVENT_JOB_MISSED:任务错过了执行时间
        print(f"任务 {event.job_id} 错过了计划执行时间!")

scheduler.add_listener(
    job_listener,
    EVENT_JOB_ERROR | EVENT_JOB_EXECUTED | EVENT_JOB_MISSED
)

# 带重试的任务函数
def flaky_task():
    """模拟可能失败的任务"""
    import random
    if random.random() < 0.3:
        raise ConnectionError("网络抖动,稍后重试")
    print("任务成功")

企业级定时任务框架模板

"""
企业级调度框架:集中管理所有定时任务
- 统一配置触发器
- 统一错误处理和告警
- 支持动态增删任务
"""
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED
from apscheduler.triggers.cron import CronTrigger
import logging, signal, time

logger = logging.getLogger(__name__)

class TaskScheduler:
    def __init__(self, db_url: str = 'sqlite:///scheduler.db'):
        self.scheduler = BackgroundScheduler(
            jobstores={'default': SQLAlchemyJobStore(url=db_url)},
            job_defaults={'coalesce': True, 'max_instances': 1, 'misfire_grace_time': 60},
            timezone='Asia/Shanghai'
        )
        self.scheduler.add_listener(self._on_error, EVENT_JOB_ERROR | EVENT_JOB_MISSED)

    def _on_error(self, event):
        if event.exception:
            logger.error(f"Job {event.job_id} FAILED: {event.exception}", exc_info=True)
        else:
            logger.warning(f"Job {event.job_id} MISSED its scheduled run")

    def register(self, func, cron_expr: str, job_id: str):
        """注册一个 cron 定时任务"""
        minute, hour, day, month, dow = cron_expr.split()
        self.scheduler.add_job(
            func,
            trigger=CronTrigger(
                minute=minute, hour=hour,
                day=day, month=month, day_of_week=dow
            ),
            id=job_id,
            replace_existing=True
        )
        logger.info(f"Registered job: {job_id} ({cron_expr})")

    def start(self):
        self.scheduler.start()
        logger.info("Scheduler started")

        # 优雅关闭:处理 SIGTERM / SIGINT
        def shutdown(signum, frame):
            logger.info("Shutting down scheduler...")
            self.scheduler.shutdown(wait=True)
            exit(0)

        signal.signal(signal.SIGTERM, shutdown)
        signal.signal(signal.SIGINT, shutdown)
        while True:
            time.sleep(1)

# 使用示例
if __name__ == '__main__':
    from tasks import sync_data, generate_report, cleanup_old_files

    ts = TaskScheduler()
    ts.register(sync_data,         '0 */2 * * *',  'sync')      # 每2小时
    ts.register(generate_report,   '0 8 * * 1',    'report')    # 周一08:00
    ts.register(cleanup_old_files, '30 2 * * *',   'cleanup')   # 每天02:30
    ts.start()

cron 实战

crontab 语法详解

# crontab 格式:分 时 日 月 周 命令
# *  *  *  *  *
# |  |  |  |  └─ 周几 (0-7,0和7都是周日)
# |  |  |  └──── 月份 (1-12)
# |  |  └─────── 日期 (1-31)
# |  └────────── 小时 (0-23)
# └───────────── 分钟 (0-59)

# 常用表达式
# 0 8 * * 1-5       周一到周五早上8点
# */30 * * * *      每30分钟
# 0 0 1 * *         每月1日午夜
# 0 8,12,18 * * *   每天8点、12点、18点
# 0 9 * * 1         每周一9点

编辑 crontab:

# 编辑当前用户的 crontab
crontab -e

# 查看当前 crontab 内容
crontab -l

# 示例条目:每天上午9点运行数据同步脚本,日志保存到文件
0 9 * * * /usr/bin/python3 /home/user/sync.py >> /var/log/sync.log 2>&1

cron 三大常见坑: 1. 环境变量:cron 运行在最小化环境中,PATH 和你终端里不同。脚本里用到的命令都要写绝对路径(如 /usr/bin/python3),或在 crontab 顶部设置 PATH=/usr/local/bin:/usr/bin:/bin。 2. 工作目录:cron 的工作目录默认是用户 home 目录,不是脚本所在目录。如果脚本用相对路径读写文件,需要在命令里先 cd /path/to/script && 再运行。 3. Python 路径:虚拟环境(venv)需要用 venv 里的 python,如 /home/user/myproject/venv/bin/python,不能用系统 python。

# 正确的 crontab 条目示例(使用 venv)
0 9 * * 1-5 cd /home/user/myproject && /home/user/myproject/venv/bin/python sync.py >> logs/sync.log 2>&1

# 日志重定向说明:
# >> logs/sync.log    标准输出追加到日志文件
# 2>&1               标准错误也重定向到同一文件
# 如果不重定向,cron 会尝试发邮件给用户,在没有 MTA 的服务器上会静默丢失

Click 构建 CLI 工具

Click 是 Python 生态中最成熟的 CLI 框架,被 Flask 团队开发,设计理念清晰,文档完善。

pip install click

基础用法:命令、选项、参数

import click

@click.command()
@click.option('--name', '-n', default='World', help='要问候的名字')
@click.option('--count', '-c', default=1, type=int, help='重复次数')
@click.option('--upper', is_flag=True, help='转为大写')
@click.argument('message')
def greet(name, count, upper, message):
    """问候程序 - 演示 Click 基础用法"""
    text = f"Hello {name}: {message}"
    if upper:
        text = text.upper()
    for _ in range(count):
        click.echo(text)

if __name__ == '__main__':
    greet()

# 运行示例:
# python app.py --name Alice --count 3 --upper "Good morning"
# python app.py -n Bob "Hello"

子命令(Group)

import click

@click.group()
@click.option('--verbose', '-v', is_flag=True, help='显示详细日志')
@click.pass_context
def cli(ctx, verbose):
    """数据处理工具集"""
    ctx.ensure_object(dict)
    ctx.obj['verbose'] = verbose

@cli.command()
@click.option('--source', '-s', required=True, help='数据源 URL 或路径')
@click.option('--output', '-o', default='./data', help='输出目录')
@click.pass_context
def fetch(ctx, source, output):
    """从指定源拉取数据"""
    verbose = ctx.obj['verbose']
    if verbose:
        click.echo(f"从 {source} 拉取数据到 {output}...")
    # 实际拉取逻辑...
    click.secho("数据拉取完成", fg='green')

@cli.command()
@click.argument('input_file')
@click.option('--format', '-f', type=click.Choice(['csv', 'json', 'parquet']),
              default='csv', help='输出格式')
def process(input_file, format):
    """处理原始数据"""
    click.echo(f"处理 {input_file} -> {format} 格式")

@cli.command()
@click.option('--email', '-e', multiple=True, help='接收报告的邮件地址(可多个)')
def report(email):
    """生成并发送报告"""
    if email:
        click.echo(f"发送报告到:{', '.join(email)}")
    else:
        click.echo("生成本地报告...")

if __name__ == '__main__':
    cli()

# 运行示例:
# python tool.py --verbose fetch --source https://api.example.com/data --output ./raw
# python tool.py process data.csv --format json
# python tool.py report --email [email protected] --email [email protected]

进度条

import click
import time

@click.command()
@click.argument('files', nargs=-1, type=click.Path(exists=True))
def process_files(files):
    """批量处理文件"""
    with click.progressbar(files, label='处理进度', show_pos=True) as bar:
        for f in bar:
            time.sleep(0.5)  # 模拟处理
            # 实际处理逻辑...

    click.secho(f"\n完成!共处理 {len(files)} 个文件", fg='green')

Typer:现代 CLI 框架

Typer 是 FastAPI 作者开发的 CLI 框架,基于 Python 类型注解自动生成帮助文档,代码更简洁。

pip install typer[all]
import typer
from pathlib import Path
from typing import Optional
from enum import Enum

app = typer.Typer(help="数据处理工具 - 用 Typer 构建")

class OutputFormat(str, Enum):
    csv = "csv"
    json = "json"
    parquet = "parquet"

@app.command()
def fetch(
    source: str = typer.Option(..., "--source", "-s", help="数据源"),
    output: Path = typer.Option(Path("./data"), "--output", "-o", help="输出目录"),
    verbose: bool = typer.Option(False, "--verbose", "-v"),
):
    """从指定源拉取数据"""
    if verbose:
        typer.echo(f"Source: {source}")
    output.mkdir(parents=True, exist_ok=True)
    typer.secho("Done!", fg=typer.colors.GREEN)

@app.command()
def process(
    input_file: Path = typer.Argument(..., help="输入文件路径"),
    fmt: OutputFormat = typer.Option(OutputFormat.csv, "--format", "-f"),
):
    """处理原始数据"""
    if not input_file.exists():
        typer.secho(f"文件不存在:{input_file}", fg=typer.colors.RED, err=True)
        raise typer.Exit(code=1)
    typer.echo(f"处理 {input_file} -> {fmt.value}")

if __name__ == "__main__":
    app()

Click vs Typer 选型建议

维度 Click Typer
代码风格 装饰器驱动,显式声明 类型注解驱动,更简洁
学习曲线 稍陡,概念多 平缓,Python 3.6+ 类型注解即可
生态成熟度 极成熟,社区大 较新,但快速增长
复杂场景 更灵活,支持自定义类型 复杂场景需要回退到 Click API
推荐场景 团队项目、复杂 CLI 个人项目、快速原型

脚本部署

打包为可执行文件(PyInstaller)

pip install pyinstaller

# 基础打包:生成单个可执行文件
pyinstaller --onefile your_script.py

# 结果在 dist/ 目录下,可直接分发,无需对方安装 Python

# 完整选项示例
pyinstaller \
  --onefile \                       # 打包成单文件
  --name "datatool" \               # 可执行文件名称
  --icon icon.ico \                 # 图标(Windows)
  --add-data "config.yaml:." \      # 随包带入配置文件
  --hidden-import pandas \          # 手动声明动态导入的包
  your_script.py

**PyInstaller 注意事项:**打包后的文件体积较大(通常 30-80MB),因为内嵌了 Python 解释器和所有依赖。Windows 打包的可执行文件只能在 Windows 上运行,不能跨平台。建议在与目标机器相同的操作系统上打包。

Docker 容器化运行

# Dockerfile 示例:把 Python 脚本容器化
FROM python:3.12-slim

WORKDIR /app

# 先复制依赖文件(利用 Docker 缓存层)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 再复制代码
COPY . .

# 默认命令(可被 docker run 覆盖)
CMD ["python", "main.py"]
# 构建和运行
docker build -t datatool:latest .
docker run datatool:latest
docker run datatool:latest python main.py fetch --source https://api.example.com

# 挂载本地目录(用于读写数据文件)
docker run -v $(pwd)/data:/app/data datatool:latest python main.py process /app/data/input.csv

# 结合 cron:用 docker run 替代 python 命令写入 crontab
0 9 * * * docker run --rm datatool:latest >> /var/log/datatool.log 2>&1

实战项目:完整的数据处理 CLI 工具

把前面学到的所有技术整合为一个生产级 CLI 工具,支持三个子命令(fetch / process / report),读取 .env 配置,包含完整的错误处理。

项目结构

datatool/
  ├── main.py          # CLI 入口(约120行)
  ├── .env             # 环境变量(不提交 git)
  ├── .env.example     # 配置模板(提交 git)
  ├── requirements.txt
  └── Dockerfile

.env.example

API_URL=https://api.example.com
API_KEY=your_api_key_here
DB_URL=sqlite:///data.db
OUTPUT_DIR=./output
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
[email protected]
SMTP_PASS=your_app_password

main.py(完整实现,约120行)

"""
datatool - 数据处理 CLI 工具
支持:fetch(拉取)/ process(处理)/ report(报告)

依赖:click, pydantic-settings, requests, pandas, python-dotenv
"""
from __future__ import annotations
import sys
import json
import click
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime
from pydantic_settings import BaseSettings, SettingsConfigDict

# ── 配置管理(pydantic-settings 自动读取 .env)────────────────────────────
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8')

    api_url: str = 'https://httpbin.org/json'
    api_key: str = ''
    db_url: str = 'sqlite:///data.db'
    output_dir: Path = Path('./output')

cfg = Settings()

# ── CLI 定义 ───────────────────────────────────────────────────────────────
@click.group()
@click.version_option('1.0.0')
@click.option('--verbose', '-v', is_flag=True, envvar='DATATOOL_VERBOSE',
              help='显示详细日志(也可设环境变量 DATATOOL_VERBOSE=1)')
@click.pass_context
def cli(ctx: click.Context, verbose: bool):
    """datatool - 企业级数据处理 CLI

    \b
    典型工作流:
      datatool fetch --date 2024-01-15
      datatool process ./output/raw_2024-01-15.json --format csv
      datatool report --email [email protected]
    """
    ctx.ensure_object(dict)
    ctx.obj['verbose'] = verbose
    ctx.obj['cfg'] = cfg

# ── fetch 命令 ─────────────────────────────────────────────────────────────
@cli.command()
@click.option('--date', '-d', default=datetime.today().strftime('%Y-%m-%d'),
              help='数据日期,格式 YYYY-MM-DD(默认今天)')
@click.option('--source', '-s', default=None, help='覆盖配置文件中的 API_URL')
@click.option('--dry-run', is_flag=True, help='仅打印请求信息,不实际执行')
@click.pass_context
def fetch(ctx: click.Context, date: str, source: str | None, dry_run: bool):
    """从 API 拉取指定日期的数据并保存到本地"""
    c: Settings = ctx.obj['cfg']
    verbose: bool = ctx.obj['verbose']

    url = source or c.api_url
    params = {'date': date, 'apikey': c.api_key}

    if dry_run:
        click.echo(f"[DRY RUN] GET {url}")
        click.echo(f"[DRY RUN] params = {params}")
        return

    if verbose:
        click.echo(f"正在请求:{url}?date={date}")

    try:
        resp = requests.get(url, params=params, timeout=30)
        resp.raise_for_status()
    except requests.RequestException as e:
        click.secho(f"请求失败:{e}", fg='red', err=True)
        sys.exit(1)

    # 保存原始数据
    c.output_dir.mkdir(parents=True, exist_ok=True)
    out_file = c.output_dir / f"raw_{date}.json"
    out_file.write_text(json.dumps(resp.json(), ensure_ascii=False, indent=2), encoding='utf-8')
    click.secho(f"已保存:{out_file}", fg='green')

# ── process 命令 ───────────────────────────────────────────────────────────
@cli.command()
@click.argument('input_file', type=click.Path(exists=True, path_type=Path))
@click.option('--format', '-f', 'fmt',
              type=click.Choice(['csv', 'json', 'parquet']), default='csv',
              help='输出格式(默认 csv)')
@click.option('--output', '-o', default=None, type=click.Path(path_type=Path),
              help='输出路径(默认同目录,自动命名)')
@click.pass_context
def process(ctx: click.Context, input_file: Path, fmt: str, output: Path | None):
    """处理原始 JSON 数据并转换为指定格式"""
    verbose: bool = ctx.obj['verbose']

    if verbose:
        click.echo(f"读取:{input_file}")

    try:
        raw = json.loads(input_file.read_text(encoding='utf-8'))
    except (json.JSONDecodeError, IOError) as e:
        click.secho(f"读取文件失败:{e}", fg='red', err=True)
        sys.exit(1)

    # 标准化数据:假设 raw 是列表或包含 data 键的字典
    rows = raw if isinstance(raw, list) else raw.get('data', [raw])
    df = pd.json_normalize(rows)

    if verbose:
        click.echo(f"共 {len(df)} 行,{len(df.columns)} 列")

    # 确定输出路径
    if output is None:
        stem = input_file.stem.replace('raw_', 'processed_')
        output = input_file.parent / f"{stem}.{fmt}"

    # 保存
    if fmt == 'csv':
        df.to_csv(output, index=False, encoding='utf-8-sig')
    elif fmt == 'json':
        df.to_json(output, orient='records', force_ascii=False, indent=2)
    elif fmt == 'parquet':
        df.to_parquet(output, index=False)

    click.secho(f"已保存:{output}({len(df)} 行)", fg='green')

# ── report 命令 ────────────────────────────────────────────────────────────
@cli.command()
@click.option('--date', '-d', default=datetime.today().strftime('%Y-%m-%d'),
              help='报告日期(默认今天)')
@click.option('--email', '-e', multiple=True, metavar='EMAIL',
              help='发送报告的邮箱(可多次指定)')
@click.option('--open-file', 'open_report', is_flag=True,
              help='生成后自动打开报告文件')
@click.pass_context
def report(ctx: click.Context, date: str, email: tuple, open_report: bool):
    """生成数据报告,可选发送邮件"""
    c: Settings = ctx.obj['cfg']

    # 查找已处理的数据文件
    processed_files = list(c.output_dir.glob(f"processed_{date}.*"))
    if not processed_files:
        click.secho(f"未找到 {date} 的已处理数据,请先运行 process 命令", fg='yellow')
        sys.exit(1)

    data_file = processed_files[0]
    df = pd.read_csv(data_file) if data_file.suffix == '.csv' else pd.read_json(data_file)

    # 生成简单摘要报告
    report_path = c.output_dir / f"report_{date}.txt"
    summary_lines = [
        f"=== 数据报告 {date} ===",
        f"数据行数:{len(df)}",
        f"数据列数:{len(df.columns)}",
        f"生成时间:{datetime.now():%Y-%m-%d %H:%M:%S}",
        "",
        "数值列统计:",
        df.describe().to_string(),
    ]
    report_path.write_text('\n'.join(summary_lines), encoding='utf-8')
    click.secho(f"报告已生成:{report_path}", fg='green')

    if email:
        click.echo(f"发送报告到:{', '.join(email)}")
        # 实际发送逻辑接入第10章 Email 模块

    if open_report:
        import subprocess, platform
        cmd = 'open' if platform.system() == 'Darwin' else 'xdg-open'
        subprocess.run([cmd, str(report_path)], check=False)

if __name__ == '__main__':
    cli()
# 典型使用流程
python main.py --help
python main.py fetch --help

# 拉取今天数据(dry-run 模式预览)
python main.py --verbose fetch --dry-run

# 拉取指定日期数据
python main.py fetch --date 2024-01-15

# 处理数据(转为 CSV)
python main.py process ./output/raw_2024-01-15.json --format csv

# 生成报告并发送邮件
python main.py report --date 2024-01-15 --email [email protected]

**生产化建议:**把 datatool fetch && datatool process ... && datatool report 这三个命令写成一个 shell 脚本或 Makefile target,再用 cron 定时调度,就形成了一个完整的每日数据处理流水线。后续可以把 CLI 工具容器化,放到服务器上用 Docker + cron 运行,彻底摆脱本地环境依赖。

上一章

下一章
第16章:综合实战项目
本章评分
4.8  / 5  (16 评分)

💬 留言讨论