Chapter 4: Exceptions & Logging — Production-Grade Python Scripts
Your automation script runs perfectly on your laptop but crashes constantly in production. The root cause is almost always the same: no robust exception handling, no traceable logs. This chapter systematically covers Python's exception hierarchy, logging module best practices, retry mechanisms, and delivers a production-ready logger module you can drop into any project.
The Exception Handling System
Python Exception Hierarchy
BaseException
├── SystemExit
├── KeyboardInterrupt
├── GeneratorExit
└── Exception
    ├── StopIteration
    ├── ArithmeticError
    │   ├── ZeroDivisionError
    │   └── OverflowError
    ├── LookupError
    │   ├── IndexError
    │   └── KeyError
    ├── OSError (IOError)
    │   ├── FileNotFoundError
    │   ├── PermissionError
    │   └── TimeoutError
    ├── ValueError
    ├── TypeError
    ├── RuntimeError
    └── ... (and many more)
Key takeaways:
- An except Exception clause catches everything except SystemExit, KeyboardInterrupt, and GeneratorExit.
- An except BaseException clause catches literally everything and is almost never appropriate.
- Be as specific as possible: catching FileNotFoundError is better than OSError, which is better than Exception.
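The takeaways above can be checked directly against the interpreter. A minimal sketch; the helper name caught_by_except_exception is hypothetical and exists only for this illustration:

```python
def caught_by_except_exception(exc_type: type) -> bool:
    """Return True if an `except Exception` clause would catch exc_type."""
    return issubclass(exc_type, Exception)


# Ordinary errors derive from Exception.
print(caught_by_except_exception(FileNotFoundError))  # True

# Interpreter-level signals derive from BaseException directly.
print(caught_by_except_exception(KeyboardInterrupt))  # False
print(caught_by_except_exception(SystemExit))         # False

# The inheritance chain shows why the most specific handler must be listed first.
print([cls.__name__ for cls in FileNotFoundError.__mro__])
# ['FileNotFoundError', 'OSError', 'Exception', 'BaseException', 'object']
```

Because except clauses are tried top to bottom, a handler for OSError placed before one for FileNotFoundError would always win.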
try / except / else / finally — Full Semantics
try:
    result = risky_operation()
except SpecificError as exc:
    # Only runs when the try block raises SpecificError
    handle_error(exc)
except (TypeError, ValueError) as exc:
    # Catch multiple exception types at once
    handle_type_or_value_error(exc)
else:
    # Only runs when the try block raised NO exception.
    # Better than putting this at the end of try: exceptions from
    # process_result won't be accidentally caught by the except clauses above
    process_result(result)
finally:
    # Always runs, exception or not: the right place for resource cleanup
    cleanup()
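To see which clauses actually run, the following sketch records the execution order for a succeeding and a failing callable. The names run, ok, and bad are hypothetical and used only for this demonstration:

```python
def run(action) -> list:
    """Record which clauses execute for the given callable."""
    steps = []
    try:
        steps.append("try")
        action()
    except ValueError:
        steps.append("except")
    else:
        steps.append("else")
    finally:
        steps.append("finally")
    return steps


def ok() -> None:
    pass


def bad() -> None:
    raise ValueError("boom")


print(run(ok))   # ['try', 'else', 'finally']
print(run(bad))  # ['try', 'except', 'finally']
```

The else branch and the except branch are mutually exclusive, while finally runs in both cases.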
Principle: Be Specific, Avoid Catch-All except Exception
❌ Anti-pattern: Catch-all Exception
try:
    data = fetch_from_api(url)
    result = process(data)
    save_to_db(result)
except Exception as e:
    print(f"Error: {e}")  # API timeout, bad data, DB failure: all look the same

Three completely different failure modes handled identically with no stack trace. Impossible to debug in production.
✅ Correct: Layered, Specific Handling
try:
    data = fetch_from_api(url)
except httpx.TimeoutException:
    logger.warning("API request timed out: %s (skipping)", url)
    return None
except httpx.HTTPStatusError as exc:
    logger.error("API returned %s for %s", exc.response.status_code, url)
    raise
else:
    try:
        result = process(data)
        save_to_db(result)
    except ValueError as exc:
        logger.error("Invalid data format: %s", exc, exc_info=True)
        raise DataProcessingError("Processing failed") from exc
    except DatabaseError as exc:
        logger.critical("Database write failed: %s", exc, exc_info=True)
        raise

Each failure type is handled independently. Logging uses the appropriate severity. exc_info=True captures the full stack trace. A plain raise re-propagates the exception so the caller can decide policy.
Custom Exception Classes
Designing a Business Exception Hierarchy
class AutomationError(Exception):
    """Root exception for all automation script errors.

    Callers can use `except AutomationError` to catch any business
    error while still being able to handle specific subtypes individually.
    """


class DataFetchError(AutomationError):
    """Failed to retrieve data (network request, API call)."""

    def __init__(self, url: str, reason: str) -> None:
        self.url = url
        self.reason = reason
        super().__init__(f"Failed to fetch data [{url}]: {reason}")


class DataProcessingError(AutomationError):
    """Failed to process or transform data."""

    def __init__(self, message: str, raw_data: object = None) -> None:
        self.raw_data = raw_data
        super().__init__(message)


class ConfigurationError(AutomationError):
    """Invalid or missing configuration."""
Exception Chaining: raise X from Y
import httpx


def fetch_user_data(user_id: int) -> dict:
    """Fetch user data from API."""
    try:
        response = httpx.get(f"https://api.example.com/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as exc:
        # Explicit chaining: the traceback shows both exceptions and their relationship
        raise DataFetchError(
            url=str(exc.request.url),
            reason=f"HTTP {exc.response.status_code}",
        ) from exc
    except httpx.RequestError as exc:
        raise DataFetchError(
            url=str(exc.request.url),
            reason=f"Network error: {exc}",
        ) from exc
raise X from None: Use this when you explicitly don't want to expose the underlying cause (e.g., hiding database connection strings). But always make sure you've logged sufficient debugging information before suppressing the chain.
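The difference between the two forms is visible on the exception object itself. The sketch below uses a simplified stand-in for DataFetchError and hypothetical helper names (explicit_chain, suppressed_chain, describe); it only inspects the standard __cause__, __context__, and __suppress_context__ attributes:

```python
from typing import Optional


class DataFetchError(Exception):
    """Simplified stand-in for the chapter's DataFetchError."""


def explicit_chain() -> None:
    try:
        int("not a number")
    except ValueError as exc:
        raise DataFetchError("bad payload") from exc


def suppressed_chain() -> None:
    try:
        int("not a number")
    except ValueError:
        raise DataFetchError("bad payload") from None


def type_name(exc: Optional[BaseException]) -> Optional[str]:
    return type(exc).__name__ if exc is not None else None


def describe(func) -> tuple:
    """Return (cause type, context type, context suppressed?) for func's error."""
    try:
        func()
    except DataFetchError as err:
        return (
            type_name(err.__cause__),
            type_name(err.__context__),
            err.__suppress_context__,
        )
    raise AssertionError("expected DataFetchError")


print(describe(explicit_chain))    # ('ValueError', 'ValueError', True)
print(describe(suppressed_chain))  # (None, 'ValueError', True)
```

Note that from None only hides the original exception from the printed traceback. The original is still reachable through __context__, so code that serializes exception objects should not assume the underlying details are gone.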
Logging Deep Dive
logging vs print
| Capability | print | logging |
|---|---|---|
| Level filtering | None | DEBUG/INFO/WARNING/ERROR/CRITICAL — filter by level |
| Output destinations | Console only | Console + file + network + third-party services, simultaneously |
| Timestamps | None | Automatic, millisecond precision |
| Call location | None | Filename, function name, line number — automatic |
| Full exception stack | Manual traceback.format_exc() | exc_info=True — automatic |
| Silence in production | Delete each print manually | Set level=WARNING — all DEBUG/INFO vanishes |
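The level-filtering row can be demonstrated in a few lines. This sketch writes to an in-memory buffer instead of the console; the logger name demo.levels is a hypothetical placeholder:

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))

demo_logger = logging.getLogger("demo.levels")  # hypothetical logger name
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo isolated from root handlers
demo_logger.setLevel(logging.WARNING)

demo_logger.debug("cache miss for key=%s", "user:42")  # filtered out
demo_logger.info("job started")                        # filtered out
demo_logger.warning("disk usage at %d%%", 91)          # emitted

print(stream.getvalue(), end="")
# WARNING:demo.levels:disk usage at 91%
```

Raising or lowering the level is a one-line change, and the debug and info calls can stay in the code.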
Logger / Handler / Formatter Architecture
import logging
import logging.handlers
import sys
from pathlib import Path


def setup_logging(
    log_file: str | Path = "app.log",
    level: int = logging.INFO,
    max_bytes: int = 10 * 1024 * 1024,
    backup_count: int = 5,
) -> logging.Logger:
    """Configure application-level logging: console + rotating file.

    Args:
        log_file: Path to the log file.
        level: Minimum logging level. Defaults to INFO.
        max_bytes: Max bytes per log file before rotation. Defaults to 10 MB.
        backup_count: Number of rotated log files to keep. Defaults to 5.

    Returns:
        Configured root Logger instance.
    """
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)

    fmt = logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Console: stderr, WARNING and above only
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setLevel(logging.WARNING)
    console_handler.setFormatter(fmt)

    # File: rotating by size, UTF-8. The handler accepts DEBUG and above,
    # but the root logger level set below (INFO by default) filters first,
    # so pass level=logging.DEBUG to get DEBUG records into the file.
    file_handler = logging.handlers.RotatingFileHandler(
        filename=log_file,
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(fmt)

    root_logger = logging.getLogger()
    root_logger.setLevel(level)
    root_logger.addHandler(console_handler)
    root_logger.addHandler(file_handler)
    return root_logger
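A record has to pass two gates before it is written: the logger's level first, then each handler's level. The sketch below isolates that behavior with an in-memory handler; the build helper and the demo.gate logger names are hypothetical:

```python
import io
import logging


def build(logger_level: int, handler_level: int):
    """Create an isolated logger with one in-memory handler."""
    buf = io.StringIO()
    handler = logging.StreamHandler(buf)  # default format: message only
    handler.setLevel(handler_level)

    logger = logging.getLogger(f"demo.gate.{logger_level}.{handler_level}")
    logger.handlers.clear()
    logger.addHandler(handler)
    logger.propagate = False
    logger.setLevel(logger_level)
    return logger, buf


# Logger at INFO, handler at DEBUG: the logger drops DEBUG before the handler sees it.
logger_a, buf_a = build(logging.INFO, logging.DEBUG)
logger_a.debug("dropped by the logger")
logger_a.info("kept")
print(buf_a.getvalue().splitlines())  # ['kept']

# Logger at DEBUG, handler at WARNING: the handler drops everything below WARNING.
logger_b, buf_b = build(logging.DEBUG, logging.WARNING)
logger_b.info("dropped by the handler")
logger_b.warning("kept")
print(buf_b.getvalue().splitlines())  # ['kept']
```

A common arrangement is to set the logger to the lowest level any destination needs and let each handler narrow it further.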
JSON Logging for Production
import logging
import json
import os
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Format log records as JSON for ELK / Datadog / CloudWatch ingestion."""

    # Attribute names present on every LogRecord instance. They are read from
    # a throwaway instance because LogRecord.__dict__ (the class dict) lists
    # only methods, not the per-record attributes assigned in __init__.
    _SKIP_FIELDS = frozenset(
        logging.LogRecord("", 0, "", 0, "", (), None).__dict__
    ) | {"message", "asctime"}

    def format(self, record: logging.LogRecord) -> str:
        record.message = record.getMessage()
        entry: dict = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.message,
            "file": f"{record.filename}:{record.lineno}",
            "pid": os.getpid(),
        }
        if record.exc_info:
            entry["exc"] = self.formatException(record.exc_info)
        # Anything left over was passed by the caller via extra={...}
        for k, v in record.__dict__.items():
            if k not in self._SKIP_FIELDS and not k.startswith("_"):
                entry[k] = v
        return json.dumps(entry, ensure_ascii=False, default=str)


# Usage with extra fields (assumes `logger` has a handler using JsonFormatter):
logger.info(
    "Payment processed",
    extra={"order_id": 12345, "amount": 299.0, "user_id": 678}
)
# Output: {"ts": "2024-01-15T10:30:00+00:00", "level": "INFO", ...,
#          "msg": "Payment processed", ..., "order_id": 12345, ...}
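Fields passed through extra become plain attributes on the LogRecord, which is why a formatter has to separate them from the built-in ones. A minimal sketch of that separation; CaptureHandler, RESERVED, and the demo.extra logger name are hypothetical names for this illustration:

```python
import logging

# Attribute names every LogRecord instance carries, taken from a throwaway record.
RESERVED = frozenset(vars(logging.LogRecord("", 0, "", 0, "", (), None)))


class CaptureHandler(logging.Handler):
    """Hypothetical handler that stores records for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


capture = CaptureHandler()
log = logging.getLogger("demo.extra")
log.addHandler(capture)
log.propagate = False
log.setLevel(logging.INFO)

log.info("Payment processed", extra={"order_id": 12345, "amount": 299.0})

record = capture.records[0]
custom = {k: v for k, v in vars(record).items() if k not in RESERVED}
print(custom)  # {'order_id': 12345, 'amount': 299.0}
```

The standard library rejects extra keys that collide with built-in attribute names such as name or message, so custom field names should be chosen to avoid them.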
Context Managers
from contextlib import contextmanager
from typing import Generator
import logging
import time

logger = logging.getLogger(__name__)


@contextmanager
def timer(operation_name: str) -> Generator[None, None, None]:
    """Timing context manager: logs elapsed time for a code block.

    Usage:
        with timer("data import"):
            import_data(file_path)
        # logs: data import completed in 2.34 seconds
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info("%s completed in %.2f seconds", operation_name, elapsed)


@contextmanager
def managed_db_connection(conn_string: str):
    """Database connection context manager: auto commit/rollback."""
    import sqlite3

    conn = sqlite3.connect(conn_string)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        logger.error("Transaction rolled back", exc_info=True)
        raise
    finally:
        conn.close()
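The commit-or-rollback pattern can be exercised end to end with a throwaway SQLite file. This is a sketch under stated assumptions: transaction is a trimmed copy of the manager above without logging, and the items table, the row values, and the demo function are invented for the demonstration:

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager


@contextmanager
def transaction(path: str):
    """Commit on success, roll back on any exception, always close."""
    conn = sqlite3.connect(path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def demo() -> int:
    """Return the number of rows that survive one good and one failed transaction."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")

        with transaction(db_path) as conn:
            conn.execute("CREATE TABLE items (name TEXT)")
            conn.execute("INSERT INTO items VALUES ('kept')")

        try:
            with transaction(db_path) as conn:
                conn.execute("INSERT INTO items VALUES ('lost')")
                raise RuntimeError("simulated failure")
        except RuntimeError:
            pass  # the manager re-raised after rolling back

        with transaction(db_path) as conn:
            return conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]


print(demo())  # 1
```

Only the row from the successful block is stored; the insert made before the simulated failure is rolled back.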
Retry Mechanism: tenacity
import httpx
import logging
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)

logger = logging.getLogger(__name__)


@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=1, max=30),
    retry=retry_if_exception_type((
        httpx.TimeoutException,
        httpx.NetworkError,
    )),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def fetch_with_retry(url: str, timeout: float = 10.0) -> dict:
    """HTTP GET with automatic retry on transient errors.

    Retries on: timeout, network errors (exponential backoff, max 4 attempts).
    Does NOT retry: HTTP status errors raised by raise_for_status(),
    4xx and 5xx alike, because HTTPStatusError is not in the retry list.
    """
    response = httpx.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()
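To make the mechanics concrete, here is a standard-library-only sketch of a retry decorator with exponential backoff. It is not tenacity's implementation, and its delay schedule is an assumption of this sketch rather than a statement about tenacity's exact timings. The names retry_with_backoff and flaky are hypothetical, and sleep is injectable so the example runs instantly:

```python
import time
from functools import wraps
from typing import Callable, Tuple, Type


def retry_with_backoff(
    exceptions: Tuple[Type[BaseException], ...],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
):
    """Retry the wrapped function on the given exceptions, doubling the delay."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    # 1x, 2x, 4x ... of base_delay, capped at max_delay
                    sleep(min(base_delay * 2 ** (attempt - 1), max_delay))

        return wrapper

    return decorator


delays = []            # records requested delays instead of sleeping
calls = {"count": 0}


@retry_with_backoff((TimeoutError,), sleep=delays.append)
def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


print(flaky(), delays)  # ok [1.0, 2.0]
```

Exceptions that are not listed propagate on the first attempt, which is the behavior wanted for non-transient failures.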
Fine-Grained Retry Control
import httpx
from tenacity import (
    retry,
    retry_if_exception,
    stop_after_attempt,
    wait_exponential,
)


def should_retry_http(exc: BaseException) -> bool:
    """Retry on 429, 5xx, timeouts, and network errors; not on other 4xx."""
    if isinstance(exc, httpx.HTTPStatusError):
        return exc.response.status_code == 429 or exc.response.status_code >= 500
    return isinstance(exc, (httpx.TimeoutException, httpx.NetworkError))


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, min=1, max=60),
    retry=retry_if_exception(should_retry_http),
    reraise=True,
)
def robust_api_call(url: str) -> dict:
    response = httpx.get(url, timeout=15.0)
    response.raise_for_status()
    return response.json()
Retry is not a cure-all. If the server returns 400 (bad request), retrying 10 times changes nothing. Always define explicitly which errors are worth retrying — typically only timeouts, network jitter, rate limiting (429), and transient server errors (5xx).
Production Logger Module
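A complete, drop-in logger module combining everything from this chapter: size-based file rotation, JSON format for production, human-readable format for development, and a Sentry integration hook. Note that the standard library's file handlers do not coordinate between processes, so each process should write to its own log file or forward records through a queue or socket handler.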
"""
logger_setup.py: drop-in enterprise logger module.

Usage:
    from logger_setup import setup_logger, get_logger

    # Call once in your entry point (main.py)
    setup_logger(env="production", log_file="logs/app.log")

    # Use in any module
    logger = get_logger(__name__)
    logger.info("Processing started", extra={"task_id": "abc123"})
"""
import logging
import logging.handlers
import json
import sys
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
class JsonFormatter(logging.Formatter):
    # Attribute names present on every LogRecord instance. Read from a
    # throwaway instance: LogRecord.__dict__ (the class dict) lists only
    # methods, so using it would leak every built-in field into the output.
    _SKIP_FIELDS = frozenset(
        logging.LogRecord("", 0, "", 0, "", (), None).__dict__
    ) | {"message", "asctime"}

    def format(self, record: logging.LogRecord) -> str:
        record.message = record.getMessage()
        entry: dict = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.message,
            "file": f"{record.filename}:{record.lineno}",
            "pid": os.getpid(),
        }
        if record.exc_info:
            entry["exc"] = self.formatException(record.exc_info)
        for k, v in record.__dict__.items():
            if k not in self._SKIP_FIELDS and not k.startswith("_"):
                entry[k] = v
        return json.dumps(entry, ensure_ascii=False, default=str)
HUMAN_FMT = logging.Formatter(
    fmt="%(asctime)s [%(levelname)-8s] %(name)s:%(lineno)d — %(message)s",
    datefmt="%H:%M:%S",
)
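class SafeRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """Rotating file handler with the rollover check made explicit.

    This adds no cross-process locking. The standard library does not support
    several processes writing to one log file; give each process its own file
    or route records through a QueueHandler or SocketHandler instead.
    """

    def emit(self, record: logging.LogRecord) -> None:
        try:
            if self.shouldRollover(record):
                self.doRollover()
            logging.FileHandler.emit(self, record)
        except Exception:
            self.handleError(record)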
def setup_logger(
    env: Literal["development", "production"] = "development",
    log_file: str | Path | None = None,
    level: int = logging.DEBUG,
    max_bytes: int = 20 * 1024 * 1024,
    backup_count: int = 7,
    sentry_dsn: str | None = None,
) -> None:
    """Initialize global logging configuration. Call once in main.py.

    Args:
        env: Runtime environment. "production" uses JSON; "development" uses
            human-readable format.
        log_file: Log file path. None means console output only.
        level: Root logger minimum level. Defaults to DEBUG.
        max_bytes: Max bytes per log file before rotation. Defaults to 20 MB.
        backup_count: Number of rotated log files to retain.
        sentry_dsn: Sentry DSN string. Enables Sentry integration when provided.
    """
    formatter = JsonFormatter() if env == "production" else HUMAN_FMT
    handlers: list[logging.Handler] = []

    console = logging.StreamHandler(sys.stderr)
    console.setFormatter(formatter)
    console.setLevel(logging.DEBUG if env == "development" else logging.WARNING)
    handlers.append(console)

    if log_file is not None:
        Path(log_file).parent.mkdir(parents=True, exist_ok=True)
        fh = SafeRotatingFileHandler(
            filename=log_file,
            maxBytes=max_bytes,
            backupCount=backup_count,
            encoding="utf-8",
        )
        fh.setFormatter(JsonFormatter())
        fh.setLevel(logging.DEBUG)
        handlers.append(fh)

    if sentry_dsn:
        try:
            import sentry_sdk
            from sentry_sdk.integrations.logging import LoggingIntegration

            sentry_sdk.init(
                dsn=sentry_dsn,
                integrations=[LoggingIntegration(
                    level=logging.INFO,
                    event_level=logging.ERROR,
                )],
            )
        except ImportError:
            logging.getLogger(__name__).warning(
                "sentry_dsn provided but sentry-sdk is not installed. "
                "Run: pip install sentry-sdk"
            )

    root = logging.getLogger()
    root.setLevel(level)
    root.handlers.clear()
    for h in handlers:
        root.addHandler(h)
def get_logger(name: str) -> logging.Logger:
    """Get a logger named by module path. Thin wrapper over logging.getLogger."""
    return logging.getLogger(name)
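Configuring only the root logger is enough because records from named loggers propagate up the dotted-name hierarchy to the root's handlers. A small sketch of that behavior; demo_propagation and the logger name myapp.jobs.importer are hypothetical, and the root logger is restored afterwards so the demo leaves no trace:

```python
import io
import logging


def demo_propagation() -> str:
    """Attach one handler to the root logger and log from a module logger."""
    buf = io.StringIO()
    handler = logging.StreamHandler(buf)
    handler.setFormatter(logging.Formatter("%(name)s | %(levelname)s | %(message)s"))

    root = logging.getLogger()
    previous_level = root.level
    root.addHandler(handler)
    root.setLevel(logging.INFO)
    try:
        # Hypothetical module name; real code would pass __name__.
        module_logger = logging.getLogger("myapp.jobs.importer")
        module_logger.info("Processing started")
        module_logger.debug("not shown: below the root level")
    finally:
        root.removeHandler(handler)
        root.setLevel(previous_level)
    return buf.getvalue()


print(demo_propagation(), end="")
# myapp.jobs.importer | INFO | Processing started
```

The module logger has no handlers and no level of its own; it inherits the effective level from the root and hands its records upward.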
Chapter summary: Production-grade automation scripts need three layers of protection — precise exception handling (catch what you expect, let the unknown propagate), complete logging (JSON format + structured fields), and smart retries (only for transient errors). Use this logger module as the foundation for every Python project you build. You'll thank yourself the first time something goes wrong in production.