Scheduler & CLI — Turning Scripts into Professional Tools
Chapter 15: Schedulers & CLI Tools — Turning Scripts into Production Tools
Writing an automation script is only the first step. Making it run automatically at the right time, exposing it through a professional command-line interface, and packaging it for distribution — these are what transform a personal script into a team production tool. This chapter covers four layers: task scheduling (APScheduler / cron), CLI frameworks (Click / Typer), deployment (PyInstaller / Docker), and a complete CLI project that ties it all together.
Scheduler Comparison and Selection
| Approach | Platform | Best For | Persistence | Complexity |
|---|---|---|---|---|
| cron | Linux / macOS | Server scripts, long-running stable schedules | OS-level | Low |
| Windows Task Scheduler | Windows | Scheduled tasks on Windows servers or dev machines | OS-level | Low |
| schedule library | All platforms | Simple cases, in-process scheduling, no persistence needed | None | Minimal |
| APScheduler | All platforms | Complex logic, persistence, error handling, monitoring | Optional | Medium |
Selection guidance:
- Recurring script on a server: Use cron (Linux / macOS) or Windows Task Scheduler. Simple, reliable, no Python daemon needed.
- Python long-lived process (e.g., embedded in a web app): Use APScheduler.
- Personal script or quick test: Use the schedule library — three lines of code.
- Distributed task queues: Consider Celery + Redis (out of scope here).
APScheduler Deep Dive
pip install apscheduler sqlalchemy
Three Trigger Types
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta
def sync_data():
    """Print a timestamped "Syncing data..." line (stand-in for real work)."""
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{stamp}] Syncing data...")
def generate_report():
    """Print a timestamped "Generating report..." line (stand-in for real work)."""
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{stamp}] Generating report...")
scheduler = BlockingScheduler(timezone='UTC')
# NOTE(review): the trigger objects below are built without an explicit
# timezone — confirm whether they pick up the scheduler's 'UTC' setting.

# IntervalTrigger: fixed interval (every 30 minutes)
every_half_hour = IntervalTrigger(minutes=30)
scheduler.add_job(sync_data, every_half_hour, id='sync', replace_existing=True)

# CronTrigger: cron-style schedule (Mondays at 08:00)
monday_morning = CronTrigger(day_of_week='mon', hour=8, minute=0)
scheduler.add_job(generate_report, monday_morning, id='report')

# DateTrigger: run once at a specific moment (10 minutes from now)
ten_minutes_from_now = datetime.now() + timedelta(minutes=10)
scheduler.add_job(
    lambda: print("one-time alert"),
    DateTrigger(run_date=ten_minutes_from_now),
    id='alert',
)

scheduler.start()
Persistence with SQLAlchemy
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
# Job definitions are kept in a local SQLite file via SQLAlchemy.
job_stores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')}

# Jobs execute on a thread pool capped at five workers.
job_executors = {'default': ThreadPoolExecutor(max_workers=5)}

default_job_options = {
    'coalesce': True,            # Merge multiple missed runs into one
    'max_instances': 1,          # Only one instance of each job at a time
    'misfire_grace_time': 300,   # Execute up to 5 min late if missed
}

scheduler = BackgroundScheduler(
    jobstores=job_stores,
    executors=job_executors,
    job_defaults=default_job_options,
    timezone='UTC',
)
scheduler.start()
# Jobs persist across restarts — no need to re-add them
Error Handling and Job Listeners
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
def job_listener(event):
    """Report failed and missed job runs on stdout.

    Successful executions are received as well (see the mask used at
    registration) but produce no output.

    Args:
        event: The scheduler event for one job run. Only its ``code``,
            ``job_id`` and ``exception`` attributes are read.
    """
    # A missed run is recognised by its event code. Probing for a ``retval``
    # attribute does not work: missed-run events carry ``retval`` and
    # ``exception`` too (both None), so they look like a quiet success.
    if event.code == EVENT_JOB_MISSED:
        print(f"Job {event.job_id} MISSED its scheduled run!")
    elif event.exception:
        print(f"Job {event.job_id} FAILED: {event.exception}")
        # Plug in alerting from Chapter 11 here


# Subscribe to failures, successful executions and missed runs.
scheduler.add_listener(
    job_listener,
    EVENT_JOB_ERROR | EVENT_JOB_EXECUTED | EVENT_JOB_MISSED,
)
cron in Practice
# crontab format: min hour day month weekday command
# Common expressions:
# 0 8 * * 1-5 Weekdays at 8:00 AM
# */30 * * * * Every 30 minutes
# 0 0 1 * * First day of every month at midnight
# 0 9 * * 1 Every Monday at 9:00 AM
# Edit your crontab
crontab -e
# Correct entry using a virtualenv (avoid common PATH pitfalls)
0 9 * * 1-5 cd /home/user/myproject && /home/user/myproject/venv/bin/python sync.py >> logs/sync.log 2>&1
Three common cron pitfalls:
1. Environment variables: cron runs in a minimal shell. Use absolute paths for all executables, or set PATH at the top of your crontab.
2. Working directory: cron's default working directory is the user's home directory, not where your script lives. Prefix the command with `cd /path/to/project &&`.
3. Python path: Always reference the virtualenv Python (`/home/user/project/venv/bin/python`), not the system Python.
Building CLIs with Click
pip install click
import click
@click.group()
@click.option('--verbose', '-v', is_flag=True)
@click.pass_context
def cli(ctx, verbose):
    """Data processing toolkit"""
    # Root command group: stash the global --verbose flag on the shared
    # context object so subcommands can read it back via ctx.obj.
    shared = ctx.ensure_object(dict)
    shared['verbose'] = verbose
@cli.command()
@click.option('--source', '-s', required=True, help='Data source URL or path')
@click.option('--output', '-o', default='./data', help='Output directory')
@click.pass_context
def fetch(ctx, source, output):
    """Fetch data from the specified source"""
    # The verbose flag was stored on the context object by the root group.
    verbose = ctx.obj['verbose']
    if verbose:
        click.echo(f"Fetching from {source}...")
    click.secho("Fetch complete", fg='green')
@cli.command()
@click.argument('input_file')
@click.option('--format', '-f', 'fmt',
              type=click.Choice(['csv', 'json', 'parquet']), default='csv')
def process(input_file, fmt):
    """Process raw data"""
    # The option is bound to ``fmt`` so the builtin ``format`` is not
    # shadowed; the command-line spelling (--format / -f) is unchanged.
    click.echo(f"Processing {input_file} -> {fmt}")
@cli.command()
@click.option('--email', '-e', multiple=True, help='Report recipient (repeatable)')
def report(email):
    """Generate and optionally email a report"""
    # ``email`` is a tuple of every -e/--email value; empty means "no mail".
    if not email:
        return
    recipients = ', '.join(email)
    click.echo(f"Sending report to: {recipients}")
# Run the command group only when this file is executed as a script.
if __name__ == "__main__":
    cli()
Progress Bar
import click, time
@click.command()
@click.argument('files', nargs=-1, type=click.Path(exists=True))
def process_files(files):
    """Batch-process files with a progress bar"""
    progress = click.progressbar(files, label='Processing', show_pos=True)
    with progress as bar:
        for _path in bar:
            # Stand-in for real per-file work.
            time.sleep(0.5)
    click.secho(f"\nDone! Processed {len(files)} files.", fg='green')
Typer: Modern CLI Framework
pip install typer[all]
import typer
from pathlib import Path
from enum import Enum
app = typer.Typer(help="Data processing tool built with Typer")
class OutputFormat(str, Enum):
    """Closed set of output formats accepted by the ``--format`` option.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    csv = "csv"
    json = "json"
    parquet = "parquet"
@app.command()
def fetch(
    source: str = typer.Option(..., "--source", "-s", help="Data source URL"),
    output: Path = typer.Option(Path("./data"), "--output", "-o"),
    verbose: bool = typer.Option(False, "--verbose", "-v"),
):
    """Fetch data from the specified source"""
    # Create the output directory (and any missing parents) up front.
    output.mkdir(exist_ok=True, parents=True)

    if verbose:
        typer.echo(f"Fetching from {source}")

    typer.secho("Done!", fg=typer.colors.GREEN)
@app.command()
def process(
    input_file: Path = typer.Argument(..., help="Input file"),
    fmt: OutputFormat = typer.Option(OutputFormat.csv, "--format", "-f"),
):
    """Process raw data into the specified format"""
    if input_file.exists():
        typer.echo(f"Processing {input_file} -> {fmt.value}")
        return

    # Missing input: report on stderr and exit with status 1.
    typer.secho(f"File not found: {input_file}", fg=typer.colors.RED, err=True)
    raise typer.Exit(code=1)
# Run the Typer application only when this file is executed as a script.
if __name__ == '__main__':
    app()
Script Deployment
PyInstaller: Single Executable
pip install pyinstaller
# Build a single self-contained executable
pyinstaller --onefile --name datatool main.py
# Output is in dist/datatool (or dist/datatool.exe on Windows)
# Recipients don't need Python installed
Docker Containerization
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]
# Build and run
docker build -t datatool:latest .
docker run datatool:latest python main.py fetch --source https://api.example.com
# Mount local data directory
docker run -v $(pwd)/data:/app/data datatool:latest python main.py process /app/data/input.json
# Combine with cron on the server. Name the subcommand explicitly: the image's
# default CMD starts the CLI without one, which does no pipeline work.
0 9 * * * docker run --rm datatool:latest python main.py fetch >> /var/log/datatool.log 2>&1
Project: Complete Data Pipeline CLI Tool
"""
datatool — production-ready data pipeline CLI
Commands: fetch / process / report
Dependencies: click, pydantic-settings, requests, pandas
"""
from __future__ import annotations
import sys, json
import click
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    # Tool configuration; a UTF-8 encoded `.env` file is configured as a source.
    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
    )

    # Endpoint queried by `fetch` when no --source is given.
    api_url: str = 'https://httpbin.org/json'
    # Sent as the `apikey` query parameter; empty by default.
    api_key: str = ''
    # Directory that receives raw downloads and reports.
    output_dir: Path = Path('./output')


# Single settings instance shared by every command through the click context.
cfg = Settings()
@click.group()
@click.version_option('1.0.0')
@click.option('--verbose', '-v', is_flag=True, envvar='DATATOOL_VERBOSE')
@click.pass_context
def cli(ctx, verbose):
    """datatool — data pipeline CLI

    \b
    Typical workflow:
      datatool fetch --date 2024-01-15
      datatool process ./output/raw_2024-01-15.json
      datatool report --email team@example.com
    """
    # The docstring above is the --help text; the lone "\b" line keeps click
    # from re-wrapping the workflow example that follows it.
    # Share the verbose flag and the loaded settings with every subcommand.
    ctx.ensure_object(dict)
    ctx.obj.update({'verbose': verbose, 'cfg': cfg})
@cli.command()
@click.option('--date', '-d', default=datetime.today().strftime('%Y-%m-%d'))
@click.option('--source', '-s', default=None)
@click.option('--dry-run', is_flag=True)
@click.pass_context
def fetch(ctx, date, source, dry_run):
    """Fetch data for a given date from the API"""
    # Downloads JSON from --source (or the configured api_url) and writes it
    # to <output_dir>/raw_<date>.json. Exits with status 1 when the request
    # fails or the response body is not valid JSON.
    c = ctx.obj['cfg']
    url = source or c.api_url
    params = {'date': date, 'apikey': c.api_key}

    if dry_run:
        # Mask the key: dry-run output is meant to be read and often logged.
        shown = {**params, 'apikey': '***'} if c.api_key else params
        click.echo(f"[DRY RUN] GET {url} params={shown}")
        return

    try:
        resp = requests.get(url, params=params, timeout=30)
        resp.raise_for_status()
        # Decode inside the try so a non-JSON body is reported cleanly
        # instead of surfacing as a traceback.
        payload = resp.json()
    except (requests.RequestException, ValueError) as e:
        click.secho(f"Request failed: {e}", fg='red', err=True)
        sys.exit(1)

    c.output_dir.mkdir(parents=True, exist_ok=True)
    out = c.output_dir / f"raw_{date}.json"
    out.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8')
    click.secho(f"Saved: {out}", fg='green')
@cli.command()
@click.argument('input_file', type=click.Path(exists=True, path_type=Path))
@click.option('--format', '-f', 'fmt',
              type=click.Choice(['csv', 'json', 'parquet']), default='csv')
@click.option('--output', '-o', default=None, type=click.Path(path_type=Path))
@click.pass_context
def process(ctx, input_file, fmt, output):
    """Process raw JSON data into the specified format"""
    # Flattens the JSON with pandas and writes it as csv / json / parquet.
    # Without --output the result lands next to the input, with a leading
    # "raw_" in the file name replaced by "processed_". Exits with status 1
    # when the input cannot be read, decoded or parsed.
    try:
        raw = json.loads(input_file.read_text(encoding='utf-8'))
    except (ValueError, OSError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        click.secho(f"Read error: {e}", fg='red', err=True)
        sys.exit(1)

    if isinstance(raw, list):
        rows = raw
    elif isinstance(raw, dict):
        # A top-level object is either an envelope with a 'data' key or a
        # single record.
        rows = raw.get('data', [raw])
    else:
        # Bare scalars (string, number, null) carry no records to tabulate.
        click.secho(
            f"Read error: expected a JSON object or array, got {type(raw).__name__}",
            fg='red', err=True,
        )
        sys.exit(1)

    df = pd.json_normalize(rows)

    if output is None:
        stem = input_file.stem.replace('raw_', 'processed_')
        output = input_file.parent / f"{stem}.{fmt}"

    if fmt == 'csv':
        df.to_csv(output, index=False, encoding='utf-8-sig')
    elif fmt == 'json':
        df.to_json(output, orient='records', force_ascii=False, indent=2)
    elif fmt == 'parquet':
        df.to_parquet(output, index=False)

    click.secho(f"Saved: {output} ({len(df)} rows)", fg='green')
@cli.command()
@click.option('--date', '-d', default=datetime.today().strftime('%Y-%m-%d'))
@click.option('--email', '-e', multiple=True, metavar='EMAIL')
@click.pass_context
def report(ctx, date, email):
    """Generate a summary report for the given date"""
    # Reads <output_dir>/processed_<date>.<ext> and writes a plain-text
    # summary to <output_dir>/report_<date>.txt. Exits with status 1 when no
    # processed file in a supported format exists for the date.
    c = ctx.obj['cfg']

    # One reader per format that `process` can write; a parquet file must not
    # be handed to the JSON reader.
    readers = {
        '.csv': pd.read_csv,
        '.json': pd.read_json,
        '.parquet': pd.read_parquet,
    }
    # Sorted so the chosen file is deterministic when several formats exist.
    files = sorted(
        p for p in c.output_dir.glob(f"processed_{date}.*")
        if p.suffix in readers
    )
    if not files:
        click.secho(f"No processed data for {date}. Run 'process' first.", fg='yellow')
        sys.exit(1)

    source = files[0]
    df = readers[source.suffix](source)

    rpt = c.output_dir / f"report_{date}.txt"
    lines = [
        f"=== Report {date} ===",
        f"Rows: {len(df)} Columns: {len(df.columns)}",
        f"Generated: {datetime.now():%Y-%m-%d %H:%M:%S}",
        "", "Numeric summary:", df.describe().to_string(),
    ]
    rpt.write_text('\n'.join(lines), encoding='utf-8')
    click.secho(f"Report saved: {rpt}", fg='green')

    if email:
        click.echo(f"Sending to: {', '.join(email)}")
        # Hook in Chapter 10 email module here
# Run the datatool command group only when executed as a script.
if __name__ == "__main__":
    cli()
Production workflow tip: Chain the three commands in a shell script or Makefile target, then schedule it with cron. Containerize with Docker to eliminate environment dependencies on the server. The pattern — fetch → process → report — maps directly onto the three commands above.
Previous
Next
Chapter 16: Final Project