第 5 章

文件系统自动化——批量处理文件就是这么简单

第5章:文件系统自动化——批量处理文件就是这么简单

文件操作是 Python 自动化里最高频、最实用的场景。整理下载文件夹、批量重命名图片、按日期归档报告、监控目录变化自动处理——这些每天都在浪费你时间的重复劳动,用 Python 几十行代码就能彻底解决。本章从现代 Python 文件操作的核心工具 pathlib 出发,逐步深入批量操作、内容处理、目录监控和压缩归档,最后以一个完整的企业文档归档系统收尾。

pathlib 深度使用:告别 os.path,拥抱现代写法

Python 3.4 引入了 pathlib 模块,提供面向对象的路径操作方式。与传统的 os.path 相比,pathlib 代码更简洁、可读性更强,是现代 Python 文件操作的首选方式。

Path 对象的常用属性与方法

pathlib 核心方法速查

from pathlib import Path

p = Path("/home/user/documents/report_2024.pdf")

# ---- Path attributes ----
print(p.name)        # "report_2024.pdf"   full file name (with extension)
print(p.stem)        # "report_2024"       file name without the extension
print(p.suffix)      # ".pdf"              extension (including the dot)
print(p.suffixes)    # [".pdf"]            all extensions (an archive such as .tar.gz has two)
print(p.parent)      # /home/user/documents  parent directory
print(p.parents[1])  # /home/user            two levels up
print(p.parts)       # ('/', 'home', 'user', 'documents', 'report_2024.pdf')

# ---- Status checks ----
print(p.exists())    # True / False  whether the path exists
print(p.is_file())   # True  whether it is a regular file
print(p.is_dir())    # False whether it is a directory

# ---- File metadata ----
stat = p.stat()       # requires the path to exist
print(stat.st_size)   # file size in bytes
print(stat.st_mtime)  # last modification time (Unix timestamp)

import datetime
mtime = datetime.datetime.fromtimestamp(stat.st_mtime)
print(mtime.strftime("%Y-%m-%d %H:%M"))  # "2024-03-15 14:30"

# ---- Building paths ----
# Recommended: join path segments with the / operator
base = Path("/home/user")
reports_dir = base / "documents" / "reports"  # Path('/home/user/documents/reports')

# Same result as os.path.join, shown here for comparison
import os
old_way = os.path.join("/home/user", "documents", "reports")

**/ 操作符的跨平台优势:**pathlib 的 / 操作符在 Windows 上会自动使用反斜杠(\),在 Linux/macOS 上使用正斜杠(/),你的代码无需任何修改就能在不同操作系统上正确运行。os.path.join 同样会按平台选择分隔符,但它返回的是普通字符串;/ 操作符返回的是 Path 对象,可以直接继续调用 .name、.exists() 等属性和方法,这才是它相比 os.path.join 的主要优势。

glob 和 rglob:强大的文件查找

glob 在当前目录查找,rglob 递归查找所有子目录。两者都支持通配符模式。

glob / rglob 示例

from pathlib import Path

folder = Path("/home/user/documents")

# ---- glob: search the directory itself ----
# Find all PDF files (subdirectories are not searched)
pdfs = list(folder.glob("*.pdf"))

# Find all Excel files whose name starts with "report_"
reports = list(folder.glob("report_*.xlsx"))

# Find all files (subdirectories are not searched)
all_files = [f for f in folder.glob("*") if f.is_file()]

# ---- rglob: search all subdirectories recursively ----
# Recursively find all .py files
all_py = list(folder.rglob("*.py"))

# Find every entry whose name starts with "temp_", whatever its extension (recursive)
temp_files = list(folder.rglob("temp_*"))

# ---- Matching several extensions at once ----
image_exts = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
all_images = [f for f in folder.rglob("*") if f.suffix.lower() in image_exts]

# ---- Sort by modification time (newest first) ----
sorted_files = sorted(folder.glob("*.xlsx"), key=lambda f: f.stat().st_mtime, reverse=True)
print("最新修改的文件:", sorted_files[0].name if sorted_files else "无")

# ---- Filter by size (only files larger than 1 MB) ----
large_files = [f for f in folder.rglob("*") if f.is_file() and f.stat().st_size > 1024 * 1024]

**rglob 性能注意:**在文件数量极大的目录(如系统根目录)上使用 rglob 会很慢。实际工作中,始终明确指定起始目录,避免从根目录开始递归。

路径基础操作

创建、删除、移动目录与文件

from pathlib import Path
import shutil

p = Path("/tmp/test_dir")

# Create the directory. parents=True creates missing intermediate
# directories; exist_ok=True suppresses the error if it already exists.
p.mkdir(parents=True, exist_ok=True)

# Create a file
file = p / "hello.txt"
file.write_text("Hello, pathlib!", encoding="utf-8")  # write text
content = file.read_text(encoding="utf-8")             # read it back

# Rename / move (within one file system, rename is an atomic operation)
new_path = p / "hello_renamed.txt"
file.rename(new_path)

# Copy the file. This has to run before the move below: once the file has
# been moved, new_path no longer exists. copy2 also preserves metadata.
shutil.copy2(str(new_path), "/backup/hello_renamed.txt")

# Use shutil.move when the destination may be on another file system
shutil.move(str(new_path), "/home/user/documents/")

# Delete a file. missing_ok=True means no error if the file is absent,
# which is the case here because it was just moved away.
new_path.unlink(missing_ok=True)

# Delete an empty directory
p.rmdir()

# Delete a non-empty directory (use with care!). Guarded because the
# rmdir() call above has already removed p in this walkthrough.
if p.exists():
    shutil.rmtree(str(p))

文件批量操作

批量重命名

批量重命名是日常自动化中最常见的需求。以下三种模式覆盖了 90% 的场景。

批量重命名:日期前缀、编号、关键词替换

from pathlib import Path
import datetime

folder = Path("/home/user/photos")

# ---- Pattern 1: add a date prefix ----
# Rename "photo.jpg" to "2024-03-15_photo.jpg"
today = datetime.date.today().strftime("%Y-%m-%d")
# Snapshot the listing with list() before renaming: whether entries added
# to a directory during iteration are reported is unspecified, and the new
# names still match "*.jpg".
for f in list(folder.glob("*.jpg")):
    new_name = f"{today}_{f.name}"
    f.rename(f.parent / new_name)

# ---- Pattern 2: sequential numbering ----
# Rename every image in the folder to 001.jpg, 002.jpg, ...
files = sorted(folder.glob("*.jpg"))  # sorting keeps the numbering stable
for i, f in enumerate(files, start=1):
    target = f.parent / f"{i:03d}{f.suffix}"   # 03d: three digits, zero-padded
    if target == f:
        continue  # already has the right name
    if target.exists():
        # On POSIX, rename() silently replaces an existing file; skip
        # instead of destroying it.
        print(f"  [跳过] {f.name}:目标 {target.name} 已存在")
        continue
    f.rename(target)

# ---- Pattern 3: replace keywords in file names ----
# Replace " " with "_" and "(草稿)" with "_draft" in every file name
for f in list(folder.rglob("*")):
    if f.is_file():
        new_stem = f.stem.replace(" ", "_").replace("(草稿)", "_draft")
        if new_stem != f.stem:  # rename only when something changes
            target = f.parent / (new_stem + f.suffix)
            if target.exists():
                print(f"  [跳过] {f.name}:目标 {target.name} 已存在")
                continue
            f.rename(target)

# ---- 生产级写法:先预览,再执行 ----
def preview_rename(folder: Path, pattern: str, transform_fn):
    """Print the planned "old name -> new name" pairs without renaming anything.

    Args:
        folder: Directory whose direct children are matched (non-recursive).
        pattern: Glob pattern passed to ``folder.glob``.
        transform_fn: Callable that receives each matching ``Path`` and
            returns the proposed new file name.
    """
    for f in folder.glob(pattern):
        if f.is_file():
            new_name = transform_fn(f)
            print(f"  {f.name}  ->  {new_name}")

# Preview first
preview_rename(folder, "*.jpg", lambda f: f"{today}_{f.name}")

# Once the preview looks right, uncomment the lines below to execute
# for f in folder.glob("*.jpg"):
#     f.rename(f.parent / f"{today}_{f.name}")

**生产最佳实践:先预览,再执行。**批量重命名操作不可逆(除非你有备份),建议先用 print 打印出"旧名 -> 新名"的对照列表,确认无误后再执行实际的重命名。

按类型/日期/大小分类归档

按文件类型自动分类到子目录

from pathlib import Path
import shutil

# Category name -> set of lowercase file extensions (including the dot)
CATEGORY_MAP = {
    "图片": {".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic", ".bmp"},
    "文档": {".pdf", ".docx", ".doc", ".txt", ".md", ".odt"},
    "表格": {".xlsx", ".xls", ".csv", ".ods"},
    "视频": {".mp4", ".mov", ".avi", ".mkv", ".wmv"},
    "音频": {".mp3", ".wav", ".flac", ".aac", ".m4a"},
    "压缩包": {".zip", ".tar", ".gz", ".rar", ".7z"},
    "代码": {".py", ".js", ".ts", ".java", ".go", ".rs"},
}

def categorize_by_type(source_dir: Path, dry_run: bool = True) -> dict:
    """Sort the files directly inside ``source_dir`` into per-category subfolders.

    The category is looked up from the lowercased file extension in
    ``CATEGORY_MAP``; unknown extensions go to "其他". Subdirectories of
    ``source_dir`` are left untouched.

    Args:
        source_dir: Directory whose direct child files are categorized.
        dry_run: When True, only print the plan; nothing is moved.

    Returns:
        ``{category name: [file names]}`` for every file considered.
    """
    stats = {}

    # Build the reverse lookup: extension -> category name
    ext_to_category = {
        ext: cat
        for cat, exts in CATEGORY_MAP.items()
        for ext in exts
    }

    # sorted() snapshots the listing before anything is moved, because the
    # category folders are created inside source_dir itself.
    for f in sorted(source_dir.iterdir()):
        if not f.is_file():
            continue
        ext = f.suffix.lower()
        category = ext_to_category.get(ext, "其他")
        dest_dir = source_dir / category
        dest = dest_dir / f.name

        stats.setdefault(category, []).append(f.name)

        if not dry_run:
            dest_dir.mkdir(exist_ok=True)
            # If the target already exists, append a counter instead of
            # overwriting it.
            if dest.exists():
                stem = f.stem
                suffix = f.suffix
                counter = 1
                while dest.exists():
                    dest = dest_dir / f"{stem}_{counter}{suffix}"
                    counter += 1
            shutil.move(str(f), str(dest))
        else:
            print(f"  [预览] {f.name} -> {category}/{f.name}")

    return stats

# Usage example
downloads = Path.home() / "Downloads"

# Step 1: preview
stats = categorize_by_type(downloads, dry_run=True)
print("\n统计汇总:")
for cat, files in stats.items():
    print(f"  {cat}: {len(files)} 个文件")

# Step 2: execute once the preview looks right
# categorize_by_type(downloads, dry_run=False)

完整实战:下载文件夹整理脚本

organize_downloads.py — 可直接运行的完整脚本

"""
organize_downloads.py
将下载文件夹按类型和日期整理归档。
用法:python organize_downloads.py [--execute]
不带 --execute 参数时只预览,不实际移动文件。
"""
import argparse
import datetime
import shutil
from pathlib import Path

# Category folder name -> set of lowercase file extensions (including the dot).
# Files whose extension appears in none of the sets are filed under "Others"
# by organize().
CATEGORY_MAP = {
    "Images":    {".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic", ".svg"},
    "Documents": {".pdf", ".docx", ".doc", ".txt", ".md", ".pptx", ".odt"},
    "Sheets":    {".xlsx", ".xls", ".csv"},
    "Videos":    {".mp4", ".mov", ".avi", ".mkv"},
    "Archives":  {".zip", ".tar", ".gz", ".rar", ".7z"},
    "Code":      {".py", ".js", ".ts", ".html", ".css", ".json", ".yaml"},
    "Audio":     {".mp3", ".wav", ".flac", ".aac"},
}

def build_ext_map(category_map: dict) -> dict:
    """Invert a ``{category: extensions}`` mapping into ``{extension: category}``.

    If an extension is listed under several categories, the category that
    comes last in ``category_map`` wins.
    """
    return {
        extension: category
        for category, extensions in category_map.items()
        for extension in extensions
    }

def safe_dest(dest: Path) -> Path:
    """Return a path next to ``dest`` that does not exist yet.

    ``dest`` itself is returned when it is free; otherwise ``_1``, ``_2``, ...
    is appended to the stem until an unused name is found, so an existing
    file is never overwritten.
    """
    candidate = dest
    attempt = 0
    while candidate.exists():
        attempt += 1
        candidate = dest.parent / f"{dest.stem}_{attempt}{dest.suffix}"
    return candidate

def organize(source_dir: Path, execute: bool) -> None:
    """File every regular file in ``source_dir`` under ``<category>/<YYYY-MM>/``.

    The category comes from the file extension and the month folder from the
    file's modification time. With ``execute=False`` the plan is only
    printed; with ``execute=True`` the files are moved.
    """
    ext_map = build_ext_map(CATEGORY_MAP)
    handled = 0  # files moved (execute) or listed (preview)

    regular_files = (entry for entry in sorted(source_dir.iterdir()) if entry.is_file())
    for f in regular_files:
        # Category folder from the extension
        category = ext_map.get(f.suffix.lower(), "Others")

        # Month sub-folder from the modification time (e.g. 2024-03)
        modified = datetime.datetime.fromtimestamp(f.stat().st_mtime)
        month_dir = f"{modified.year}-{modified.month:02d}"

        dest_dir = source_dir / category / month_dir
        dest = safe_dest(dest_dir / f.name)

        if not execute:
            print(f"  {f.name:40s}  ->  {category}/{month_dir}/{dest.name}")
        else:
            dest_dir.mkdir(parents=True, exist_ok=True)
            shutil.move(str(f), str(dest))
        handled += 1

    if not execute:
        print(f"\n预览完成:共 {handled} 个文件。使用 --execute 参数执行实际移动。")
    else:
        print(f"\n完成:已移动 {handled} 个文件。")

def main():
    """Parse the command line and run organize() in preview or execute mode."""
    default_dir = str(Path.home() / "Downloads")

    parser = argparse.ArgumentParser(description="整理下载文件夹")
    parser.add_argument("--dir", default=default_dir, help="目标目录")
    parser.add_argument("--execute", action="store_true", help="实际执行移动(默认只预览)")
    args = parser.parse_args()

    source = Path(args.dir)
    if source.is_dir():
        mode_label = "执行模式" if args.execute else "预览模式"
        print(f"{mode_label}  目录:{source}\n")
        organize(source, execute=args.execute)
    else:
        print(f"错误:目录不存在:{source}")


if __name__ == "__main__":
    main()

文件内容处理

批量搜索替换文本文件内容

在多个文件中查找并替换文本

from pathlib import Path
import re

def batch_replace(folder: Path, pattern: str, replacement: str,
                  glob: str = "*.txt", encoding: str = "utf-8",
                  dry_run: bool = True) -> int:
    """Replace ``pattern`` with ``replacement`` in every matching file under ``folder``.

    Args:
        folder: Root directory; it is searched recursively.
        pattern: Regular expression to search for.
        replacement: Replacement text (may use group references such as ``\\1``).
        glob: File-name pattern selecting the files to process.
        encoding: Encoding used to read and write the files.
        dry_run: When True, only report what would change.

    Returns:
        The number of files whose content changes (or would change).

    Files that cannot be decoded with ``encoding`` are reported and skipped.
    Before a file is rewritten, a byte-for-byte copy is stored next to it
    with ``.bak`` appended to its name.
    """
    modified_count = 0
    regex = re.compile(pattern)

    # sorted() snapshots the listing first, because the loop creates .bak
    # files inside the tree that is being walked.
    for f in sorted(folder.rglob(glob)):
        if not f.is_file():
            continue
        try:
            original = f.read_text(encoding=encoding)
        except UnicodeDecodeError:
            print(f"  [跳过] 编码错误:{f}")
            continue

        # subn() returns the new text and the number of substitutions in a
        # single pass.
        new_content, match_count = regex.subn(replacement, original)
        if new_content == original:
            continue

        modified_count += 1
        rel = f.relative_to(folder)
        if dry_run:
            print(f"  [预览] {rel}  ({match_count} 处替换)")
        else:
            # Back up the raw bytes: the decoded text has had its newlines
            # normalized by text mode, so writing it back would not
            # reproduce the original file exactly.
            backup = f.with_suffix(f.suffix + ".bak")
            backup.write_bytes(f.read_bytes())
            f.write_text(new_content, encoding=encoding)
            print(f"  [完成] {rel}  ({match_count} 处)")

    return modified_count

# Example: put "# TODO: " in front of every line that starts with "print("
# in all Python files. The (?m) flag enables re.MULTILINE so that ^ matches
# at the start of each line, not only at the very beginning of the file.
folder = Path("/home/user/project")
count = batch_replace(folder, r"(?m)^(print\()", r"# TODO: \1",
                      glob="*.py", dry_run=True)
print(f"\n共 {count} 个文件需要修改")

编码检测与批量转换

使用 chardet 检测编码并统一转为 UTF-8

from pathlib import Path
import chardet

def detect_encoding(filepath: Path) -> tuple[str, float]:
    """Guess the text encoding of a file with chardet.

    Returns:
        ``(encoding, confidence)``. ``encoding`` is the name reported by
        chardet (for example 'utf-8' or 'gbk'), or "utf-8" when chardet
        reports none. ``confidence`` is chardet's confidence value, or 0
        when it is missing.
    """
    raw = filepath.read_bytes()
    result = chardet.detect(raw)
    encoding = result.get("encoding") or "utf-8"
    # "or 0" also covers a confidence of None, which callers could not
    # format as a percentage.
    confidence = result.get("confidence") or 0
    return encoding, confidence

def convert_to_utf8(folder: Path, glob: str = "*.txt", dry_run: bool = True) -> None:
    """Convert every non-UTF-8 text file under ``folder`` to UTF-8.

    ``folder`` is scanned recursively for files matching ``glob``. Files
    detected as UTF-8 or ASCII are skipped. With ``dry_run=True`` only the
    detected encodings are printed. Otherwise each file is re-encoded in
    place after a byte-for-byte copy has been saved with ``.orig`` appended
    to its name.

    Requires the third-party package chardet (pip install chardet).
    """
    for f in folder.rglob(glob):
        if not f.is_file():
            continue
        encoding, confidence = detect_encoding(f)
        normalized = encoding.lower().replace("-", "")

        # Already UTF-8 (with or without BOM) or plain ASCII: nothing to do.
        # "utf8sig" covers the name "UTF-8-SIG".
        if normalized in ("utf8", "utf8sig", "utf8bom", "ascii"):
            continue

        print(f"  {f.name}  检测编码:{encoding}(置信度 {confidence:.0%})")
        if not dry_run:
            try:
                # errors="replace" keeps going on undecodable bytes; the
                # .orig backup below preserves the untouched original.
                text = f.read_text(encoding=encoding, errors="replace")
                # Back up the original file
                f.with_suffix(f.suffix + ".orig").write_bytes(f.read_bytes())
                f.write_text(text, encoding="utf-8")
                print("    -> 已转换为 UTF-8")
            except (OSError, LookupError, ValueError) as e:
                # OSError: I/O failure; LookupError: codec name unknown to
                # Python; ValueError: includes Unicode errors.
                print(f"    -> 转换失败:{e}")

# 使用:pip install chardet
# convert_to_utf8(Path("/home/user/legacy_docs"), dry_run=True)

文件哈希与去重

计算文件 MD5,找出并处理重复文件

import hashlib
from pathlib import Path
from collections import defaultdict

def file_md5(filepath: Path, chunk_size: int = 65536) -> str:
    """Return the hexadecimal MD5 digest of a file.

    The file is read in blocks of ``chunk_size`` bytes, so large files do
    not have to fit in memory.
    """
    digest = hashlib.md5()
    with filepath.open("rb") as stream:
        # iter() with a sentinel keeps calling read() until it returns b""
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()

def find_duplicates(folder: Path) -> dict[str, list[Path]]:
    """Group the files under ``folder`` (recursively) by MD5 digest.

    Returns:
        A dict mapping each MD5 digest shared by two or more files to the
        list of those files. Digests that occur only once are left out.

    Files that cannot be read are reported and skipped.
    """
    candidates = [entry for entry in folder.rglob("*") if entry.is_file()]
    print(f"共扫描 {len(candidates)} 个文件...")

    by_digest: dict[str, list[Path]] = defaultdict(list)
    for path in candidates:
        try:
            digest = file_md5(path)
        except (PermissionError, OSError) as e:
            print(f"  [跳过] {path.name}: {e}")
        else:
            by_digest[digest].append(path)

    # Keep only the digests that occur more than once
    duplicates = {}
    for digest, paths in by_digest.items():
        if len(paths) > 1:
            duplicates[digest] = paths
    return duplicates

def report_duplicates(folder: Path) -> None:
    """Print every group of duplicate files under ``folder`` and the space they waste.

    The wasted space of a group is the file size multiplied by the number
    of copies beyond the first.
    """
    groups = find_duplicates(folder)
    if not groups:
        print("未发现重复文件。")
        return

    print(f"\n发现 {len(groups)} 组重复文件:\n")
    wasted_bytes = 0
    for digest, copies in groups.items():
        size = copies[0].stat().st_size
        wasted_bytes += size * (len(copies) - 1)
        print(f"  MD5: {digest[:8]}...  大小: {size/1024:.1f} KB  重复: {len(copies)} 个")
        for copy_path in copies:
            print(f"    {copy_path}")

    print(f"\n合计浪费空间:{wasted_bytes / 1024 / 1024:.2f} MB")

# report_duplicates(Path.home() / "Downloads")

目录监控:watchdog 实时响应文件变化

watchdog 是一个跨平台的文件系统事件监控库,能够在文件创建、修改、删除、移动时自动触发回调函数。

安装

# pip install watchdog

watchdog 基础用法

import time
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
    """Event handler that prints one line for every file system event.

    Subclass FileSystemEventHandler and override only the event methods
    you need.
    """

    def on_created(self, event):
        """Report newly created files (directories are ignored)."""
        if not event.is_directory:
            print(f"[新文件] {event.src_path}")

    def on_modified(self, event):
        """Report modified files (directories are ignored)."""
        if not event.is_directory:
            print(f"[修改]   {event.src_path}")

    def on_deleted(self, event):
        """Report deleted files and directories."""
        print(f"[删除]   {event.src_path}")

    def on_moved(self, event):
        """Report a move or rename with both the old and the new path."""
        print(f"[移动]   {event.src_path} -> {event.dest_path}")

def start_monitor(watch_dir: str) -> None:
    """Watch ``watch_dir`` recursively and print events until Ctrl+C is pressed."""
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, watch_dir, recursive=True)
    observer.start()
    print(f"开始监控:{watch_dir}  (Ctrl+C 退出)")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to quit
    finally:
        # Stop and join the observer thread however the loop ends, not only
        # on Ctrl+C.
        observer.stop()
        observer.join()

# start_monitor("/home/user/watch_folder")

实战:新文件入库自动触发处理流程

auto_ingest.py — 监控目录,新文件自动归档并记录日志

"""
auto_ingest.py
监控 inbox/ 目录,新文件到达时自动:
1. 按类型分类
2. 移动到 archive/ 对应子目录
3. 记录处理日志
"""
import logging
import shutil
import time
from datetime import datetime
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Configuration
INBOX_DIR = Path("/home/user/inbox")
ARCHIVE_DIR = Path("/home/user/archive")
LOG_FILE = Path("/home/user/archive/ingest.log")

# Category name -> set of lowercase file extensions (including the dot)
CATEGORY_MAP = {
    "pdf":    {".pdf"},
    "excel":  {".xlsx", ".xls", ".csv"},
    "image":  {".jpg", ".jpeg", ".png", ".gif"},
    "other":  set(),  # catch-all category
}

# Configure logging.
# FileHandler opens LOG_FILE as soon as it is constructed, so the log
# directory has to exist before basicConfig() runs. main() creates
# ARCHIVE_DIR only later, which is too late for this module-level setup.
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger(__name__)

def ext_to_category(suffix: str) -> str:
    """Return the CATEGORY_MAP key whose extension set contains ``suffix``.

    The comparison is case-insensitive; "other" is returned when no
    category lists the extension.
    """
    matches = (cat for cat, exts in CATEGORY_MAP.items() if suffix.lower() in exts)
    return next(matches, "other")

class IngestHandler(FileSystemEventHandler):
    """Move every new file into ARCHIVE_DIR/<category>/<YYYY-MM-DD>/ and log it."""

    def on_created(self, event):
        """Handle a creation event: archive new files, ignore directories."""
        if event.is_directory:
            return
        src = Path(event.src_path)
        # Give the writer time to finish (simple strategy: wait 0.5 seconds)
        time.sleep(0.5)
        if not src.exists():
            return
        self._process(src)

    def _process(self, src: Path) -> None:
        """Move ``src`` to its archive folder without overwriting existing files."""
        category = ext_to_category(src.suffix)
        date_str = datetime.now().strftime("%Y-%m-%d")
        dest_dir = ARCHIVE_DIR / category / date_str
        dest_dir.mkdir(parents=True, exist_ok=True)

        # Avoid overwriting: append _1, _2, ... until the name is free
        dest = dest_dir / src.name
        counter = 1
        while dest.exists():
            dest = dest_dir / f"{src.stem}_{counter}{src.suffix}"
            counter += 1

        try:
            shutil.move(str(src), str(dest))
        except OSError:
            # Log the failure and keep going, so that one problematic file
            # does not raise out of the event callback.
            log.exception("归档失败: %s", src.name)
            return
        log.info("已归档: %s -> %s", src.name, dest)

def main():
    """Create the working directories and run the watcher until Ctrl+C is pressed."""
    INBOX_DIR.mkdir(parents=True, exist_ok=True)
    ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)

    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, str(INBOX_DIR), recursive=False)
    observer.start()
    log.info(f"自动入库服务已启动,监控目录:{INBOX_DIR}")

    try:
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to stop the service
    finally:
        # Stop and join the observer thread however the loop ends.
        observer.stop()
        observer.join()
    log.info("服务已停止。")


if __name__ == "__main__":
    main()

压缩与归档:zipfile / tarfile

zipfile 操作

创建、读取、解压 ZIP 文件

import zipfile
from pathlib import Path

# ---- 创建 ZIP ----
def zip_folder(source_dir: Path, output_zip: Path) -> None:
    """Pack every file under ``source_dir`` into ``output_zip``.

    Archive member names are relative to the parent of ``source_dir``, so
    the directory structure is kept and the folder's own name is the top
    level inside the ZIP.
    """
    archive_root = source_dir.parent
    with zipfile.ZipFile(output_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        members = (entry for entry in source_dir.rglob("*") if entry.is_file())
        for member in members:
            # arcname is the path stored inside the ZIP
            zf.write(member, arcname=member.relative_to(archive_root))
    size_kb = output_zip.stat().st_size / 1024
    print(f"已打包:{output_zip}  ({size_kb:.1f} KB)")

# ---- 查看 ZIP 内容 ----
def list_zip(zip_path: Path) -> None:
    """Print the name and uncompressed size of every member of a ZIP file."""
    with zipfile.ZipFile(zip_path, "r") as zf:
        rows = [
            f"  {info.filename:40s}  {info.file_size:>10,} bytes"
            for info in zf.infolist()
        ]
    for row in rows:
        print(row)

# ---- 解压 ZIP ----
def extract_zip(zip_path: Path, dest_dir: Path) -> None:
    """Extract all members of ``zip_path`` into ``dest_dir``, creating it if needed."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, mode="r") as archive:
        archive.extractall(path=dest_dir)
    print(f"已解压到:{dest_dir}")

# 使用示例
# zip_folder(Path("/home/user/project"), Path("/home/user/project_backup.zip"))

自动化备份脚本(保留最近 N 份)

auto_backup.py — 定时备份,自动清理旧备份

"""
auto_backup.py
备份指定目录,保留最近 N 份,自动删除旧备份。
用法:python auto_backup.py
或配合 cron / Task Scheduler 定时运行。
"""
import zipfile
import datetime
from pathlib import Path

SOURCE_DIR = Path("/home/user/important_data")   # directory to back up
BACKUP_DIR = Path("/home/user/backups")          # where the backups are stored
KEEP_LAST_N = 5                                   # number of most recent backups to keep

def create_backup(source: Path, backup_dir: Path) -> Path:
    """Write a timestamped ZIP backup of ``source`` into ``backup_dir``.

    The archive is named ``<source name>_<YYYYmmdd_HHMMSS>.zip`` and its
    member names are relative to the parent of ``source``. ``backup_dir``
    is created if it does not exist.

    Returns:
        The path of the ZIP file that was written.
    """
    backup_dir.mkdir(parents=True, exist_ok=True)
    stamp = f"{datetime.datetime.now():%Y%m%d_%H%M%S}"
    backup_path = backup_dir / f"{source.name}_{stamp}.zip"

    base = source.parent
    with zipfile.ZipFile(backup_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for entry in source.rglob("*"):
            if not entry.is_file():
                continue
            zf.write(entry, arcname=entry.relative_to(base))

    size_mb = backup_path.stat().st_size / 1024 / 1024
    print(f"备份完成:{backup_path.name}  ({size_mb:.2f} MB)")
    return backup_path

def cleanup_old_backups(backup_dir: Path, prefix: str, keep_n: int) -> None:
    """Delete old backups so that only the newest ``keep_n`` remain.

    Backups are the files in ``backup_dir`` matching ``<prefix>_*.zip``.
    They are ordered by file name, which is chronological for names that
    end in a fixed-width timestamp.

    Args:
        backup_dir: Directory containing the backup archives.
        prefix: File-name prefix that identifies the backup series.
        keep_n: Number of newest backups to keep; 0 deletes all of them.

    Raises:
        ValueError: If ``keep_n`` is negative.
    """
    if keep_n < 0:
        raise ValueError("keep_n must be >= 0")

    backups = sorted(backup_dir.glob(f"{prefix}_*.zip"))
    # Slice from the front by count. A negative-index slice such as
    # backups[:-keep_n] is empty for keep_n == 0 and would delete nothing.
    surplus = max(len(backups) - keep_n, 0)
    for old in backups[:surplus]:
        old.unlink()
        print(f"已删除旧备份:{old.name}")

def main():
    """Create a new backup of SOURCE_DIR, prune old ones, and list what remains."""
    # is_dir() rather than exists(): a plain file at SOURCE_DIR is not a
    # directory that can be backed up, and continuing would still rotate
    # out an older backup.
    if not SOURCE_DIR.is_dir():
        print(f"错误:源目录不存在:{SOURCE_DIR}")
        return

    create_backup(SOURCE_DIR, BACKUP_DIR)
    cleanup_old_backups(BACKUP_DIR, prefix=SOURCE_DIR.name, keep_n=KEEP_LAST_N)

    # Print the backups that are currently kept
    backups = sorted(BACKUP_DIR.glob(f"{SOURCE_DIR.name}_*.zip"))
    print(f"\n当前保留 {len(backups)} 份备份:")
    for b in backups:
        print(f"  {b.name}")


if __name__ == "__main__":
    main()

实战项目:企业文档归档系统

整合本章所有技术,构建一个可以在真实企业环境中使用的文档归档系统:按年月和部门自动分类,生成索引报告

enterprise_archiver.py — 完整企业文档归档系统

"""
enterprise_archiver.py

功能:
  1. 扫描源目录下所有文件
  2. 按文件修改日期的年月(YYYY/MM)和文件类型分类归档
  3. 支持自定义部门前缀规则(文件名前缀 -> 部门)
  4. 生成 CSV 格式的归档索引报告
  5. 干运行(dry_run)模式安全预览

用法:
  python enterprise_archiver.py --source /share/incoming --dest /archive --execute
"""
import argparse
import csv
import datetime
import shutil
from pathlib import Path

# File-name prefix -> department name
DEPT_PREFIX_MAP = {
    "HR_":      "人力资源",
    "FIN_":     "财务",
    "IT_":      "信息技术",
    "MKT_":     "市场",
    "SALES_":   "销售",
    "LEGAL_":   "法务",
}

# Category name -> set of lowercase file extensions (including the dot).
# Extensions not listed here are classified as "其他" by get_category().
CATEGORY_MAP = {
    "文档":  {".pdf", ".docx", ".doc", ".txt", ".md", ".pptx"},
    "表格":  {".xlsx", ".xls", ".csv"},
    "图片":  {".jpg", ".jpeg", ".png", ".gif", ".webp"},
    "压缩":  {".zip", ".rar", ".7z", ".tar", ".gz"},
}

def get_dept(filename: str) -> str:
    """Return the department whose prefix ``filename`` starts with.

    Prefixes from DEPT_PREFIX_MAP are compared case-insensitively; "通用"
    is returned when none of them matches.
    """
    upper_name = filename.upper()
    matching = (
        dept
        for prefix, dept in DEPT_PREFIX_MAP.items()
        if upper_name.startswith(prefix.upper())
    )
    return next(matching, "通用")

def get_category(suffix: str) -> str:
    """Return the CATEGORY_MAP category for a file extension.

    The lookup is case-insensitive; "其他" is returned for extensions that
    no category lists.
    """
    wanted = suffix.lower()
    for name in CATEGORY_MAP:
        if wanted in CATEGORY_MAP[name]:
            return name
    return "其他"

def safe_dest(dest: Path) -> Path:
    """Return ``dest`` if it is free, otherwise the first unused ``<stem>_<n><suffix>``.

    The counter ``n`` starts at 1 and the candidate stays in the same
    directory as ``dest``.
    """
    attempt = dest
    number = 1
    while attempt.exists():
        attempt = dest.parent / f"{dest.stem}_{number}{dest.suffix}"
        number += 1
    return attempt

def archive(source_dir: Path, dest_dir: Path, execute: bool) -> list[dict]:
    """Copy every file under ``source_dir`` into a dated, per-department tree.

    The target layout is ``dest_dir/YYYY/MM/<department>/<category>/``. Year
    and month come from the file's modification time, the department from
    the file-name prefix, and the category from the extension. Files are
    copied with ``shutil.copy2``, so the originals stay where they are.

    Args:
        source_dir: Directory that is scanned recursively.
        dest_dir: Root of the archive tree.
        execute: When False, only print the plan; nothing is copied.

    Returns:
        One record (dict) per file, used to build the report.
    """
    records = []

    for f in sorted(source_dir.rglob("*")):
        if not f.is_file():
            continue

        info = f.stat()  # one stat() call serves both mtime and size
        mtime = datetime.datetime.fromtimestamp(info.st_mtime)
        year_month = f"{mtime.year}/{mtime.month:02d}"
        dept = get_dept(f.name)
        category = get_category(f.suffix)

        dest_path = dest_dir / year_month / dept / category
        final = safe_dest(dest_path / f.name)
        relative_target = final.relative_to(dest_dir)

        record = {
            "源文件": str(f),
            "目标路径": str(relative_target),
            "部门": dept,
            "类型": category,
            "年月": year_month,
            "大小(KB)": round(info.st_size / 1024, 1),
            "修改时间": mtime.strftime("%Y-%m-%d %H:%M"),
            "状态": "已归档" if execute else "计划",
        }
        records.append(record)

        if execute:
            dest_path.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(f), str(final))
            print(f"  [归档] {f.name}  ->  {relative_target}")
        else:
            print(f"  [预览] {f.name}  ->  {relative_target}")

    return records

def generate_report(records: list[dict], report_path: Path) -> None:
    """Write the archive records to ``report_path`` as a CSV file.

    The keys of the first record become the column headers. The file is
    written as UTF-8 with a BOM. Nothing is written when ``records`` is
    empty.
    """
    if records:
        columns = list(records[0])
        with report_path.open("w", newline="", encoding="utf-8-sig") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=columns)
            writer.writeheader()
            for row in records:
                writer.writerow(row)
        print(f"\n归档报告已生成:{report_path}")

def print_summary(records: list[dict]) -> None:
    """Print the file count, total size, and per-department / per-type counts."""
    from collections import Counter

    by_dept = Counter()
    by_type = Counter()
    size_kb = 0
    for rec in records:
        by_dept[rec["部门"]] += 1
        by_type[rec["类型"]] += 1
        size_kb += rec["大小(KB)"]

    separator = "=" * 50
    print(f"\n{separator}")
    print(f"文件总数:{len(records)}  总大小:{size_kb:.1f} KB")
    print("按部门:", dict(by_dept))
    print("按类型:", dict(by_type))

def main():
    """Parse the command line, run the archive pass, and write the CSV report.

    In preview mode the report is written to the current directory; in
    execute mode it is written into the archive destination.
    """
    parser = argparse.ArgumentParser(description="企业文档归档系统")
    parser.add_argument("--source", required=True, help="源目录(待归档文件所在目录)")
    parser.add_argument("--dest",   required=True, help="归档目标目录")
    parser.add_argument("--execute", action="store_true", help="执行归档(默认只预览)")
    args = parser.parse_args()

    source_dir, dest_dir = Path(args.source), Path(args.dest)

    if not source_dir.is_dir():
        print(f"错误:源目录不存在:{source_dir}")
        return

    if args.execute:
        mode = "执行归档"
    else:
        mode = "预览模式(不实际移动文件)"
    print(f"{mode}\n源:{source_dir}\n目标:{dest_dir}\n")

    records = archive(source_dir, dest_dir, execute=args.execute)
    print_summary(records)

    # Write the report only when there is something to report
    if not records:
        return
    stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    report_name = f"archive_report_{stamp}.csv"
    report_dir = dest_dir if args.execute else Path(".")
    generate_report(records, report_dir / report_name)


if __name__ == "__main__":
    main()

**本章小结:**pathlib 是现代 Python 文件操作的核心,/ 操作符让路径拼接跨平台且清晰。批量操作务必先 dry_run 预览。watchdog 实现实时响应。zipfile 处理压缩归档。这套工具组合足以应对企业级文档管理的绝大多数需求。

上一章

下一章
第6章:Python 操作 Excel
本章评分
4.9  / 5  (57 评分)

💬 留言讨论