异常处理与日志——让脚本在生产环境跑稳
第4章:异常处理与日志——让脚本在生产环境跑稳
自动化脚本在你的电脑上跑得很好,一上生产服务器就各种崩——这是每个 Python 开发者都踩过的坑。根本原因几乎都是:没有健全的异常处理,没有可追溯的日志。本章系统讲解 Python 异常体系、logging 模块最佳实践、重试机制,最后给你一个可直接复用的企业级日志模块。
异常处理体系
Python 异常层次结构
Python 的所有异常都继承自 BaseException。理解这棵树,是写出精确异常处理的基础。
BaseException ├── SystemExit ├── KeyboardInterrupt ├── GeneratorExit └── Exception ├── StopIteration ├── ArithmeticError │ ├── ZeroDivisionError │ └── OverflowError ├── LookupError │ ├── IndexError │ └── KeyError ├── OSError (IOError) │ ├── FileNotFoundError │ ├── PermissionError │ └── TimeoutError ├── ValueError ├── TypeError ├── RuntimeError └── ...(以及更多)
关键结论:
except Exception能捕获除SystemExit、KeyboardInterrupt、GeneratorExit之外的所有异常。except BaseException能捕获一切——包括 Ctrl+C 和sys.exit(),几乎不应该使用。- 捕获越精确越好:捕获
FileNotFoundError好过捕获OSError,好过捕获Exception。
try / except / else / finally 完整语义
很多人只用 try/except,忽略了 else 和 finally 的价值。四个子句的完整语义:
try:
result = risky_operation()
except SpecificError as exc:
# 只在 try 块抛出 SpecificError 时执行
handle_error(exc)
except (TypeError, ValueError) as exc:
# 同时捕获多种异常类型
handle_type_or_value_error(exc)
else:
# 只在 try 块没有抛出任何异常时执行
# 比把代码放在 try 末尾更清晰:避免意外捕获 process_result 的异常
process_result(result)
finally:
# 无论是否发生异常,总是执行
# 适合清理资源:关闭文件、释放锁、断开连接
cleanup()
**else 子句的价值:**把"成功路径的代码"放在 else 而不是 try 末尾,意味着这部分代码的异常不会被上面的 except 捕获到。这避免了意外吞掉不相关的错误,让异常处理的意图更清晰。
捕获原则:宁可精确,不要 except Exception
❌ 错误示范:过于宽泛的捕获
try: data = fetch_from_api(url) result = process(data) save_to_db(result) except Exception as e: print(f"出错了:{e}") # 什么都可能触发这里,完全不知道发生了什么
**问题:**API 超时、数据格式错误、数据库连接失败——三种完全不同的错误,你用同样的方式处理它们,而且只打印了一行没有调用栈的信息。出了问题完全无法定位。
✅ 正确示范:精确分层捕获
import httpx import logging
logger = logging.getLogger(name)
try: data = fetch_from_api(url) except httpx.TimeoutException: logger.warning("API 请求超时:%s,将跳过本次处理", url) return None except httpx.HTTPStatusError as exc: logger.error("API 返回错误状态码 %s:%s", exc.response.status_code, url) raise # 重新抛出,让上层决定如何处理 else: try: result = process(data) save_to_db(result) except ValueError as exc: logger.error("数据格式不合法:%s", exc, exc_info=True) raise DataProcessingError("数据处理失败") from exc except DatabaseError as exc: logger.critical("数据库写入失败:%s", exc, exc_info=True) raise
**改进:**每种错误类型独立处理;使用 logging 记录上下文;超时用 warning(可恢复),数据库失败用 critical(不可恢复);保留 exc_info=True 以记录完整调用栈;raise 重新抛出让上层决策。
自定义异常类
为什么要自定义异常
Python 内置异常描述的是技术层面的问题(文件不存在、类型错误),而业务层面的问题需要业务语言来表达。当你看到 FileNotFoundError,你不知道是配置文件找不到还是用户上传的文件丢失——但 ConfigFileNotFoundError 或 UploadedFileNotFoundError 就非常清晰。
设计业务异常层次
class AutomationError(Exception):
"""自动化脚本的根异常。所有业务异常继承自此类。
使用根异常的好处:调用方可以用 `except AutomationError` 捕获所有
业务异常,同时仍能单独处理特定类型。
"""
class DataFetchError(AutomationError):
"""数据获取失败(网络请求、API 调用)。"""
def __init__(self, url: str, reason: str) -> None:
self.url = url
self.reason = reason
super().__init__(f"获取数据失败 [{url}]:{reason}")
class DataProcessingError(AutomationError):
"""数据处理/转换失败。"""
def __init__(self, message: str, raw_data: object = None) -> None:
self.raw_data = raw_data
super().__init__(message)
class ConfigurationError(AutomationError):
"""配置错误(缺少必要配置项、格式不合法)。"""
异常链:raise X from Y
当你把一个底层异常包装成业务异常时,应该保留原始异常信息。raise X from Y 建立显式的异常链:
import httpx
def fetch_user_data(user_id: int) -> dict:
"""从 API 获取用户数据。"""
try:
response = httpx.get(f"https://api.example.com/users/{user_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as exc:
# raise X from Y:明确表示 DataFetchError 是"由于" HTTPStatusError 引起的
# Python 会在 Traceback 中显示两个异常和它们的关系
raise DataFetchError(
url=str(exc.request.url),
reason=f"HTTP {exc.response.status_code}"
) from exc
except httpx.RequestError as exc:
raise DataFetchError(
url=str(exc.request.url),
reason=f"网络错误:{exc}"
) from exc
# 调用方可以这样使用:
try:
data = fetch_user_data(42)
except DataFetchError as exc:
print(f"用户数据获取失败:{exc}")
print(f"原始原因:{exc.__cause__}") # 访问原始异常
**raise X from None:**如果你明确不想暴露底层细节(比如不想让调用方看到数据库连接字符串),用
raise BusinessError(...) from None隐藏原始异常链。但要确保你在日志里已经记录了足够的调试信息。
logging 模块精讲
logging vs print:为什么 print 不够用
| 能力 | logging | |
|---|---|---|
| 输出级别控制 | 无 | DEBUG/INFO/WARNING/ERROR/CRITICAL,可按级别过滤 |
| 输出目标 | 只有控制台 | 控制台 + 文件 + 网络 + 第三方服务,同时输出 |
| 时间戳 | 无 | 自动记录,精确到毫秒 |
| 调用位置 | 无 | 自动记录文件名、函数名、行号 |
| 完整异常栈 | 需要手动 traceback.format_exc() | exc_info=True 自动附加 |
| 生产环境关闭 | 需要逐个删除 | 设置 level=WARNING 即可过滤所有 DEBUG/INFO |
Logger / Handler / Formatter 体系
理解三个核心概念,就理解了 logging 的全部架构:
- **Logger:**你的代码直接交互的对象(
logger.info()、logger.error())。每个模块有自己的 Logger,以模块路径命名(logging.getLogger(__name__))。 - **Handler:**决定日志发往哪里——控制台(
StreamHandler)、文件(FileHandler)、按大小轮转(RotatingFileHandler)、按日期轮转(TimedRotatingFileHandler)。 - **Formatter:**决定每条日志长什么样——时间格式、是否包含文件名/行号、是否输出 JSON。
完整配置:同时输出控制台和按大小轮转的文件:
import logging
import logging.handlers
import sys
from pathlib import Path
def setup_logging(
log_file: str | Path = "app.log",
level: int = logging.INFO,
max_bytes: int = 10 * 1024 * 1024, # 10 MB
backup_count: int = 5,
) -> logging.Logger:
"""配置应用级别的日志,同时输出到控制台和轮转文件。
Args:
log_file: 日志文件路径。
level: 全局日志级别,默认 INFO。
max_bytes: 单个日志文件最大字节数,默认 10 MB。
backup_count: 保留的历史日志文件数量,默认 5 个。
Returns:
配置好的根 Logger 实例。
"""
# 创建日志目录(如果不存在)
Path(log_file).parent.mkdir(parents=True, exist_ok=True)
# 定义格式:时间 | 级别 | 模块:行号 | 消息
fmt = logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# 控制台 Handler:输出到 stderr,WARNING 及以上级别
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(logging.WARNING)
console_handler.setFormatter(fmt)
# 文件 Handler:按大小轮转,INFO 及以上级别,UTF-8 编码
file_handler = logging.handlers.RotatingFileHandler(
filename=log_file,
maxBytes=max_bytes,
backupCount=backup_count,
encoding="utf-8",
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 配置根 Logger
root_logger = logging.getLogger()
root_logger.setLevel(level)
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
return root_logger
在每个模块里使用:
# 任意模块内
import logging
logger = logging.getLogger(__name__) # 以模块路径命名,如 myproject.scraper
def process_order(order_id: int) -> None:
logger.debug("开始处理订单 %s", order_id) # 调试信息,生产不显示
logger.info("订单 %s 处理完成", order_id) # 正常流程记录
logger.warning("订单 %s 金额异常,已标记", order_id) # 需要关注
logger.error("订单 %s 处理失败", order_id, exc_info=True) # 附带调用栈
JSON 日志输出(生产环境标准)
在生产环境,纯文本日志很难被日志聚合平台(ELK Stack、Datadog、CloudWatch)解析。标准做法是输出结构化的 JSON 日志:
import logging
import json
from datetime import datetime, timezone
class JsonFormatter(logging.Formatter):
"""将日志记录格式化为 JSON,方便日志平台解析。"""
def format(self, record: logging.LogRecord) -> str:
log_entry: dict = {
"timestamp": datetime.fromtimestamp(
record.created, tz=timezone.utc
).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# 附加异常信息
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
# 附加自定义字段(通过 extra 参数传入)
for key, value in record.__dict__.items():
if key not in logging.LogRecord.__dict__ and not key.startswith("_"):
log_entry[key] = value
return json.dumps(log_entry, ensure_ascii=False)
# 使用方式
json_handler = logging.StreamHandler()
json_handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(json_handler)
# 带自定义字段的日志
logger.info(
"订单支付成功",
extra={"order_id": 12345, "amount": 299.0, "user_id": 678}
)
# 输出:{"timestamp": "2024-01-15T10:30:00+00:00", "level": "INFO",
# "message": "订单支付成功", "order_id": 12345, ...}
上下文管理器
contextlib.contextmanager 装饰器
上下文管理器(with 语句)保证资源在使用完后一定被释放,即使中途发生异常。用 @contextmanager 装饰器,可以用最简洁的方式创建自定义上下文管理器:
from contextlib import contextmanager
from typing import Generator
import logging
import time
logger = logging.getLogger(__name__)
@contextmanager
def timer(operation_name: str) -> Generator[None, None, None]:
"""计时上下文管理器:记录代码块的执行时间。
使用示例:
with timer("数据导入"):
import_data(file_path)
# 输出:数据导入 完成,耗时 2.34 秒
"""
start = time.perf_counter()
try:
yield # with 块中的代码在此处执行
finally:
elapsed = time.perf_counter() - start
logger.info("%s 完成,耗时 %.2f 秒", operation_name, elapsed)
@contextmanager
def managed_db_connection(conn_string: str):
"""数据库连接上下文管理器:自动处理提交和回滚。"""
import sqlite3
conn = sqlite3.connect(conn_string)
try:
yield conn
conn.commit() # 正常退出时提交
logger.debug("数据库事务已提交")
except Exception:
conn.rollback() # 异常时回滚
logger.error("数据库事务已回滚", exc_info=True)
raise # 回滚后继续向上传播异常
finally:
conn.close() # 无论如何都关闭连接
# 使用
with timer("批量导入"):
with managed_db_connection("data.db") as conn:
conn.execute("INSERT INTO orders VALUES (?, ?)", (1, 100))
资源清理模式
上下文管理器在自动化脚本中有三个核心用途:
- 文件操作:
with open(path) as f— Python 内置,文件总是会被关闭。 - **网络连接:**httpx / aiohttp 的客户端都支持
async with,连接池自动管理。 - **分布式锁:**确保同一时间只有一个进程在处理某个任务,避免并发冲突。
import fcntl
from contextlib import contextmanager
from pathlib import Path
@contextmanager
def file_lock(lock_path: str | Path):
"""基于文件的进程锁,防止脚本并发运行。
使用示例:
with file_lock("/tmp/my_script.lock"):
run_task() # 同一时间只有一个进程能进入这里
"""
lock_file = Path(lock_path)
fd = lock_file.open("w")
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
yield
except BlockingIOError:
raise RuntimeError(f"脚本已在运行(锁文件:{lock_path})") from None
finally:
fcntl.flock(fd, fcntl.LOCK_UN)
fd.close()
重试机制:tenacity 库
网络请求失败、数据库临时不可用——这类瞬态错误通过重试往往能自动恢复。tenacity 是 Python 最成熟的重试库,配置简洁而功能完整。
pip install tenacity
基础重试:@retry 装饰器
import httpx
import logging
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log,
)
logger = logging.getLogger(__name__)
@retry(
# 最多重试 4 次(第 1 次是正常调用,后续 3 次是重试)
stop=stop_after_attempt(4),
# 指数退避:第 1 次等 1 秒,第 2 次等 2 秒,第 3 次等 4 秒,最大 30 秒
wait=wait_exponential(multiplier=1, min=1, max=30),
# 只在这些异常类型时重试(其他异常直接抛出)
retry=retry_if_exception_type((
httpx.TimeoutException,
httpx.NetworkError,
)),
# 每次重试前打印日志
before_sleep=before_sleep_log(logger, logging.WARNING),
# 所有重试用尽后,重新抛出最后一次异常
reraise=True,
)
def fetch_with_retry(url: str, timeout: float = 10.0) -> dict:
"""带自动重试的 HTTP GET 请求。
Args:
url: 请求目标 URL。
timeout: 单次请求超时(秒),默认 10 秒。
Returns:
解析后的 JSON 响应体。
Raises:
httpx.TimeoutException: 4 次重试全部超时后抛出。
httpx.HTTPStatusError: 非 2xx 响应(不重试,直接抛出)。
"""
response = httpx.get(url, timeout=timeout)
response.raise_for_status() # 4xx/5xx 抛 HTTPStatusError,不在重试范围内
return response.json()
更精细的重试控制
from tenacity import retry_if_exception, RetryCallState
import httpx
def should_retry_http(exc: BaseException) -> bool:
"""仅对可重试的 HTTP 状态码重试。"""
if isinstance(exc, httpx.HTTPStatusError):
# 429 Too Many Requests 和 5xx 服务端错误才重试
# 4xx 客户端错误(除 429)不重试(你的请求有问题,重试也没用)
return exc.response.status_code == 429 or exc.response.status_code >= 500
return isinstance(exc, (httpx.TimeoutException, httpx.NetworkError))
def log_retry_attempt(retry_state: RetryCallState) -> None:
"""自定义重试日志,包含更多上下文信息。"""
logger.warning(
"重试第 %d 次(上次异常:%s)",
retry_state.attempt_number,
retry_state.outcome.exception(),
)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, min=1, max=60),
retry=retry_if_exception(should_retry_http),
before_sleep=log_retry_attempt,
reraise=True,
)
def robust_api_call(url: str) -> dict:
"""具备精细重试控制的 API 调用。"""
response = httpx.get(url, timeout=15.0)
response.raise_for_status()
return response.json()
**注意:**重试不是银弹。如果服务器返回 400(请求格式错误),重试 10 次也是徒劳的。始终明确"哪些错误值得重试"——通常只有超时、网络抖动、限流(429)和服务端临时错误(5xx)。
实战:企业级日志模块
把本章所有知识整合成一个完整的、可直接复用的日志模块。支持:多进程安全、JSON 格式(生产)/ 易读格式(开发)、Sentry 集成预留接口。
"""
logger_setup.py — 企业级日志模块,可直接复制到项目中使用。
使用方式:
from logger_setup import setup_logger, get_logger
# 在程序入口(main.py)调用一次
setup_logger(env="production", log_file="logs/app.log")
# 在任意模块中使用
logger = get_logger(__name__)
logger.info("处理开始", extra={"task_id": "abc123"})
"""
import logging
import logging.handlers
import json
import sys
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
# ──────────────────────────────────────────────
# JSON Formatter(生产环境)
# ──────────────────────────────────────────────
class JsonFormatter(logging.Formatter):
"""结构化 JSON 日志,兼容 ELK / Datadog / CloudWatch。"""
# 标准 LogRecord 字段,不重复输出到 extra
_SKIP_FIELDS = frozenset(logging.LogRecord.__dict__.keys()) | {
"message", "asctime", "msg", "args",
}
def format(self, record: logging.LogRecord) -> str:
record.message = record.getMessage()
entry: dict = {
"ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"msg": record.message,
"file": f"{record.filename}:{record.lineno}",
"pid": os.getpid(),
}
if record.exc_info:
entry["exc"] = self.formatException(record.exc_info)
# 附加 extra 字段
for k, v in record.__dict__.items():
if k not in self._SKIP_FIELDS and not k.startswith("_"):
entry[k] = v
return json.dumps(entry, ensure_ascii=False, default=str)
# ──────────────────────────────────────────────
# 人类可读 Formatter(开发环境)
# ──────────────────────────────────────────────
HUMAN_FMT = logging.Formatter(
fmt="%(asctime)s [%(levelname)-8s] %(name)s:%(lineno)d — %(message)s",
datefmt="%H:%M:%S",
)
# ──────────────────────────────────────────────
# 多进程安全的 RotatingFileHandler
# ──────────────────────────────────────────────
class SafeRotatingFileHandler(logging.handlers.RotatingFileHandler):
"""在多进程环境下安全的轮转文件 Handler。
标准 RotatingFileHandler 在多进程写入时可能损坏文件。
此版本在每次 emit 前重新打开文件,以支持多进程安全写入。
(适合低并发场景;高并发场景推荐使用集中式日志服务)
"""
def emit(self, record: logging.LogRecord) -> None:
try:
if self.shouldRollover(record):
self.doRollover()
logging.FileHandler.emit(self, record)
except Exception:
self.handleError(record)
# ──────────────────────────────────────────────
# 公开 API
# ──────────────────────────────────────────────
def setup_logger(
env: Literal["development", "production"] = "development",
log_file: str | Path | None = None,
level: int = logging.DEBUG,
max_bytes: int = 20 * 1024 * 1024, # 20 MB
backup_count: int = 7,
sentry_dsn: str | None = None,
) -> None:
"""初始化全局日志配置。在 main.py 入口处调用一次即可。
Args:
env: 运行环境。production 使用 JSON 格式;development 使用易读格式。
log_file: 日志文件路径。为 None 时只输出到控制台。
level: 根 Logger 的最低级别,默认 DEBUG(开发)。
max_bytes: 单个日志文件最大字节数。
backup_count: 保留的历史日志文件数量。
sentry_dsn: Sentry DSN 字符串。提供时自动集成 Sentry。
"""
formatter = JsonFormatter() if env == "production" else HUMAN_FMT
handlers: list[logging.Handler] = []
# 控制台输出
console = logging.StreamHandler(sys.stderr)
console.setFormatter(formatter)
console.setLevel(logging.DEBUG if env == "development" else logging.WARNING)
handlers.append(console)
# 文件输出(可选)
if log_file is not None:
Path(log_file).parent.mkdir(parents=True, exist_ok=True)
fh = SafeRotatingFileHandler(
filename=log_file,
maxBytes=max_bytes,
backupCount=backup_count,
encoding="utf-8",
)
fh.setFormatter(JsonFormatter()) # 文件始终用 JSON,方便后续分析
fh.setLevel(logging.DEBUG)
handlers.append(fh)
# Sentry 集成(预留接口)
if sentry_dsn:
try:
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration
sentry_sdk.init(
dsn=sentry_dsn,
integrations=[
LoggingIntegration(
level=logging.INFO, # INFO 以上作为面包屑
event_level=logging.ERROR, # ERROR 以上发送 Sentry 事件
)
],
)
logging.getLogger(__name__).info("Sentry 集成已启用")
except ImportError:
logging.getLogger(__name__).warning(
"sentry_dsn 已提供但未安装 sentry-sdk,跳过 Sentry 集成。"
"运行 pip install sentry-sdk 以启用。"
)
# 应用配置
root = logging.getLogger()
root.setLevel(level)
# 清除可能存在的旧 handler(避免重复初始化时日志重复输出)
root.handlers.clear()
for h in handlers:
root.addHandler(h)
def get_logger(name: str) -> logging.Logger:
"""获取以模块路径命名的 Logger。
这是对 logging.getLogger(name) 的薄封装,
未来可在此处添加统一的过滤器或上下文注入逻辑。
"""
return logging.getLogger(name)
使用示例(main.py)
"""程序入口:初始化日志,然后运行主业务逻辑。"""
import os
from logger_setup import setup_logger, get_logger
def main() -> None:
env = os.getenv("APP_ENV", "development")
setup_logger(
env=env,
log_file="logs/automation.log",
sentry_dsn=os.getenv("SENTRY_DSN"), # 生产环境通过环境变量注入
)
logger = get_logger(__name__)
logger.info("程序启动,环境:%s", env)
try:
run_automation()
except Exception:
logger.critical("未预期的致命错误,程序退出", exc_info=True)
raise SystemExit(1)
if __name__ == "__main__":
main()
**本章总结:**健壮的自动化脚本需要三层保障——精确的异常处理(捕获你预期的,让未知的向上传播)、完整的日志记录(JSON 格式 + 结构化字段)、自动重试(只对瞬态错误重试)。把本章的日志模块作为你所有 Python 项目的起点,你会在第一次出现生产问题时感谢自己做了这个决定。
上一章
下一章
第5章:文件系统处理