第 4 章

异常处理与日志——让脚本在生产环境跑稳

第4章:异常处理与日志——让脚本在生产环境跑稳

自动化脚本在你的电脑上跑得很好,一上生产服务器就各种崩——这是每个 Python 开发者都踩过的坑。根本原因几乎都是:没有健全的异常处理,没有可追溯的日志。本章系统讲解 Python 异常体系、logging 模块最佳实践、重试机制,最后给你一个可直接复用的企业级日志模块。

异常处理体系

Python 异常层次结构

Python 的所有异常都继承自 BaseException。理解这棵树,是写出精确异常处理的基础。

BaseException ├── SystemExit ├── KeyboardInterrupt ├── GeneratorExit └── Exception ├── StopIteration ├── ArithmeticError │ ├── ZeroDivisionError │ └── OverflowError ├── LookupError │ ├── IndexError │ └── KeyError ├── OSError (IOError) │ ├── FileNotFoundError │ ├── PermissionError │ └── TimeoutError ├── ValueError ├── TypeError ├── RuntimeError └── ...(以及更多)

关键结论:

try / except / else / finally 完整语义

很多人只用 try/except,忽略了 elsefinally 的价值。四个子句的完整语义:

try:
    result = risky_operation()
except SpecificError as exc:
    # 只在 try 块抛出 SpecificError 时执行
    handle_error(exc)
except (TypeError, ValueError) as exc:
    # 同时捕获多种异常类型
    handle_type_or_value_error(exc)
else:
    # 只在 try 块没有抛出任何异常时执行
    # 比把代码放在 try 末尾更清晰:避免意外捕获 process_result 的异常
    process_result(result)
finally:
    # 无论是否发生异常,总是执行
    # 适合清理资源:关闭文件、释放锁、断开连接
    cleanup()

**else 子句的价值:**把"成功路径的代码"放在 else 而不是 try 末尾,意味着这部分代码的异常不会被上面的 except 捕获到。这避免了意外吞掉不相关的错误,让异常处理的意图更清晰。

捕获原则:宁可精确,不要 except Exception

❌ 错误示范:过于宽泛的捕获

try: data = fetch_from_api(url) result = process(data) save_to_db(result) except Exception as e: print(f"出错了:{e}") # 什么都可能触发这里,完全不知道发生了什么

**问题:**API 超时、数据格式错误、数据库连接失败——三种完全不同的错误,你用同样的方式处理它们,而且只打印了一行没有调用栈的信息。出了问题完全无法定位。

✅ 正确示范:精确分层捕获

import httpx import logging

logger = logging.getLogger(name)

try: data = fetch_from_api(url) except httpx.TimeoutException: logger.warning("API 请求超时:%s,将跳过本次处理", url) return None except httpx.HTTPStatusError as exc: logger.error("API 返回错误状态码 %s:%s", exc.response.status_code, url) raise # 重新抛出,让上层决定如何处理 else: try: result = process(data) save_to_db(result) except ValueError as exc: logger.error("数据格式不合法:%s", exc, exc_info=True) raise DataProcessingError("数据处理失败") from exc except DatabaseError as exc: logger.critical("数据库写入失败:%s", exc, exc_info=True) raise

**改进:**每种错误类型独立处理;使用 logging 记录上下文;超时用 warning(可恢复),数据库失败用 critical(不可恢复);保留 exc_info=True 以记录完整调用栈;raise 重新抛出让上层决策。

自定义异常类

为什么要自定义异常

Python 内置异常描述的是技术层面的问题(文件不存在、类型错误),而业务层面的问题需要业务语言来表达。当你看到 FileNotFoundError,你不知道是配置文件找不到还是用户上传的文件丢失——但 ConfigFileNotFoundErrorUploadedFileNotFoundError 就非常清晰。

设计业务异常层次

class AutomationError(Exception):
    """自动化脚本的根异常。所有业务异常继承自此类。

    使用根异常的好处:调用方可以用 `except AutomationError` 捕获所有
    业务异常,同时仍能单独处理特定类型。
    """

class DataFetchError(AutomationError):
    """数据获取失败(网络请求、API 调用)。"""

    def __init__(self, url: str, reason: str) -> None:
        self.url = url
        self.reason = reason
        super().__init__(f"获取数据失败 [{url}]:{reason}")

class DataProcessingError(AutomationError):
    """数据处理/转换失败。"""

    def __init__(self, message: str, raw_data: object = None) -> None:
        self.raw_data = raw_data
        super().__init__(message)

class ConfigurationError(AutomationError):
    """配置错误(缺少必要配置项、格式不合法)。"""

异常链:raise X from Y

当你把一个底层异常包装成业务异常时,应该保留原始异常信息。raise X from Y 建立显式的异常链:

import httpx

def fetch_user_data(user_id: int) -> dict:
    """从 API 获取用户数据。"""
    try:
        response = httpx.get(f"https://api.example.com/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as exc:
        # raise X from Y:明确表示 DataFetchError 是"由于" HTTPStatusError 引起的
        # Python 会在 Traceback 中显示两个异常和它们的关系
        raise DataFetchError(
            url=str(exc.request.url),
            reason=f"HTTP {exc.response.status_code}"
        ) from exc
    except httpx.RequestError as exc:
        raise DataFetchError(
            url=str(exc.request.url),
            reason=f"网络错误:{exc}"
        ) from exc

# 调用方可以这样使用:
try:
    data = fetch_user_data(42)
except DataFetchError as exc:
    print(f"用户数据获取失败:{exc}")
    print(f"原始原因:{exc.__cause__}")  # 访问原始异常

**raise X from None:**如果你明确不想暴露底层细节(比如不想让调用方看到数据库连接字符串),用 raise BusinessError(...) from None 隐藏原始异常链。但要确保你在日志里已经记录了足够的调试信息。

logging 模块精讲

logging vs print:为什么 print 不够用

能力 print logging
输出级别控制 DEBUG/INFO/WARNING/ERROR/CRITICAL,可按级别过滤
输出目标 只有控制台 控制台 + 文件 + 网络 + 第三方服务,同时输出
时间戳 自动记录,精确到毫秒
调用位置 自动记录文件名、函数名、行号
完整异常栈 需要手动 traceback.format_exc() exc_info=True 自动附加
生产环境关闭 需要逐个删除 设置 level=WARNING 即可过滤所有 DEBUG/INFO

Logger / Handler / Formatter 体系

理解三个核心概念,就理解了 logging 的全部架构:

完整配置:同时输出控制台和按大小轮转的文件:

import logging
import logging.handlers
import sys
from pathlib import Path

def setup_logging(
    log_file: str | Path = "app.log",
    level: int = logging.INFO,
    max_bytes: int = 10 * 1024 * 1024,  # 10 MB
    backup_count: int = 5,
) -> logging.Logger:
    """配置应用级别的日志,同时输出到控制台和轮转文件。

    Args:
        log_file: 日志文件路径。
        level: 全局日志级别,默认 INFO。
        max_bytes: 单个日志文件最大字节数,默认 10 MB。
        backup_count: 保留的历史日志文件数量,默认 5 个。

    Returns:
        配置好的根 Logger 实例。
    """
    # 创建日志目录(如果不存在)
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)

    # 定义格式:时间 | 级别 | 模块:行号 | 消息
    fmt = logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # 控制台 Handler:输出到 stderr,WARNING 及以上级别
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setLevel(logging.WARNING)
    console_handler.setFormatter(fmt)

    # 文件 Handler:按大小轮转,INFO 及以上级别,UTF-8 编码
    file_handler = logging.handlers.RotatingFileHandler(
        filename=log_file,
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(fmt)

    # 配置根 Logger
    root_logger = logging.getLogger()
    root_logger.setLevel(level)
    root_logger.addHandler(console_handler)
    root_logger.addHandler(file_handler)

    return root_logger

在每个模块里使用:

# 任意模块内
import logging

logger = logging.getLogger(__name__)  # 以模块路径命名,如 myproject.scraper

def process_order(order_id: int) -> None:
    logger.debug("开始处理订单 %s", order_id)       # 调试信息,生产不显示
    logger.info("订单 %s 处理完成", order_id)        # 正常流程记录
    logger.warning("订单 %s 金额异常,已标记", order_id)  # 需要关注
    logger.error("订单 %s 处理失败", order_id, exc_info=True)  # 附带调用栈

JSON 日志输出(生产环境标准)

在生产环境,纯文本日志很难被日志聚合平台(ELK Stack、Datadog、CloudWatch)解析。标准做法是输出结构化的 JSON 日志:

import logging
import json
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """将日志记录格式化为 JSON,方便日志平台解析。"""

    def format(self, record: logging.LogRecord) -> str:
        log_entry: dict = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # 附加异常信息
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # 附加自定义字段(通过 extra 参数传入)
        for key, value in record.__dict__.items():
            if key not in logging.LogRecord.__dict__ and not key.startswith("_"):
                log_entry[key] = value

        return json.dumps(log_entry, ensure_ascii=False)

# 使用方式
json_handler = logging.StreamHandler()
json_handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(json_handler)

# 带自定义字段的日志
logger.info(
    "订单支付成功",
    extra={"order_id": 12345, "amount": 299.0, "user_id": 678}
)
# 输出:{"timestamp": "2024-01-15T10:30:00+00:00", "level": "INFO",
#        "message": "订单支付成功", "order_id": 12345, ...}

上下文管理器

contextlib.contextmanager 装饰器

上下文管理器(with 语句)保证资源在使用完后一定被释放,即使中途发生异常。用 @contextmanager 装饰器,可以用最简洁的方式创建自定义上下文管理器:

from contextlib import contextmanager
from typing import Generator
import logging
import time

logger = logging.getLogger(__name__)

@contextmanager
def timer(operation_name: str) -> Generator[None, None, None]:
    """计时上下文管理器:记录代码块的执行时间。

    使用示例:
        with timer("数据导入"):
            import_data(file_path)
        # 输出:数据导入 完成,耗时 2.34 秒
    """
    start = time.perf_counter()
    try:
        yield  # with 块中的代码在此处执行
    finally:
        elapsed = time.perf_counter() - start
        logger.info("%s 完成,耗时 %.2f 秒", operation_name, elapsed)

@contextmanager
def managed_db_connection(conn_string: str):
    """数据库连接上下文管理器:自动处理提交和回滚。"""
    import sqlite3
    conn = sqlite3.connect(conn_string)
    try:
        yield conn
        conn.commit()  # 正常退出时提交
        logger.debug("数据库事务已提交")
    except Exception:
        conn.rollback()  # 异常时回滚
        logger.error("数据库事务已回滚", exc_info=True)
        raise  # 回滚后继续向上传播异常
    finally:
        conn.close()  # 无论如何都关闭连接

# 使用
with timer("批量导入"):
    with managed_db_connection("data.db") as conn:
        conn.execute("INSERT INTO orders VALUES (?, ?)", (1, 100))

资源清理模式

上下文管理器在自动化脚本中有三个核心用途:

import fcntl
from contextlib import contextmanager
from pathlib import Path

@contextmanager
def file_lock(lock_path: str | Path):
    """基于文件的进程锁,防止脚本并发运行。

    使用示例:
        with file_lock("/tmp/my_script.lock"):
            run_task()  # 同一时间只有一个进程能进入这里
    """
    lock_file = Path(lock_path)
    fd = lock_file.open("w")
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        yield
    except BlockingIOError:
        raise RuntimeError(f"脚本已在运行(锁文件:{lock_path})") from None
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        fd.close()

重试机制:tenacity 库

网络请求失败、数据库临时不可用——这类瞬态错误通过重试往往能自动恢复。tenacity 是 Python 最成熟的重试库,配置简洁而功能完整。

pip install tenacity

基础重试:@retry 装饰器

import httpx
import logging
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)

logger = logging.getLogger(__name__)

@retry(
    # 最多重试 4 次(第 1 次是正常调用,后续 3 次是重试)
    stop=stop_after_attempt(4),
    # 指数退避:第 1 次等 1 秒,第 2 次等 2 秒,第 3 次等 4 秒,最大 30 秒
    wait=wait_exponential(multiplier=1, min=1, max=30),
    # 只在这些异常类型时重试(其他异常直接抛出)
    retry=retry_if_exception_type((
        httpx.TimeoutException,
        httpx.NetworkError,
    )),
    # 每次重试前打印日志
    before_sleep=before_sleep_log(logger, logging.WARNING),
    # 所有重试用尽后,重新抛出最后一次异常
    reraise=True,
)
def fetch_with_retry(url: str, timeout: float = 10.0) -> dict:
    """带自动重试的 HTTP GET 请求。

    Args:
        url: 请求目标 URL。
        timeout: 单次请求超时(秒),默认 10 秒。

    Returns:
        解析后的 JSON 响应体。

    Raises:
        httpx.TimeoutException: 4 次重试全部超时后抛出。
        httpx.HTTPStatusError: 非 2xx 响应(不重试,直接抛出)。
    """
    response = httpx.get(url, timeout=timeout)
    response.raise_for_status()  # 4xx/5xx 抛 HTTPStatusError,不在重试范围内
    return response.json()

更精细的重试控制

from tenacity import retry_if_exception, RetryCallState
import httpx

def should_retry_http(exc: BaseException) -> bool:
    """仅对可重试的 HTTP 状态码重试。"""
    if isinstance(exc, httpx.HTTPStatusError):
        # 429 Too Many Requests 和 5xx 服务端错误才重试
        # 4xx 客户端错误(除 429)不重试(你的请求有问题,重试也没用)
        return exc.response.status_code == 429 or exc.response.status_code >= 500
    return isinstance(exc, (httpx.TimeoutException, httpx.NetworkError))

def log_retry_attempt(retry_state: RetryCallState) -> None:
    """自定义重试日志,包含更多上下文信息。"""
    logger.warning(
        "重试第 %d 次(上次异常:%s)",
        retry_state.attempt_number,
        retry_state.outcome.exception(),
    )

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, min=1, max=60),
    retry=retry_if_exception(should_retry_http),
    before_sleep=log_retry_attempt,
    reraise=True,
)
def robust_api_call(url: str) -> dict:
    """具备精细重试控制的 API 调用。"""
    response = httpx.get(url, timeout=15.0)
    response.raise_for_status()
    return response.json()

**注意:**重试不是银弹。如果服务器返回 400(请求格式错误),重试 10 次也是徒劳的。始终明确"哪些错误值得重试"——通常只有超时、网络抖动、限流(429)和服务端临时错误(5xx)。

实战:企业级日志模块

把本章所有知识整合成一个完整的、可直接复用的日志模块。支持:多进程安全、JSON 格式(生产)/ 易读格式(开发)、Sentry 集成预留接口。

"""
logger_setup.py — 企业级日志模块,可直接复制到项目中使用。

使用方式:
    from logger_setup import setup_logger, get_logger

    # 在程序入口(main.py)调用一次
    setup_logger(env="production", log_file="logs/app.log")

    # 在任意模块中使用
    logger = get_logger(__name__)
    logger.info("处理开始", extra={"task_id": "abc123"})
"""
import logging
import logging.handlers
import json
import sys
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal

# ──────────────────────────────────────────────
# JSON Formatter(生产环境)
# ──────────────────────────────────────────────

class JsonFormatter(logging.Formatter):
    """结构化 JSON 日志,兼容 ELK / Datadog / CloudWatch。"""

    # 标准 LogRecord 字段,不重复输出到 extra
    _SKIP_FIELDS = frozenset(logging.LogRecord.__dict__.keys()) | {
        "message", "asctime", "msg", "args",
    }

    def format(self, record: logging.LogRecord) -> str:
        record.message = record.getMessage()
        entry: dict = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.message,
            "file": f"{record.filename}:{record.lineno}",
            "pid": os.getpid(),
        }
        if record.exc_info:
            entry["exc"] = self.formatException(record.exc_info)
        # 附加 extra 字段
        for k, v in record.__dict__.items():
            if k not in self._SKIP_FIELDS and not k.startswith("_"):
                entry[k] = v
        return json.dumps(entry, ensure_ascii=False, default=str)

# ──────────────────────────────────────────────
# 人类可读 Formatter(开发环境)
# ──────────────────────────────────────────────

HUMAN_FMT = logging.Formatter(
    fmt="%(asctime)s [%(levelname)-8s] %(name)s:%(lineno)d — %(message)s",
    datefmt="%H:%M:%S",
)

# ──────────────────────────────────────────────
# 多进程安全的 RotatingFileHandler
# ──────────────────────────────────────────────

class SafeRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """在多进程环境下安全的轮转文件 Handler。

    标准 RotatingFileHandler 在多进程写入时可能损坏文件。
    此版本在每次 emit 前重新打开文件,以支持多进程安全写入。
    (适合低并发场景;高并发场景推荐使用集中式日志服务)
    """

    def emit(self, record: logging.LogRecord) -> None:
        try:
            if self.shouldRollover(record):
                self.doRollover()
            logging.FileHandler.emit(self, record)
        except Exception:
            self.handleError(record)

# ──────────────────────────────────────────────
# 公开 API
# ──────────────────────────────────────────────

def setup_logger(
    env: Literal["development", "production"] = "development",
    log_file: str | Path | None = None,
    level: int = logging.DEBUG,
    max_bytes: int = 20 * 1024 * 1024,   # 20 MB
    backup_count: int = 7,
    sentry_dsn: str | None = None,
) -> None:
    """初始化全局日志配置。在 main.py 入口处调用一次即可。

    Args:
        env: 运行环境。production 使用 JSON 格式;development 使用易读格式。
        log_file: 日志文件路径。为 None 时只输出到控制台。
        level: 根 Logger 的最低级别,默认 DEBUG(开发)。
        max_bytes: 单个日志文件最大字节数。
        backup_count: 保留的历史日志文件数量。
        sentry_dsn: Sentry DSN 字符串。提供时自动集成 Sentry。
    """
    formatter = JsonFormatter() if env == "production" else HUMAN_FMT

    handlers: list[logging.Handler] = []

    # 控制台输出
    console = logging.StreamHandler(sys.stderr)
    console.setFormatter(formatter)
    console.setLevel(logging.DEBUG if env == "development" else logging.WARNING)
    handlers.append(console)

    # 文件输出(可选)
    if log_file is not None:
        Path(log_file).parent.mkdir(parents=True, exist_ok=True)
        fh = SafeRotatingFileHandler(
            filename=log_file,
            maxBytes=max_bytes,
            backupCount=backup_count,
            encoding="utf-8",
        )
        fh.setFormatter(JsonFormatter())  # 文件始终用 JSON,方便后续分析
        fh.setLevel(logging.DEBUG)
        handlers.append(fh)

    # Sentry 集成(预留接口)
    if sentry_dsn:
        try:
            import sentry_sdk
            from sentry_sdk.integrations.logging import LoggingIntegration

            sentry_sdk.init(
                dsn=sentry_dsn,
                integrations=[
                    LoggingIntegration(
                        level=logging.INFO,        # INFO 以上作为面包屑
                        event_level=logging.ERROR, # ERROR 以上发送 Sentry 事件
                    )
                ],
            )
            logging.getLogger(__name__).info("Sentry 集成已启用")
        except ImportError:
            logging.getLogger(__name__).warning(
                "sentry_dsn 已提供但未安装 sentry-sdk,跳过 Sentry 集成。"
                "运行 pip install sentry-sdk 以启用。"
            )

    # 应用配置
    root = logging.getLogger()
    root.setLevel(level)
    # 清除可能存在的旧 handler(避免重复初始化时日志重复输出)
    root.handlers.clear()
    for h in handlers:
        root.addHandler(h)

def get_logger(name: str) -> logging.Logger:
    """获取以模块路径命名的 Logger。

    这是对 logging.getLogger(name) 的薄封装,
    未来可在此处添加统一的过滤器或上下文注入逻辑。
    """
    return logging.getLogger(name)

使用示例(main.py)

"""程序入口:初始化日志,然后运行主业务逻辑。"""
import os
from logger_setup import setup_logger, get_logger

def main() -> None:
    env = os.getenv("APP_ENV", "development")
    setup_logger(
        env=env,
        log_file="logs/automation.log",
        sentry_dsn=os.getenv("SENTRY_DSN"),  # 生产环境通过环境变量注入
    )

    logger = get_logger(__name__)
    logger.info("程序启动,环境:%s", env)

    try:
        run_automation()
    except Exception:
        logger.critical("未预期的致命错误,程序退出", exc_info=True)
        raise SystemExit(1)

if __name__ == "__main__":
    main()

**本章总结:**健壮的自动化脚本需要三层保障——精确的异常处理(捕获你预期的,让未知的向上传播)、完整的日志记录(JSON 格式 + 结构化字段)、自动重试(只对瞬态错误重试)。把本章的日志模块作为你所有 Python 项目的起点,你会在第一次出现生产问题时感谢自己做了这个决定。

上一章

下一章
第5章:文件系统处理
本章评分
4.5  / 5  (65 评分)

💬 留言讨论