文件系统自动化——批量处理文件就是这么简单
第5章:文件系统自动化——批量处理文件就是这么简单
文件操作是 Python 自动化里最高频、最实用的场景。整理下载文件夹、批量重命名图片、按日期归档报告、监控目录变化自动处理——这些每天都在浪费你时间的重复劳动,用 Python 几十行代码就能彻底解决。本章从现代 Python 文件操作的核心工具 pathlib 出发,逐步深入批量操作、内容处理、目录监控和压缩归档,最后以一个完整的企业文档归档系统收尾。
pathlib 深度使用:告别 os.path,拥抱现代写法
Python 3.4 引入了 pathlib 模块,提供面向对象的路径操作方式。与传统的 os.path 相比,pathlib 代码更简洁、可读性更强,是现代 Python 文件操作的首选方式。
Path 对象的常用属性与方法
pathlib 核心方法速查
from pathlib import Path
import datetime
import os

p = Path("/home/user/documents/report_2024.pdf")

# ---- Path components (pure string operations; the file need not exist) ----
print(p.name)        # "report_2024.pdf"  full file name, extension included
print(p.stem)        # "report_2024"      file name without the extension
print(p.suffix)      # ".pdf"             extension, leading dot included
print(p.suffixes)    # [".pdf"]           every extension (".tar.gz" yields two)
print(p.parent)      # /home/user/documents   parent directory
print(p.parents[1])  # /home/user             two levels up
print(p.parts)       # ('/', 'home', 'user', 'documents', 'report_2024.pdf')

# ---- State checks (these touch the file system) ----
print(p.exists())    # True / False  does the path exist
print(p.is_file())   # True only if it exists and is a regular file
print(p.is_dir())    # True only if it exists and is a directory

# ---- File metadata ----
# stat() raises FileNotFoundError for a missing path, so only query it
# when the example file is really there.
if p.exists():
    stat = p.stat()
    print(stat.st_size)   # size in bytes
    print(stat.st_mtime)  # last modification time (Unix timestamp)
    mtime = datetime.datetime.fromtimestamp(stat.st_mtime)
    print(mtime.strftime("%Y-%m-%d %H:%M"))  # e.g. "2024-03-15 14:30"

# ---- Building paths ----
# Recommended: join with the / operator
base = Path("/home/user")
reports_dir = base / "documents" / "reports"  # Path('/home/user/documents/reports')

# Equivalent os.path.join call, which returns a plain string instead
old_way = os.path.join("/home/user", "documents", "reports")
**/ 操作符的跨平台优势:**pathlib 的 / 操作符在 Windows 上会自动使用反斜杠(\),在 Linux/macOS 上使用正斜杠(/)。你的代码无需任何修改就能在不同操作系统上正确运行。os.path.join 同样会选用当前系统的分隔符;pathlib 的优势在于写法更直观,而且得到的是 Path 对象,可以直接继续调用 .exists()、.glob() 等方法,而不是一个普通字符串。
glob 和 rglob:强大的文件查找
glob 在当前目录查找,rglob 递归查找所有子目录。两者都支持通配符模式。
glob / rglob 示例
from pathlib import Path

folder = Path("/home/user/documents")

# ---- glob: search the top level of the directory only ----
# All PDF files (sub-directories are not searched)
pdfs = list(folder.glob("*.pdf"))
# Every Excel file whose name starts with "report_"
reports = list(folder.glob("report_*.xlsx"))
# Every regular file (sub-directories are not searched)
all_files = [f for f in folder.glob("*") if f.is_file()]

# ---- rglob: search recursively through all sub-directories ----
# Every .py file, recursively
all_py = list(folder.rglob("*.py"))
# Anything whose name starts with "temp_", recursively
temp_files = list(folder.rglob("temp_*"))

# ---- Matching several extensions at once ----
image_exts = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
# is_file() keeps out directories that merely happen to be named like an image
all_images = [
    f for f in folder.rglob("*")
    if f.suffix.lower() in image_exts and f.is_file()
]

# ---- Sort by modification time, newest first ----
sorted_files = sorted(folder.glob("*.xlsx"), key=lambda f: f.stat().st_mtime, reverse=True)
print("最新修改的文件:", sorted_files[0].name if sorted_files else "无")

# ---- Filter by size (only files larger than 1 MB) ----
large_files = [f for f in folder.rglob("*") if f.is_file() and f.stat().st_size > 1024 * 1024]
**rglob 性能注意:**在文件数量极大的目录(如系统根目录)上使用 rglob 会很慢。实际工作中,始终明确指定起始目录,避免从根目录开始递归。
路径基础操作
创建、删除、移动目录与文件
from pathlib import Path
import shutil

p = Path("/tmp/test_dir")

# Create the directory (parents=True creates missing intermediate
# directories, exist_ok=True keeps an existing directory from raising)
p.mkdir(parents=True, exist_ok=True)

# Create a file
file = p / "hello.txt"
file.write_text("Hello, pathlib!", encoding="utf-8")  # write text
content = file.read_text(encoding="utf-8")            # read text

# Rename / move within the same directory tree
new_path = p / "hello_renamed.txt"
file.rename(new_path)

# Copy the file (copy2 also preserves metadata). This has to happen
# before the move below, while the file still exists at new_path.
shutil.copy2(str(new_path), "/backup/hello_renamed.txt")

# Use shutil.move when the destination may be on another file system
shutil.move(str(new_path), "/home/user/documents/")

# Delete a file (missing_ok=True: no error when it is already gone,
# which is the case here because the file was just moved away)
new_path.unlink(missing_ok=True)

# Delete an empty directory
p.rmdir()

# Delete a non-empty directory (use with care!). rmdir() above has
# already removed p in this walkthrough, so only call rmtree if it remains.
if p.exists():
    shutil.rmtree(str(p))
文件批量操作
批量重命名
批量重命名是日常自动化中最常见的需求。以下三种模式覆盖了 90% 的场景。
批量重命名:日期前缀、编号、关键词替换
from pathlib import Path
import datetime

folder = Path("/home/user/photos")

# ---- Pattern 1: add a date prefix ----
# Renames "photo.jpg" to "2024-03-15_photo.jpg"
today = datetime.date.today().strftime("%Y-%m-%d")
# Snapshot the matches first: the renamed files still match "*.jpg",
# so the directory must not be walked lazily while it is being changed.
for f in list(folder.glob("*.jpg")):
    new_name = f"{today}_{f.name}"
    f.rename(f.parent / new_name)

# ---- Pattern 2: sequential numbering ----
# Renames every image in the folder to 001.jpg, 002.jpg, ...
files = sorted(folder.glob("*.jpg"))  # sorting keeps the numbering stable
for i, f in enumerate(files, start=1):
    new_name = f"{i:03d}{f.suffix}"  # 03d: three digits, zero padded
    target = f.parent / new_name
    if target == f:
        continue
    if target.exists():
        # Path.rename can silently replace an existing file; never clobber one
        print(f" [跳过] 目标已存在:{target.name}")
        continue
    f.rename(target)

# ---- Pattern 3: replace keywords in file names ----
# Replaces " " with "_" and "(草稿)" with "_draft" in every file name
for f in list(folder.rglob("*")):
    if not f.is_file():
        continue
    new_stem = f.stem.replace(" ", "_").replace("(草稿)", "_draft")
    if new_stem == f.stem:  # rename only when something actually changes
        continue
    target = f.parent / (new_stem + f.suffix)
    if target.exists():
        print(f" [跳过] 目标已存在:{target.name}")
        continue
    f.rename(target)
# ---- Production-grade approach: preview first, then execute ----
def preview_rename(folder: Path, pattern: str, transform_fn):
    """Print the planned "old name -> new name" pairs without renaming anything.

    folder:       directory whose top level is searched (not recursive).
    pattern:      glob pattern selecting the candidates, e.g. "*.jpg".
    transform_fn: callable that receives each matching Path and returns the
                  new file name.
    """
    for f in folder.glob(pattern):
        if f.is_file():
            new_name = transform_fn(f)
            print(f" {f.name} -> {new_name}")
# Preview first: prints the planned renames, changes nothing on disk
preview_rename(folder, "*.jpg", lambda f: f"{today}_{f.name}")
# Once the preview looks right, uncomment the loop below to really rename
# for f in folder.glob("*.jpg"):
#     f.rename(f.parent / f"{today}_{f.name}")
**生产最佳实践:先预览,再执行。**批量重命名操作不可逆(除非你有备份),建议先用 print 打印出"旧名 → 新名"的对照,确认无误后再执行真正的重命名。
按类型/日期/大小分类归档
按文件类型自动分类到子目录
from pathlib import Path
import shutil
# Category name -> set of lower-case extensions (with the leading dot)
# that belong to it.
CATEGORY_MAP = {
    "图片": {".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic", ".bmp"},
    "文档": {".pdf", ".docx", ".doc", ".txt", ".md", ".odt"},
    "表格": {".xlsx", ".xls", ".csv", ".ods"},
    "视频": {".mp4", ".mov", ".avi", ".mkv", ".wmv"},
    "音频": {".mp3", ".wav", ".flac", ".aac", ".m4a"},
    "压缩包": {".zip", ".tar", ".gz", ".rar", ".7z"},
    "代码": {".py", ".js", ".ts", ".java", ".go", ".rs"},
}
def categorize_by_type(source_dir: Path, dry_run: bool = True, category_map=None) -> dict:
    """Sort the files directly inside source_dir into per-category sub-directories.

    source_dir:   directory to tidy up; only its top level is processed.
    dry_run:      when True (the default) only print the plan, move nothing.
    category_map: optional {category: set of extensions} mapping; falls back
                  to the module-level CATEGORY_MAP when omitted.

    Returns a {category: [file names]} dictionary. Files whose extension is
    not in the map are filed under "其他".
    """
    if category_map is None:
        category_map = CATEGORY_MAP

    # Reverse mapping: extension -> category
    ext_to_category = {
        ext: cat for cat, exts in category_map.items() for ext in exts
    }

    stats = {}
    # Snapshot the listing so directories created below are never revisited
    for f in list(source_dir.iterdir()):
        if not f.is_file():
            continue
        category = ext_to_category.get(f.suffix.lower(), "其他")
        dest_dir = source_dir / category
        dest = dest_dir / f.name
        stats.setdefault(category, []).append(f.name)

        if dry_run:
            print(f" [预览] {f.name} -> {category}/")
            continue

        dest_dir.mkdir(exist_ok=True)
        # If the target name is taken, append _1, _2, ... instead of overwriting
        counter = 1
        while dest.exists():
            dest = dest_dir / f"{f.stem}_{counter}{f.suffix}"
            counter += 1
        shutil.move(str(f), str(dest))
    return stats
# Usage example
downloads = Path.home() / "Downloads"

# Step 1: preview (nothing is moved)
stats = categorize_by_type(downloads, dry_run=True)
print("\n统计汇总:")
for cat, files in stats.items():
    print(f" {cat}: {len(files)} 个文件")

# Step 2: once the preview looks right, run it for real
# categorize_by_type(downloads, dry_run=False)
完整实战:下载文件夹整理脚本
organize_downloads.py — 可直接运行的完整脚本
"""
organize_downloads.py
将下载文件夹按类型和日期整理归档。
用法:python organize_downloads.py [--execute]
不带 --execute 参数时只预览,不实际移动文件。
"""
import argparse
import datetime
import shutil
from pathlib import Path
# Target folder name -> set of lower-case file extensions filed under it.
CATEGORY_MAP = {
    "Images": {".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic", ".svg"},
    "Documents": {".pdf", ".docx", ".doc", ".txt", ".md", ".pptx", ".odt"},
    "Sheets": {".xlsx", ".xls", ".csv"},
    "Videos": {".mp4", ".mov", ".avi", ".mkv"},
    "Archives": {".zip", ".tar", ".gz", ".rar", ".7z"},
    "Code": {".py", ".js", ".ts", ".html", ".css", ".json", ".yaml"},
    "Audio": {".mp3", ".wav", ".flac", ".aac"},
}
def build_ext_map(category_map: dict) -> dict:
    """Invert {category: extensions} into a flat {extension: category} lookup.

    If an extension is listed under several categories, the category that
    comes last in category_map wins.
    """
    return {ext: cat for cat, exts in category_map.items() for ext in exts}
def safe_dest(dest: Path) -> Path:
    """Return a path that does not exist yet, derived from dest.

    dest itself is returned when it is free; otherwise _1, _2, ... is
    appended to the stem until an unused name is found, so an existing file
    is never overwritten.
    """
    if not dest.exists():
        return dest
    counter = 1
    while True:
        candidate = dest.parent / f"{dest.stem}_{counter}{dest.suffix}"
        if not candidate.exists():
            return candidate
        counter += 1
def organize(source_dir: Path, execute: bool) -> None:
    """File every top-level file of source_dir under <category>/<YYYY-MM>/.

    The category comes from the file extension ("Others" when unknown) and
    the month from the file's modification time.

    execute: True moves the files; False only prints the plan.
    """
    ext_map = build_ext_map(CATEGORY_MAP)
    moved = 0
    previewed = 0
    # sorted() materialises the listing, so folders created during the run
    # are never iterated
    for f in sorted(source_dir.iterdir()):
        if not f.is_file():
            continue
        # Category folder chosen by extension
        category = ext_map.get(f.suffix.lower(), "Others")
        # Month sub-folder, e.g. 2024-03
        mtime = datetime.datetime.fromtimestamp(f.stat().st_mtime)
        month_dir = f"{mtime.year}-{mtime.month:02d}"
        dest_dir = source_dir / category / month_dir
        dest = safe_dest(dest_dir / f.name)
        if execute:
            dest_dir.mkdir(parents=True, exist_ok=True)
            shutil.move(str(f), str(dest))
            moved += 1
        else:
            print(f" {f.name:40s} -> {category}/{month_dir}/{dest.name}")
            previewed += 1
    if execute:
        print(f"\n完成:已移动 {moved} 个文件。")
    else:
        print(f"\n预览完成:共 {previewed} 个文件。使用 --execute 参数执行实际移动。")
def main():
    """Command-line entry point: parse the options and run organize().

    --dir      directory to tidy (defaults to ~/Downloads)
    --execute  really move files; without it only a preview is printed
    """
    parser = argparse.ArgumentParser(description="整理下载文件夹")
    parser.add_argument("--dir", default=str(Path.home() / "Downloads"), help="目标目录")
    parser.add_argument("--execute", action="store_true", help="实际执行移动(默认只预览)")
    args = parser.parse_args()

    source = Path(args.dir)
    if not source.is_dir():
        print(f"错误:目录不存在:{source}")
        return

    print(f"{'执行模式' if args.execute else '预览模式'} 目录:{source}\n")
    organize(source, execute=args.execute)


if __name__ == "__main__":
    main()
文件内容处理
批量搜索替换文本文件内容
在多个文件中查找并替换文本
from pathlib import Path
import re
def batch_replace(folder: Path, pattern: str, replacement: str,
                  glob: str = "*.txt", encoding: str = "utf-8",
                  dry_run: bool = True, flags: int = 0) -> int:
    """Regex search-and-replace across every file under folder matching glob.

    folder:      root directory, searched recursively.
    pattern:     regular expression to search for.
    replacement: replacement text (may use group references such as \\1).
    glob:        file-name pattern selecting the files to process.
    encoding:    encoding used to read and write the files; files that
                 cannot be decoded with it are reported and skipped.
    dry_run:     True (default) only reports what would change.
    flags:       re flags for the pattern, e.g. re.MULTILINE so that ^ and $
                 match at every line rather than only at the file boundaries.

    When not in dry-run mode, a byte-for-byte copy of each changed file is
    saved next to it with ".bak" appended before it is rewritten.

    Returns the number of files that were (or, in dry-run mode, would be)
    modified.
    """
    regex = re.compile(pattern, flags)
    modified_count = 0
    # Snapshot the matches: .bak files are created inside the tree being walked
    for f in list(folder.rglob(glob)):
        if not f.is_file():
            continue
        try:
            original = f.read_text(encoding=encoding)
        except UnicodeDecodeError:
            print(f" [跳过] 编码错误:{f}")
            continue

        # subn performs the substitution and counts the matches in one pass
        new_content, match_count = regex.subn(replacement, original)
        if new_content == original:
            continue

        modified_count += 1
        rel = f.relative_to(folder)
        if dry_run:
            print(f" [预览] {rel} ({match_count} 处替换)")
            continue

        # Back up the raw bytes, not the decoded text, so the backup keeps
        # the original line endings exactly
        backup = f.with_suffix(f.suffix + ".bak")
        backup.write_bytes(f.read_bytes())
        f.write_text(new_content, encoding=encoding)
        print(f" [完成] {rel} ({match_count} 处)")
    return modified_count
# Example: prefix every line that starts with "print(" in all Python files
# with a "# TODO: " comment marker.
folder = Path("/home/user/project")
# The inline (?m) flag switches on MULTILINE so that ^ matches at the start
# of every line; without it the pattern can only match at the very
# beginning of each file.
count = batch_replace(folder, r"(?m)^(print\()", r"# TODO: \1",
                      glob="*.py", dry_run=True)
print(f"\n共 {count} 个文件需要修改")
编码检测与批量转换
使用 chardet 检测编码并统一转为 UTF-8
from pathlib import Path
import chardet
def detect_encoding(filepath: Path) -> tuple[str, float]:
    """Guess the text encoding of a file with chardet.

    Returns (encoding, confidence): the encoding name reported by chardet
    (e.g. 'utf-8', 'GB2312'), or "utf-8" when chardet reports none, together
    with chardet's confidence value (0 when the result carries none).
    """
    raw = filepath.read_bytes()
    result = chardet.detect(raw)
    encoding = result.get("encoding") or "utf-8"
    confidence = result.get("confidence", 0)
    return encoding, confidence
def convert_to_utf8(folder: Path, glob: str = "*.txt", dry_run: bool = True) -> None:
    """Recursively re-encode the non-UTF-8 text files under folder as UTF-8.

    folder:  root directory, searched recursively.
    glob:    file-name pattern selecting the files to inspect.
    dry_run: True (default) only reports the detected encodings.

    Each converted file is first copied byte-for-byte to "<name>.orig".
    Requires the third-party package chardet (pip install chardet).
    """
    # Normalised names treated as "already UTF-8". A file with a UTF-8 BOM
    # is expected to be reported as "UTF-8-SIG" (normalised "utf8sig").
    # NOTE(review): confirm against the chardet version in use.
    already_utf8 = ("utf8", "utf8bom", "utf8sig", "ascii")

    # Snapshot the matches: .orig backups are created inside the same tree
    for f in list(folder.rglob(glob)):
        if not f.is_file():
            continue
        encoding, confidence = detect_encoding(f)
        normalized = encoding.lower().replace("-", "")
        # Already UTF-8: nothing to do
        if normalized in already_utf8:
            continue
        print(f" {f.name} 检测编码:{encoding}(置信度 {confidence:.0%})")
        if dry_run:
            continue
        try:
            raw = f.read_bytes()
            # errors="replace" substitutes undecodable bytes instead of
            # failing; the .orig backup keeps the untouched original
            text = raw.decode(encoding, errors="replace")
            # Back up the original file
            f.with_suffix(f.suffix + ".orig").write_bytes(raw)
            # Work on bytes so that only the encoding changes, never the
            # line endings
            f.write_bytes(text.encode("utf-8"))
            print(f" [完成] 已转换为 UTF-8")
        except (OSError, LookupError, UnicodeError) as e:
            # LookupError: the detected name is not a codec Python knows
            print(f" [失败] {f.name}: {e}")
# 使用:pip install chardet
# convert_to_utf8(Path("/home/user/legacy_docs"), dry_run=True)
文件哈希与去重
计算文件 MD5,找出并处理重复文件
import hashlib
from pathlib import Path
from collections import defaultdict
def file_md5(filepath: Path, chunk_size: int = 65536) -> str:
    """Return the hexadecimal MD5 digest of a file.

    The file is read chunk_size bytes at a time, so memory use stays
    constant even for very large files.
    """
    h = hashlib.md5()
    with filepath.open("rb") as f:
        # read() returns b"" at end of file, which ends the loop
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()
def find_duplicates(folder: Path) -> dict[str, list[Path]]:
    """Find files with identical content anywhere under folder.

    Returns {md5 hex digest: [paths]} containing only the digests shared by
    two or more files. Files that cannot be read are reported and skipped.
    """
    hash_map: dict[str, list[Path]] = defaultdict(list)
    all_files = [f for f in folder.rglob("*") if f.is_file()]
    print(f"共扫描 {len(all_files)} 个文件...")
    for f in all_files:
        try:
            md5 = file_md5(f)
        except OSError as e:  # PermissionError is a subclass of OSError
            print(f" [跳过] {f.name}: {e}")
        else:
            hash_map[md5].append(f)
    # Keep only the digests that occur more than once
    return {h: paths for h, paths in hash_map.items() if len(paths) > 1}
def report_duplicates(folder: Path) -> None:
    """Print a report of the duplicate files under folder.

    For every group of identical files it shows the digest prefix, the file
    size and the member paths, followed by the total space taken up by the
    redundant copies.
    """
    dupes = find_duplicates(folder)
    if not dupes:
        print("未发现重复文件。")
        return

    total_wasted = 0
    print(f"\n发现 {len(dupes)} 组重复文件:\n")
    for md5, paths in dupes.items():
        size = paths[0].stat().st_size
        # One copy is needed; every further copy counts as wasted space
        wasted = size * (len(paths) - 1)
        total_wasted += wasted
        print(f" MD5: {md5[:8]}... 大小: {size/1024:.1f} KB 重复: {len(paths)} 个")
        for p in paths:
            print(f" {p}")
    print(f"\n合计浪费空间:{total_wasted / 1024 / 1024:.2f} MB")
# report_duplicates(Path.home() / "Downloads")
目录监控:watchdog 实时响应文件变化
watchdog 是一个跨平台的文件系统事件监控库,能够在文件创建、修改、删除、移动时自动触发回调函数。
安装
# pip install watchdog
watchdog 基础用法
import time
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MyHandler(FileSystemEventHandler):
    """Print a line for every file-system event.

    Subclass FileSystemEventHandler and override only the event methods
    you need.
    """

    def on_created(self, event):
        """Report a newly created file (directory events are ignored)."""
        if not event.is_directory:
            print(f"[新文件] {event.src_path}")

    def on_modified(self, event):
        """Report a modified file (directory events are ignored)."""
        if not event.is_directory:
            print(f"[修改] {event.src_path}")

    def on_deleted(self, event):
        """Report a deleted file or directory."""
        print(f"[删除] {event.src_path}")

    def on_moved(self, event):
        """Report a move or rename, showing the old and the new path."""
        print(f"[移动] {event.src_path} -> {event.dest_path}")
def start_monitor(watch_dir: str) -> None:
    """Watch watch_dir recursively and print its events until Ctrl+C."""
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, watch_dir, recursive=True)
    observer.start()
    print(f"开始监控:{watch_dir} (Ctrl+C 退出)")
    try:
        # The observer runs in its own thread; just keep the main thread alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the observer thread on any exit path, not only on Ctrl+C
        observer.stop()
        observer.join()
# start_monitor("/home/user/watch_folder")
实战:新文件入库自动触发处理流程
auto_ingest.py — 监控目录,新文件自动归档并记录日志
"""
auto_ingest.py
监控 inbox/ 目录,新文件到达时自动:
1. 按类型分类
2. 移动到 archive/ 对应子目录
3. 记录处理日志
"""
import logging
import shutil
import time
from datetime import datetime
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Configuration
INBOX_DIR = Path("/home/user/inbox")
ARCHIVE_DIR = Path("/home/user/archive")
LOG_FILE = Path("/home/user/archive/ingest.log")

# Category (archive sub-directory name) -> extensions filed under it
CATEGORY_MAP = {
    "pdf": {".pdf"},
    "excel": {".xlsx", ".xls", ".csv"},
    "image": {".jpg", ".jpeg", ".png", ".gif"},
    "other": set(),  # catch-all category
}

# Logging setup. logging.FileHandler opens the log file as soon as it is
# constructed, so the directory has to exist before basicConfig runs.
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger(__name__)
def ext_to_category(suffix: str, category_map=None) -> str:
    """Return the category whose extension set contains suffix.

    suffix:       file extension including the dot; matched case-insensitively.
    category_map: optional {category: set of extensions} mapping; falls back
                  to the module-level CATEGORY_MAP when omitted.

    Returns "other" when no category lists the extension.
    """
    if category_map is None:
        category_map = CATEGORY_MAP
    wanted = suffix.lower()
    for cat, exts in category_map.items():
        if wanted in exts:
            return cat
    return "other"
class IngestHandler(FileSystemEventHandler):
    """Archive every new file that appears in the watched directory."""

    def on_created(self, event):
        """Handle a creation event: wait briefly, then archive the file."""
        if event.is_directory:
            return
        src = Path(event.src_path)
        # Simple "has the writer finished?" strategy: wait 0.5 seconds
        time.sleep(0.5)
        if not src.exists():
            return
        try:
            self._process(src)
        except OSError:
            # Log the failure rather than letting it escape the callback.
            # NOTE(review): an escaping exception presumably ends up in
            # watchdog's dispatch thread; confirm.
            log.exception("归档失败: %s", src.name)

    def _process(self, src: Path) -> None:
        """Move src to ARCHIVE_DIR/<category>/<YYYY-MM-DD>/ and log it."""
        category = ext_to_category(src.suffix)
        date_str = datetime.now().strftime("%Y-%m-%d")
        dest_dir = ARCHIVE_DIR / category / date_str
        dest_dir.mkdir(parents=True, exist_ok=True)

        # Never overwrite: append _1, _2, ... until the name is free
        dest = dest_dir / src.name
        counter = 1
        while dest.exists():
            dest = dest_dir / f"{src.stem}_{counter}{src.suffix}"
            counter += 1

        shutil.move(str(src), str(dest))
        log.info(f"已归档: {src.name} -> {dest.relative_to(ARCHIVE_DIR)}")
def main():
    """Create the inbox/archive directories and watch the inbox until Ctrl+C."""
    INBOX_DIR.mkdir(parents=True, exist_ok=True)
    ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)

    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, str(INBOX_DIR), recursive=False)
    observer.start()
    log.info(f"自动入库服务已启动,监控目录:{INBOX_DIR}")
    try:
        # The observer works in a background thread; keep the main thread alive
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        pass
    finally:
        # Shut the observer down on any exit path, not only on Ctrl+C
        observer.stop()
        observer.join()
    log.info("服务已停止。")


if __name__ == "__main__":
    main()
压缩与归档:zipfile / tarfile
zipfile 操作
创建、读取、解压 ZIP 文件
import zipfile
from pathlib import Path
# ---- Create a ZIP ----
def zip_folder(source_dir: Path, output_zip: Path) -> None:
    """Pack source_dir, with its directory structure, into output_zip.

    Entries are stored relative to the parent of source_dir, so every name
    inside the archive starts with the source directory's own name.
    """
    archive_root = source_dir.parent
    output_resolved = output_zip.resolve()
    with zipfile.ZipFile(output_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for f in source_dir.rglob("*"):
            if not f.is_file():
                continue
            # When the archive is written inside source_dir, do not pack
            # the half-written archive into itself
            if f.resolve() == output_resolved:
                continue
            # arcname is the path recorded inside the ZIP
            zf.write(f, arcname=f.relative_to(archive_root))
    print(f"已打包:{output_zip} ({output_zip.stat().st_size / 1024:.1f} KB)")
# ---- Inspect a ZIP ----
def list_zip(zip_path: Path) -> None:
    """Print the name and uncompressed size of every entry in a ZIP archive."""
    with zipfile.ZipFile(zip_path, "r") as zf:
        for info in zf.infolist():
            print(f" {info.filename:40s} {info.file_size:>10,} bytes")
# ---- Extract a ZIP ----
def extract_zip(zip_path: Path, dest_dir: Path) -> None:
    """Extract every entry of zip_path into dest_dir, creating it if needed."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zf:
        zf.extractall(dest_dir)
    print(f"已解压到:{dest_dir}")
# 使用示例
# zip_folder(Path("/home/user/project"), Path("/home/user/project_backup.zip"))
自动化备份脚本(保留最近 N 份)
auto_backup.py — 定时备份,自动清理旧备份
"""
auto_backup.py
备份指定目录,保留最近 N 份,自动删除旧备份。
用法:python auto_backup.py
或配合 cron / Task Scheduler 定时运行。
"""
import zipfile
import datetime
from pathlib import Path
SOURCE_DIR = Path("/home/user/important_data") # directory to back up
BACKUP_DIR = Path("/home/user/backups") # directory where the backup archives are stored
KEEP_LAST_N = 5 # number of most recent backups to keep
def create_backup(source: Path, backup_dir: Path) -> Path:
    """Zip the source directory into backup_dir under a timestamped name.

    The archive is called "<source name>_<YYYYmmdd_HHMMSS>.zip" and its
    entries are stored relative to the parent of source. backup_dir is
    created when missing.

    Returns the path of the new archive.
    """
    backup_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = backup_dir / f"{source.name}_{timestamp}.zip"
    backup_resolved = backup_path.resolve()

    with zipfile.ZipFile(backup_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for f in source.rglob("*"):
            if not f.is_file():
                continue
            # If backup_dir lives inside source, do not pack the archive
            # that is currently being written into itself
            if f.resolve() == backup_resolved:
                continue
            zf.write(f, arcname=f.relative_to(source.parent))

    size_mb = backup_path.stat().st_size / 1024 / 1024
    print(f"备份完成:{backup_path.name} ({size_mb:.2f} MB)")
    return backup_path
def cleanup_old_backups(backup_dir: Path, prefix: str, keep_n: int) -> None:
    """Delete old "<prefix>_*.zip" backups, keeping only the newest keep_n.

    "Newest" is decided by sorting the file names, which works because the
    names end in a YYYYmmdd_HHMMSS timestamp. keep_n=0 removes every
    matching backup.

    Raises ValueError if keep_n is negative.
    """
    if keep_n < 0:
        raise ValueError("keep_n must not be negative")
    backups = sorted(backup_dir.glob(f"{prefix}_*.zip"))
    # Slice by an explicit count: backups[:-keep_n] would be empty for
    # keep_n == 0 and delete nothing
    surplus = max(len(backups) - keep_n, 0)
    for old in backups[:surplus]:
        old.unlink()
        print(f"已删除旧备份:{old.name}")
def main():
    """Create a fresh backup, prune the old ones and list what remains."""
    if not SOURCE_DIR.exists():
        print(f"错误:源目录不存在:{SOURCE_DIR}")
        return

    create_backup(SOURCE_DIR, BACKUP_DIR)
    cleanup_old_backups(BACKUP_DIR, prefix=SOURCE_DIR.name, keep_n=KEEP_LAST_N)

    # Show the backups that are kept
    backups = sorted(BACKUP_DIR.glob(f"{SOURCE_DIR.name}_*.zip"))
    print(f"\n当前保留 {len(backups)} 份备份:")
    for b in backups:
        print(f" {b.name}")


if __name__ == "__main__":
    main()
实战项目:企业文档归档系统
整合本章所有技术,构建一个可以在真实企业环境中使用的文档归档系统:按年月和部门自动分类,生成索引报告。
enterprise_archiver.py — 完整企业文档归档系统
"""
enterprise_archiver.py
功能:
1. 扫描源目录下所有文件
2. 按文件修改日期的年月(YYYY/MM)和文件类型分类归档
3. 支持自定义部门前缀规则(文件名前缀 → 部门)
4. 生成 CSV 格式的归档索引报告
5. 干运行(dry_run)模式安全预览
用法:
python enterprise_archiver.py --source /share/incoming --dest /archive --execute
"""
import argparse
import csv
import datetime
import shutil
from pathlib import Path
# File-name prefix -> department name
DEPT_PREFIX_MAP = {
    "HR_": "人力资源",
    "FIN_": "财务",
    "IT_": "信息技术",
    "MKT_": "市场",
    "SALES_": "销售",
    "LEGAL_": "法务",
}
# File-type category name -> set of lower-case extensions that belong to it.
CATEGORY_MAP = {
    "文档": {".pdf", ".docx", ".doc", ".txt", ".md", ".pptx"},
    "表格": {".xlsx", ".xls", ".csv"},
    "图片": {".jpg", ".jpeg", ".png", ".gif", ".webp"},
    "压缩": {".zip", ".rar", ".7z", ".tar", ".gz"},
}
def get_dept(filename: str, prefix_map=None) -> str:
    """Return the department a file belongs to, judged by its name prefix.

    filename:   file name to classify; prefixes are matched case-insensitively.
    prefix_map: optional {prefix: department} mapping; falls back to the
                module-level DEPT_PREFIX_MAP when omitted.

    Returns "通用" when no prefix matches.
    """
    if prefix_map is None:
        prefix_map = DEPT_PREFIX_MAP
    upper_name = filename.upper()  # hoisted: the same for every prefix
    for prefix, dept in prefix_map.items():
        if upper_name.startswith(prefix.upper()):
            return dept
    return "通用"
def get_category(suffix: str, category_map=None) -> str:
    """Return the file-type category for an extension.

    suffix:       extension including the dot; matched case-insensitively.
    category_map: optional {category: set of extensions} mapping; falls back
                  to the module-level CATEGORY_MAP when omitted.

    Returns "其他" when no category lists the extension.
    """
    if category_map is None:
        category_map = CATEGORY_MAP
    wanted = suffix.lower()
    for cat, exts in category_map.items():
        if wanted in exts:
            return cat
    return "其他"
def safe_dest(dest: Path) -> Path:
    """Return dest if it is unused, otherwise the first free numbered variant.

    Numbered variants append _1, _2, ... to the stem and keep the suffix,
    so nothing that already exists is ever overwritten.
    """
    if not dest.exists():
        return dest
    counter = 1
    candidate = dest.parent / f"{dest.stem}_{counter}{dest.suffix}"
    while candidate.exists():
        counter += 1
        candidate = dest.parent / f"{dest.stem}_{counter}{dest.suffix}"
    return candidate
def archive(source_dir: Path, dest_dir: Path, execute: bool) -> list[dict]:
    """Copy every file under source_dir into a dated, classified archive tree.

    Each file goes to dest_dir/<YYYY>/<MM>/<department>/<category>/, where
    the date comes from the file's modification time, the department from
    its name prefix and the category from its extension. Files are copied
    (with metadata), not moved, and existing archive files are never
    overwritten.

    execute: True performs the copy; False only prints the plan.

    Returns one record (dict) per file, suitable for the CSV report.
    """
    records = []
    status = "已归档" if execute else "计划"
    dest_root = dest_dir.resolve()

    for f in sorted(source_dir.rglob("*")):
        if not f.is_file():
            continue
        # When the archive lives inside the source tree, do not archive the
        # archive's own contents again
        if dest_root in f.resolve().parents:
            continue

        info = f.stat()
        mtime = datetime.datetime.fromtimestamp(info.st_mtime)
        year_month = f"{mtime.year}/{mtime.month:02d}"
        dept = get_dept(f.name)
        category = get_category(f.suffix)
        dest_path = dest_dir / year_month / dept / category
        final = safe_dest(dest_path / f.name)
        target = str(final.relative_to(dest_dir))

        # Key order defines the column order of the CSV report
        records.append({
            "源文件": str(f),
            "目标路径": target,
            "部门": dept,
            "类型": category,
            "年月": year_month,
            "大小(KB)": round(info.st_size / 1024, 1),
            "修改时间": mtime.strftime("%Y-%m-%d %H:%M"),
            "状态": status,
        })

        if execute:
            dest_path.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(f), str(final))
            print(f" [归档] {f.name} -> {target}")
        else:
            print(f" [预览] {f.name} -> {target}")
    return records
def generate_report(records: list[dict], report_path: Path) -> None:
    """Write the archive records to report_path as a CSV file.

    The columns are the keys of the first record. Nothing is written when
    records is empty. The file is encoded as UTF-8 with a BOM ("utf-8-sig").
    """
    if not records:
        return
    fieldnames = list(records[0].keys())
    # newline="" lets the csv module control the line endings itself
    with report_path.open("w", newline="", encoding="utf-8-sig") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(records)
    print(f"\n归档报告已生成:{report_path}")
def print_summary(records: list[dict]) -> None:
    """Print totals for the archive run: file count, size, per-department
    and per-type counts."""
    from collections import Counter

    total = len(records)
    dept_count = Counter(r["部门"] for r in records)
    type_count = Counter(r["类型"] for r in records)
    total_size = sum(r["大小(KB)"] for r in records)

    print(f"\n{'='*50}")
    print(f"文件总数:{total} 总大小:{total_size:.1f} KB")
    print("按部门:", dict(dept_count))
    print("按类型:", dict(type_count))
def main():
    """Command-line entry point of the archiver.

    --source   directory holding the files to archive (required)
    --dest     root directory of the archive (required)
    --execute  really copy files; without it only a preview is produced

    A CSV report is written into the archive root when executing, or into
    the current directory when previewing.
    """
    parser = argparse.ArgumentParser(description="企业文档归档系统")
    parser.add_argument("--source", required=True, help="源目录(待归档文件所在目录)")
    parser.add_argument("--dest", required=True, help="归档目标目录")
    parser.add_argument("--execute", action="store_true", help="执行归档(默认只预览)")
    args = parser.parse_args()

    source_dir = Path(args.source)
    dest_dir = Path(args.dest)
    if not source_dir.is_dir():
        print(f"错误:源目录不存在:{source_dir}")
        return

    mode = "执行归档" if args.execute else "预览模式(不实际移动文件)"
    print(f"{mode}\n源:{source_dir}\n目标:{dest_dir}\n")

    records = archive(source_dir, dest_dir, execute=args.execute)
    print_summary(records)

    # Write the report
    report_name = f"archive_report_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    report_path = dest_dir / report_name if args.execute else Path(".") / report_name
    if records:
        generate_report(records, report_path)


if __name__ == "__main__":
    main()
**本章小结:**pathlib 是现代 Python 文件操作的核心,/ 操作符让路径拼接跨平台且清晰。批量操作务必先 dry_run 预览。watchdog 实现实时响应。zipfile 处理压缩归档。这套工具组合足以应对企业级文档管理的绝大多数需求。
上一章
下一章
第6章:Python 操作 Excel