File System Automation — Batch File Processing Made Simple
Chapter 5: File System Automation — Batch File Processing Made Simple
File operations are the most frequent and practical domain in Python automation. Organizing your Downloads folder, batch-renaming photos, archiving reports by date, watching directories for new files — these time-consuming repetitive tasks can be fully automated in a few dozen lines of Python. This chapter starts with pathlib, Python's modern file handling library, then progresses through batch operations, content processing, directory monitoring, and compression, finishing with a complete enterprise document archiving system.
Deep Dive: pathlib — The Modern Way to Handle Paths
Python 3.4 introduced pathlib, providing an object-oriented interface for file system paths. Compared to the old os.path approach, pathlib is cleaner, more readable, and the recommended choice for all new Python code.
Path Object: Key Attributes and Methods
pathlib core reference
# pathlib core reference: a walk through the most-used Path attributes and methods.
import datetime
import os
from pathlib import Path

p = Path("/home/user/documents/report_2024.pdf")

# ---- Path attributes (pure string operations; the file does not have to exist) ----
print(p.name)        # "report_2024.pdf"  full filename including extension
print(p.stem)        # "report_2024"      filename without extension
print(p.suffix)      # ".pdf"             extension (with dot)
print(p.suffixes)    # [".pdf"]           all extensions (e.g. [".tar", ".gz"])
print(p.parent)      # /home/user/documents
print(p.parents[1])  # /home/user         two levels up
print(p.parts)       # ('/', 'home', 'user', 'documents', 'report_2024.pdf')

# ---- Existence checks (return False for a missing path) ----
print(p.exists())    # True / False
print(p.is_file())   # True only for an existing regular file
print(p.is_dir())    # True only for an existing directory

# ---- File metadata ----
# stat() raises FileNotFoundError for a missing path, so the sample path is
# handled explicitly instead of crashing the whole walkthrough.
try:
    stat = p.stat()
except FileNotFoundError:
    print(f"{p} does not exist; skipping metadata")
else:
    print(stat.st_size)   # size in bytes
    print(stat.st_mtime)  # last modified (Unix timestamp)
    mtime = datetime.datetime.fromtimestamp(stat.st_mtime)
    print(mtime.strftime("%Y-%m-%d %H:%M"))

# ---- Path construction ----
# The / operator is cross-platform safe and highly readable
base = Path("/home/user")
reports_dir = base / "documents" / "reports"

# Equivalent to os.path.join, but the result is a Path object, not a str
old_way = os.path.join("/home/user", "documents", "reports")
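Why the / operator matters: pathlib joins path segments with the correct separator for the platform — backslashes on Windows, forward slashes on Linux/macOS — so your code runs correctly on all platforms without any changes. os.path.join is equally portable; the key advantage of the / operator is readability, plus the fact that the result is a Path object with methods rather than a plain string.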
glob and rglob: Powerful File Search
glob / rglob patterns
# glob / rglob patterns: common ways to search a directory tree with pathlib.
from pathlib import Path

folder = Path("/home/user/documents")

# glob: search current directory only
pdfs = list(folder.glob("*.pdf"))
reports = list(folder.glob("report_*.xlsx"))
all_files = [f for f in folder.glob("*") if f.is_file()]

# rglob: recursive search through all subdirectories
all_py = list(folder.rglob("*.py"))
temp_files = list(folder.rglob("temp_*"))

# Multiple extensions (Pythonic approach).
# The cheap suffix test runs first; is_file() then drops directories whose
# name merely looks like an image (e.g. a folder called "scans.png").
image_exts = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
all_images = [
    f for f in folder.rglob("*")
    if f.suffix.lower() in image_exts and f.is_file()
]

# Sort by modification time (newest first)
sorted_files = sorted(folder.glob("*.xlsx"), key=lambda f: f.stat().st_mtime, reverse=True)

# Filter by size (files larger than 1MB)
large_files = [f for f in folder.rglob("*") if f.is_file() and f.stat().st_size > 1_048_576]
Basic Path Operations
Create, delete, move files and directories
# Create, copy, move and delete files and directories.
# Every step works inside the scratch directory `p`, and each operation acts
# on a path that still exists at that point, so the snippet runs end to end.
from pathlib import Path
import shutil

p = Path("/tmp/test_dir")

# Create directory (parents=True creates intermediate dirs, exist_ok=True suppresses error if exists)
p.mkdir(parents=True, exist_ok=True)

# Write and read files
file = p / "hello.txt"
file.write_text("Hello, pathlib!", encoding="utf-8")
content = file.read_text(encoding="utf-8")

# Rename within same filesystem (atomic operation)
new_path = p / "hello_renamed.txt"
file.rename(new_path)

# Copy with metadata preserved -- done BEFORE the move, while new_path still exists
backup_dir = p / "backup"
backup_dir.mkdir(exist_ok=True)
backup_copy = Path(shutil.copy2(str(new_path), str(backup_dir / new_path.name)))

# Cross-filesystem move; shutil.move returns the final location of the file
moved_dir = p / "moved"
moved_dir.mkdir(exist_ok=True)
moved_path = Path(shutil.move(str(new_path), str(moved_dir)))

# Delete file (missing_ok=True: no error if already gone)
moved_path.unlink(missing_ok=True)

# Delete empty directory (rmdir raises OSError if anything is left inside)
moved_dir.rmdir()

# Delete non-empty directory (use with caution!) -- p still contains backup/
shutil.rmtree(str(p))
Batch File Operations
Batch Renaming
Three common batch rename patterns
# Three common batch rename patterns.
# Each loop snapshots its file list before renaming anything and never
# overwrites an existing file (Path.rename silently replaces the target on POSIX).
from pathlib import Path
import datetime

folder = Path("/home/user/photos")

# Pattern 1: Add date prefix photo.jpg -> 2024-03-15_photo.jpg
today = datetime.date.today().strftime("%Y-%m-%d")
for f in list(folder.glob("*.jpg")):
    if f.name.startswith(f"{today}_"):
        continue  # already prefixed: a re-run must not stack prefixes
    target = f.parent / f"{today}_{f.name}"
    if target.exists():
        print(f" [skip] target exists: {target.name}")
        continue
    f.rename(target)

# Pattern 2: Sequential numbering -> 001.jpg, 002.jpg, ...
# Done in two passes: renaming straight to 001.jpg could overwrite a file that
# is itself still waiting to be renumbered (e.g. "(1).jpg" sorts before "001.jpg").
files = sorted(folder.glob("*.jpg"))
staged = []
for i, f in enumerate(files, start=1):
    temp = f.with_name(f".renumber_tmp_{i:03d}{f.suffix}")
    f.rename(temp)
    staged.append((temp, f.parent / f"{i:03d}{f.suffix}"))
for temp, final in staged:
    temp.rename(final)

# Pattern 3: Keyword substitution
for f in list(folder.rglob("*")):
    if not f.is_file():
        continue
    new_stem = f.stem.replace(" ", "_").replace("(draft)", "_draft")
    if new_stem == f.stem:
        continue
    target = f.parent / (new_stem + f.suffix)
    if target.exists():
        print(f" [skip] target exists: {target.name}")
        continue
    f.rename(target)
# Best practice: preview before executing
def preview_rename(folder: Path, pattern: str, transform_fn):
    """Print an "old -> new" line for every regular file in *folder* matching *pattern*.

    Nothing is renamed. ``transform_fn`` is called with each matching Path and
    its return value is shown as the proposed new name.
    """
    matching_files = (entry for entry in folder.glob(pattern) if entry.is_file())
    for entry in matching_files:
        proposed = transform_fn(entry)
        print(f" {entry.name} -> {proposed}")
# Dry run of the date-prefix pattern: prints the proposed names, renames nothing.
preview_rename(folder, "*.jpg", lambda f: f"{today}_{f.name}")
Production best practice: preview first, execute second. Batch rename is irreversible without a backup. Always print an "old -> new" preview list first, verify that it looks correct, and only then run the actual rename loop.
Archive Files by Type / Date / Size
Auto-categorize files into subdirectories by type
from pathlib import Path
import shutil
# Destination folder name -> set of lower-case, dot-prefixed extensions filed under it.
# categorize_by_type() files any extension not listed here under "Other".
CATEGORY_MAP = {
"Images": {".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic"},
"Documents": {".pdf", ".docx", ".doc", ".txt", ".md"},
"Sheets": {".xlsx", ".xls", ".csv"},
"Videos": {".mp4", ".mov", ".avi", ".mkv"},
"Audio": {".mp3", ".wav", ".flac", ".aac"},
"Archives": {".zip", ".tar", ".gz", ".rar", ".7z"},
}
def categorize_by_type(source_dir: Path, dry_run: bool = True) -> dict:
    """Sort the files directly inside *source_dir* into per-category subfolders.

    The category comes from the file extension via CATEGORY_MAP; unknown
    extensions go to "Other". Subdirectories of *source_dir* are left alone.

    Args:
        source_dir: Directory whose immediate files are categorized.
        dry_run: When True (default) only print what would happen.

    Returns:
        Mapping of category name to the list of original file names assigned to it.
    """
    ext_to_category = {ext: cat for cat, exts in CATEGORY_MAP.items() for ext in exts}
    stats = {}
    # Destinations handed out during this run. In dry-run mode nothing lands on
    # disk, so this is what keeps the preview identical to a real run.
    planned = set()
    for f in source_dir.iterdir():
        if not f.is_file():
            continue
        category = ext_to_category.get(f.suffix.lower(), "Other")
        dest_dir = source_dir / category
        dest = dest_dir / f.name
        # Never overwrite: append _1, _2, ... until the name is free.
        counter = 1
        while dest in planned or dest.exists():
            dest = dest_dir / f"{f.stem}_{counter}{f.suffix}"
            counter += 1
        planned.add(dest)
        stats.setdefault(category, []).append(f.name)
        if dry_run:
            print(f" [preview] {f.name} -> {category}/{dest.name}")
        else:
            dest_dir.mkdir(exist_ok=True)
            shutil.move(str(f), str(dest))
    return stats
File Content Processing
Batch Search and Replace
Find and replace text across multiple files
from pathlib import Path
import re
def batch_replace(folder: Path, pattern: str, replacement: str,
                  glob: str = "*.txt", encoding: str = "utf-8",
                  dry_run: bool = True) -> int:
    """
    Replace regex pattern with replacement in all files matching glob.

    Files are read and written as bytes and decoded/encoded explicitly, so
    line endings (e.g. CRLF) survive untouched and the ".bak" copy written in
    execute mode is byte-identical to the original file.

    Args:
        folder: Root directory, searched recursively.
        pattern: Regular expression to search for.
        replacement: Replacement text (may use regex group references).
        glob: File name pattern selecting the files to process.
        encoding: Text encoding of the files; undecodable files are skipped.
        dry_run: When True (default) only report what would change.

    Returns count of modified files.
    """
    modified_count = 0
    regex = re.compile(pattern)
    # Snapshot the matches first so backups written below are never picked up
    # by the same traversal (relevant for broad globs such as "*").
    for f in list(folder.rglob(glob)):
        if not f.is_file():
            continue
        raw = f.read_bytes()
        try:
            original = raw.decode(encoding)
        except UnicodeDecodeError:
            print(f" [skip] encoding error: {f}")
            continue
        # subn does the substitution and counts the matches in a single pass.
        new_content, count = regex.subn(replacement, original)
        if new_content == original:
            continue
        modified_count += 1
        if dry_run:
            print(f" [preview] {f.relative_to(folder)} ({count} replacements)")
        else:
            # Encode before touching the file: an unencodable replacement then
            # fails without truncating the original.
            encoded = new_content.encode(encoding)
            f.with_name(f.name + ".bak").write_bytes(raw)
            f.write_bytes(encoded)
            print(f" [done] {f.relative_to(folder)} ({count} replacements)")
    return modified_count
Encoding Detection and Bulk Conversion
Detect encoding with chardet and convert to UTF-8
from pathlib import Path
import chardet # pip install chardet
def detect_encoding(filepath: Path) -> tuple[str, float]:
    """Guess the text encoding of *filepath* with chardet.

    Returns:
        ``(encoding, confidence)``. The encoding falls back to "utf-8" when
        chardet reports none; the confidence falls back to 0 when it is absent.
    """
    guess = chardet.detect(filepath.read_bytes())
    encoding = guess.get("encoding") or "utf-8"
    confidence = guess.get("confidence", 0)
    return encoding, confidence
def convert_to_utf8(folder: Path, glob: str = "*.txt", dry_run: bool = True) -> None:
    """Re-encode every file under *folder* matching *glob* to UTF-8.

    Files detected as ASCII or any UTF-8 flavour are skipped. In execute mode
    the original bytes are first saved next to the file as "<name>.orig".
    Undecodable bytes are replaced with U+FFFD rather than aborting.

    Args:
        folder: Root directory, searched recursively.
        glob: File name pattern selecting the files to inspect.
        dry_run: When True (default) only print the detected encodings.
    """
    # Normalized (lower-case, no "-" or "_") labels that need no conversion.
    # "utf8sig" is included because chardet reports BOM-prefixed UTF-8 as "UTF-8-SIG".
    already_utf8 = {"utf8", "utf8sig", "utf8bom", "ascii"}
    # Snapshot the matches so the ".orig" copies written below are never revisited.
    for f in list(folder.rglob(glob)):
        if not f.is_file():
            continue
        encoding, confidence = detect_encoding(f)
        if encoding.lower().replace("-", "").replace("_", "") in already_utf8:
            continue
        print(f" {f.name} detected: {encoding} ({confidence:.0%})")
        if dry_run:
            continue
        try:
            # Work on bytes: one read, and line endings are preserved exactly.
            raw = f.read_bytes()
            text = raw.decode(encoding, errors="replace")
            f.with_name(f.name + ".orig").write_bytes(raw)
            f.write_bytes(text.encode("utf-8"))
            print(" -> converted to UTF-8")
        except (LookupError, OSError) as e:
            # LookupError: Python has no codec for the detected encoding name.
            print(f" -> failed: {e}")
File Hashing and Deduplication
Find duplicate files by MD5 hash
import hashlib
from pathlib import Path
from collections import defaultdict
def file_md5(filepath: Path, chunk_size: int = 65536) -> str:
    """Return the hexadecimal MD5 digest of *filepath*.

    The file is read in pieces of *chunk_size* bytes, so large files are never
    loaded into memory in one go.
    """
    digest = hashlib.md5()
    with filepath.open("rb") as stream:
        # iter() with a sentinel keeps calling read() until it returns b"" (EOF).
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()
def find_duplicates(folder: Path) -> dict[str, list[Path]]:
    """Find files under *folder* (recursively) that have identical content.

    Only files that share their size with at least one other file are hashed,
    because files of different sizes cannot be duplicates. Files that cannot
    be read are reported and skipped.

    Returns:
        Mapping of MD5 hex digest to the list of paths sharing it; only
        digests with two or more paths are included.
    """
    sizes: dict[Path, int] = {}
    size_counts: dict[int, int] = defaultdict(int)
    for f in folder.rglob("*"):
        try:
            if not f.is_file():
                continue
            size = f.stat().st_size
        except OSError as exc:  # PermissionError is a subclass of OSError
            print(f" [skip] cannot stat: {f} ({exc})")
            continue
        sizes[f] = size
        size_counts[size] += 1

    hash_map: dict[str, list[Path]] = defaultdict(list)
    for f, size in sizes.items():
        if size_counts[size] < 2:
            continue  # unique size: cannot have a duplicate, so skip the hash
        try:
            hash_map[file_md5(f)].append(f)
        except OSError as exc:
            print(f" [skip] cannot read: {f} ({exc})")
    return {h: paths for h, paths in hash_map.items() if len(paths) > 1}
Directory Monitoring with watchdog
watchdog is a cross-platform library that triggers callbacks when files are created, modified, deleted, or moved.
Basic watchdog usage
import time
from watchdog.observers import Observer # pip install watchdog
from watchdog.events import FileSystemEventHandler
class MyHandler(FileSystemEventHandler):
    """Event handler that prints one line per file system event."""

    def on_created(self, event):
        # Directory creations are not reported.
        if event.is_directory:
            return
        print(f"[new] {event.src_path}")

    def on_modified(self, event):
        # Directory modifications are not reported.
        if event.is_directory:
            return
        print(f"[changed] {event.src_path}")

    def on_deleted(self, event):
        # Reported for files and directories alike.
        print(f"[deleted] {event.src_path}")

    def on_moved(self, event):
        # Reported for files and directories alike.
        print(f"[moved] {event.src_path} -> {event.dest_path}")
def start_monitor(watch_dir: str) -> None:
    """Watch *watch_dir* recursively and print events until interrupted.

    Blocks the calling thread. Ctrl+C stops the monitor; the observer thread
    is stopped and joined on every exit path, not only on Ctrl+C.
    """
    handler = MyHandler()
    observer = Observer()
    observer.schedule(handler, watch_dir, recursive=True)
    observer.start()
    print(f"Monitoring: {watch_dir} (Ctrl+C to stop)")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to stop; clean-up happens in finally
    finally:
        observer.stop()
        observer.join()
Compression and Archiving
ZIP operations with zipfile
import zipfile
from pathlib import Path
def zip_folder(source_dir: Path, output_zip: Path) -> None:
    """Compress every file under *source_dir* (recursively) into *output_zip*.

    Archive member names are relative to the parent of *source_dir*, so they
    all start with the source folder's own name. Empty directories are not
    stored. Members are added in sorted path order.

    Args:
        source_dir: Directory to compress.
        output_zip: Archive to create (overwritten if it exists). It may live
            inside *source_dir*; it is then left out of the archive.
    """
    archive_path = Path(output_zip).resolve()
    with zipfile.ZipFile(output_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for f in sorted(source_dir.rglob("*")):
            if not f.is_file():
                continue
            # The archive already exists on disk here; adding it to itself
            # would store a truncated copy of the file being written.
            if f.resolve() == archive_path:
                continue
            zf.write(f, arcname=f.relative_to(source_dir.parent))
def extract_zip(zip_path: Path, dest_dir: Path) -> None:
    """Unpack every member of *zip_path* into *dest_dir*.

    *dest_dir* (and any missing parents) is created before the archive is opened.
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as archive:
        for member_name in archive.namelist():
            archive.extract(member_name, path=dest_dir)
Auto-backup: keep last N copies, delete old ones
import zipfile
import datetime
from pathlib import Path
# Directory tree that main() passes to create_backup() as the source.
SOURCE_DIR = Path("/home/user/important_data")
# Directory that receives the timestamped backup archives.
BACKUP_DIR = Path("/home/user/backups")
# Number of archives main() asks cleanup_old_backups() to keep.
KEEP_LAST_N = 5
def create_backup(source: Path, backup_dir: Path) -> Path:
    """Zip *source* into a new timestamped archive inside *backup_dir*.

    The archive is named "<source name>_<YYYYmmdd_HHMMSS>.zip" and its member
    names start with the source folder's own name. If *backup_dir* lives
    inside *source*, the backup folder's contents are left out, so archives
    never contain themselves or older backups.

    Args:
        source: Directory to back up (recursively).
        backup_dir: Directory for the archive; created if missing.

    Returns:
        Path of the archive that was written.
    """
    backup_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = backup_dir / f"{source.name}_{timestamp}.zip"

    source_root = source.resolve()
    backup_root = backup_dir.resolve()
    archive_file = backup_path.resolve()
    # Only exclude the backup tree when it is nested in the source; in the
    # opposite layout (source inside backup_dir) that test would match every file.
    backups_nested = source_root in backup_root.parents

    with zipfile.ZipFile(backup_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for f in sorted(source.rglob("*")):
            if not f.is_file():
                continue
            resolved = f.resolve()
            if resolved == archive_file:
                continue  # the archive currently being written
            if backups_nested and backup_root in resolved.parents:
                continue  # older backups stored inside the source tree
            zf.write(f, arcname=f.relative_to(source.parent))

    size_mb = backup_path.stat().st_size / 1024 / 1024
    print(f"Backup created: {backup_path.name} ({size_mb:.2f} MB)")
    return backup_path
def cleanup_old_backups(backup_dir: Path, prefix: str, keep_n: int) -> None:
    """Delete all but the newest *keep_n* archives named "<prefix>_*.zip".

    "Newest" is decided by file name order, which matches chronological order
    for zero-padded timestamp suffixes such as YYYYmmdd_HHMMSS.

    NOTE: the match is a plain glob on "<prefix>_*.zip", so archives of another
    source whose name starts with "<prefix>_" are included as well.

    Args:
        backup_dir: Directory containing the archives.
        prefix: Archive name prefix (the part before the first "_<timestamp>").
        keep_n: Number of most recent archives to keep; 0 deletes all of them.

    Raises:
        ValueError: If *keep_n* is negative.
    """
    if keep_n < 0:
        raise ValueError(f"keep_n must be >= 0, got {keep_n}")
    backups = sorted(backup_dir.glob(f"{prefix}_*.zip"))
    # Compute the cut-off explicitly: backups[:-keep_n] is empty for keep_n == 0
    # (because -0 == 0), which would keep everything instead of nothing.
    surplus = max(len(backups) - keep_n, 0)
    for old in backups[:surplus]:
        old.unlink()
        print(f"Deleted old backup: {old.name}")
def main():
    """Create a fresh backup of SOURCE_DIR, then prune the older archives."""
    create_backup(source=SOURCE_DIR, backup_dir=BACKUP_DIR)
    cleanup_old_backups(
        backup_dir=BACKUP_DIR,
        prefix=SOURCE_DIR.name,
        keep_n=KEEP_LAST_N,
    )


if __name__ == "__main__":
    main()
Project: Enterprise Document Archiving System
This project integrates all the techniques from the chapter into a production-ready document archiving system: it auto-categorizes files by year/month, department, and file type, and generates a CSV index report.
enterprise_archiver.py — complete, runnable project
"""
enterprise_archiver.py
Usage: python enterprise_archiver.py --source /share/incoming --dest /archive --execute
Without --execute, runs in preview mode only.
"""
import argparse
import csv
import datetime
import shutil
from pathlib import Path
from collections import Counter
# File name prefix -> department folder name.
# get_dept() compares case-insensitively and returns the first prefix that matches.
DEPT_PREFIX_MAP = {
"HR_": "HR",
"FIN_": "Finance",
"IT_": "IT",
"MKT_": "Marketing",
"SALES_": "Sales",
"LEGAL_": "Legal",
}
# File-type folder name -> set of lower-case, dot-prefixed extensions stored in it.
# get_category() lower-cases the suffix before the lookup.
CATEGORY_MAP = {
"Docs": {".pdf", ".docx", ".doc", ".txt", ".pptx"},
"Sheets": {".xlsx", ".xls", ".csv"},
"Images": {".jpg", ".jpeg", ".png", ".gif"},
"Archives": {".zip", ".rar", ".7z", ".tar", ".gz"},
}
def get_dept(filename: str) -> str:
    """Return the department whose prefix *filename* starts with (case-insensitive).

    The first matching entry of DEPT_PREFIX_MAP wins; "General" is returned
    when no prefix matches.
    """
    upper_name = filename.upper()
    matching_depts = (
        dept
        for prefix, dept in DEPT_PREFIX_MAP.items()
        if upper_name.startswith(prefix.upper())
    )
    return next(matching_depts, "General")
def get_category(suffix: str) -> str:
    """Return the CATEGORY_MAP category containing *suffix* (case-insensitive).

    "Other" is returned for extensions that appear in no category.
    """
    ext = suffix.lower()
    matching_categories = (cat for cat, exts in CATEGORY_MAP.items() if ext in exts)
    return next(matching_categories, "Other")
def safe_dest(dest: Path) -> Path:
    """Return *dest* if nothing exists there, else the first free numbered variant.

    Variants are "<stem>_1<suffix>", "<stem>_2<suffix>", ... in the same directory.
    """
    candidate = dest
    attempt = 0
    while candidate.exists():
        attempt += 1
        candidate = dest.parent / f"{dest.stem}_{attempt}{dest.suffix}"
    return candidate
def _reserve_dest(dest: Path, reserved: set) -> Path:
    """Return a free variant of *dest* ("<stem>_1", "<stem>_2", ...) and mark it as taken."""
    candidate = dest
    counter = 0
    while candidate in reserved or candidate.exists():
        counter += 1
        candidate = dest.parent / f"{dest.stem}_{counter}{dest.suffix}"
    reserved.add(candidate)
    return candidate


def archive(source_dir: Path, dest_dir: Path, execute: bool) -> list[dict]:
    """Copy every file under *source_dir* into a year/month/dept/type tree in *dest_dir*.

    The year/month comes from the file's modification time, the department
    from its name prefix (get_dept) and the type from its extension
    (get_category). Existing files are never overwritten; a numeric suffix is
    appended instead. Source files are copied, not removed.

    Args:
        source_dir: Directory scanned recursively for files.
        dest_dir: Root of the archive tree.
        execute: When False nothing is written and the plan is only printed.

    Returns:
        One record per file with the keys source, dest, dept, type, year_month,
        size_kb, modified and status ("archived" or "pending").
    """
    records = []
    # Destinations handed out in this run. In preview mode nothing reaches the
    # disk, so same-named files from different subfolders would otherwise all
    # be shown with the same destination.
    reserved: set[Path] = set()
    source_root = source_dir.resolve()
    dest_root = dest_dir.resolve()
    # If the archive lives inside the source tree, its contents must be left
    # alone. The check is limited to that layout because in the opposite one
    # (source inside dest) it would match every file.
    dest_nested = source_root in dest_root.parents
    for f in sorted(source_dir.rglob("*")):
        if not f.is_file():
            continue
        if dest_nested and dest_root in f.resolve().parents:
            continue
        info = f.stat()  # one stat call serves both mtime and size
        mtime = datetime.datetime.fromtimestamp(info.st_mtime)
        year_month = f"{mtime.year}/{mtime.month:02d}"
        dept = get_dept(f.name)
        category = get_category(f.suffix)
        dest_path = dest_dir / year_month / dept / category
        final = _reserve_dest(dest_path / f.name, reserved)
        record = {
            "source": str(f),
            "dest": str(final.relative_to(dest_dir)),
            "dept": dept,
            "type": category,
            "year_month": year_month,
            "size_kb": round(info.st_size / 1024, 1),
            "modified": mtime.strftime("%Y-%m-%d %H:%M"),
            "status": "pending",
        }
        if execute:
            dest_path.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(f), str(final))
            record["status"] = "archived"
            print(f" [archived] {f.name} -> {final.relative_to(dest_dir)}")
        else:
            print(f" [preview] {f.name} -> {final.relative_to(dest_dir)}")
        records.append(record)
    return records
def generate_report(records: list[dict], report_path: Path) -> None:
    """Write *records* to *report_path* as CSV; do nothing when there are none.

    The columns are the keys of the first record, in their existing order.
    The file is written as UTF-8 with a BOM ("utf-8-sig").
    """
    if records:
        columns = [*records[0]]
        with report_path.open("w", newline="", encoding="utf-8-sig") as handle:
            writer = csv.DictWriter(handle, fieldnames=columns)
            writer.writeheader()
            for row in records:
                writer.writerow(row)
        print(f"\nReport written: {report_path}")
def main():
    """Command-line entry point: parse arguments, archive, summarize, write the report.

    Without --execute the run is a preview: nothing is copied and the report
    is written to the current directory instead of the destination.

    Raises:
        SystemExit: With a non-zero status when --source is not a directory,
            so shells and schedulers can detect the failure.
    """
    parser = argparse.ArgumentParser(description="Enterprise Document Archiver")
    parser.add_argument("--source", required=True)
    parser.add_argument("--dest", required=True)
    parser.add_argument("--execute", action="store_true")
    args = parser.parse_args()

    source_dir = Path(args.source)
    dest_dir = Path(args.dest)
    if not source_dir.is_dir():
        # SystemExit with a string prints it to stderr and exits with status 1.
        raise SystemExit(f"Error: source not found: {source_dir}")

    mode = "EXECUTE" if args.execute else "PREVIEW (no files moved)"
    print(f"Mode: {mode}\nSource: {source_dir}\nDest: {dest_dir}\n")
    records = archive(source_dir, dest_dir, execute=args.execute)

    # Summary
    total_kb = sum(r["size_kb"] for r in records)
    dept_counts = Counter(r["dept"] for r in records)
    print(f"\nTotal: {len(records)} files ({total_kb:.1f} KB)")
    for dept, count in dept_counts.most_common():
        print(f" {dept}: {count}")

    # Report
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = (dest_dir if args.execute else Path(".")) / f"archive_report_{ts}.csv"
    generate_report(records, report_path)


if __name__ == "__main__":
    main()
Chapter summary: pathlib is the modern standard for Python file operations — the / operator makes path construction cross-platform and readable. Always use dry_run/preview before executing destructive batch operations. watchdog enables real-time reactive file processing. zipfile handles compression and backup rotation. Together these tools cover the vast majority of enterprise file automation needs.
Previous
Next
Chapter 6: Python + Excel