Chapter 5

File System Automation — Batch File Processing Made Simple

Chapter 5: File System Automation — Batch File Processing Made Simple

File operations are the most frequent and practical domain in Python automation. Organizing your Downloads folder, batch-renaming photos, archiving reports by date, watching directories for new files — these time-consuming repetitive tasks can be fully automated in a few dozen lines of Python. This chapter starts with pathlib, Python's modern file handling library, then progresses through batch operations, content processing, directory monitoring, and compression, finishing with a complete enterprise document archiving system.

Deep Dive: pathlib — The Modern Way to Handle Paths

Python 3.4 introduced pathlib, providing an object-oriented interface for file system paths. Compared to the old os.path approach, pathlib is cleaner, more readable, and the recommended choice for all new Python code.

Path Object: Key Attributes and Methods

pathlib core reference

import datetime
import os
from pathlib import Path

p = Path("/home/user/documents/report_2024.pdf")

# ---- Path attributes (pure string handling; the file does not need to exist) ----
print(p.name)        # "report_2024.pdf"   full filename including extension
print(p.stem)        # "report_2024"       filename without extension
print(p.suffix)      # ".pdf"              extension (with dot)
print(p.suffixes)    # [".pdf"]            all extensions (e.g. [".tar", ".gz"])
print(p.parent)      # /home/user/documents
print(p.parents[1])  # /home/user          two levels up
print(p.parts)       # ('/', 'home', 'user', 'documents', 'report_2024.pdf')

# ---- Existence checks (these query the file system) ----
print(p.exists())    # True if the path exists
print(p.is_file())   # True only for an existing regular file
print(p.is_dir())    # True only for an existing directory

# ---- File metadata ----
# stat() raises FileNotFoundError for a missing path, so check first;
# otherwise the rest of this listing would never run on a machine
# that does not have the sample file.
if p.exists():
    stat = p.stat()
    print(stat.st_size)   # size in bytes
    print(stat.st_mtime)  # last modified (Unix timestamp)

    mtime = datetime.datetime.fromtimestamp(stat.st_mtime)
    print(mtime.strftime("%Y-%m-%d %H:%M"))

# ---- Path construction ----
# The / operator is cross-platform safe and highly readable
base = Path("/home/user")
reports_dir = base / "documents" / "reports"

# Equivalent to os.path.join but cleaner
old_way = os.path.join("/home/user", "documents", "reports")

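Why the / operator matters: pathlib picks the correct separator for you — backslashes on Windows, forward slashes on Linux/macOS — so the same code runs on every platform without changes. os.path.join is portable too, but it only returns a plain string; the / operator returns a Path object whose attributes and methods (name, suffix, exists(), ...) are available immediately. That is the key advantage over string-based path handling.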

glob / rglob patterns

from pathlib import Path

folder = Path("/home/user/documents")

# glob: search current directory only
pdfs = list(folder.glob("*.pdf"))
reports = list(folder.glob("report_*.xlsx"))
all_files = [f for f in folder.glob("*") if f.is_file()]

# rglob: recursive search through all subdirectories
all_py = list(folder.rglob("*.py"))
temp_files = list(folder.rglob("temp_*"))

# Multiple extensions (Pythonic approach)
# is_file() keeps out directories whose name happens to end in an image
# extension, matching the filters used for all_files and large_files.
image_exts = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
all_images = [
    f for f in folder.rglob("*")
    if f.is_file() and f.suffix.lower() in image_exts
]

# Sort by modification time (newest first)
sorted_files = sorted(
    folder.glob("*.xlsx"),
    key=lambda f: f.stat().st_mtime,
    reverse=True,
)

# Filter by size (files larger than 1MB = 1_048_576 bytes)
large_files = [
    f for f in folder.rglob("*")
    if f.is_file() and f.stat().st_size > 1_048_576
]

Basic Path Operations

Create, delete, move files and directories

from pathlib import Path
import shutil

p = Path("/tmp/test_dir")

# Create directory (parents=True creates intermediate dirs, exist_ok=True suppresses error if exists)
p.mkdir(parents=True, exist_ok=True)

# Write and read files
file = p / "hello.txt"
file.write_text("Hello, pathlib!", encoding="utf-8")
content = file.read_text(encoding="utf-8")

# Rename within same filesystem (atomic operation)
# Note: `file` still refers to the old name afterwards; use new_path from here on.
new_path = p / "hello_renamed.txt"
file.rename(new_path)

# Copy with metadata preserved.
# This must happen BEFORE the move below: once the file has been moved,
# new_path no longer exists and copy2() would raise FileNotFoundError.
# (The target directory has to exist already.)
shutil.copy2(str(new_path), "/backup/hello_renamed.txt")

# Cross-filesystem move (into an existing directory; keeps the file name)
shutil.move(str(new_path), "/home/user/documents/")

# Delete file (missing_ok=True: no error if already gone)
# new_path was just moved away, so this is a harmless no-op here --
# exactly the situation missing_ok=True is meant for.
new_path.unlink(missing_ok=True)

# Delete empty directory
p.rmdir()

# Delete non-empty directory (use with caution!)
# Alternative to rmdir(); guarded because the rmdir() above has already
# removed p in this walkthrough and rmtree() raises on a missing path.
if p.exists():
    shutil.rmtree(str(p))

Batch File Operations

Batch Renaming

Three common batch rename patterns

from pathlib import Path
import datetime

folder = Path("/home/user/photos")


def rename_no_clobber(src: Path, dst: Path) -> bool:
    """Rename *src* to *dst* unless that would overwrite another file.

    Path.rename() silently replaces an existing destination on POSIX
    systems, so a bulk rename can destroy files whose names collide with a
    generated target name. This helper refuses such renames instead.

    Returns:
        True if the file was renamed, False if it was left untouched
        (target identical to source, or target already exists).
    """
    if dst == src:
        return False
    if dst.exists():
        print(f"  [skip] {src.name}: target already exists ({dst.name})")
        return False
    src.rename(dst)
    return True


# Pattern 1: Add date prefix  photo.jpg -> 2024-03-15_photo.jpg
today = datetime.date.today().strftime("%Y-%m-%d")
# list(...) snapshots the directory first, so files renamed inside the loop
# cannot show up again in the same iteration.
for f in list(folder.glob("*.jpg")):
    if f.name.startswith(f"{today}_"):
        continue  # already prefixed by an earlier run today
    rename_no_clobber(f, f.parent / f"{today}_{f.name}")

# Pattern 2: Sequential numbering  -> 001.jpg, 002.jpg, ...
files = sorted(folder.glob("*.jpg"))
for i, f in enumerate(files, start=1):
    rename_no_clobber(f, f.parent / f"{i:03d}{f.suffix}")

# Pattern 3: Keyword substitution
for f in sorted(folder.rglob("*")):
    if f.is_file():
        new_stem = f.stem.replace(" ", "_").replace("(draft)", "_draft")
        if new_stem != f.stem:
            rename_no_clobber(f, f.parent / (new_stem + f.suffix))
# Best practice: preview before executing
def preview_rename(folder: Path, pattern: str, transform_fn):
    """Print an "old -> new" line for every file in *folder* matching *pattern*.

    Nothing is renamed. *transform_fn* receives each matching Path and its
    return value is shown as the proposed new name. Directories that match
    the pattern are ignored.
    """
    matching_files = (entry for entry in folder.glob(pattern) if entry.is_file())
    for entry in matching_files:
        proposed = transform_fn(entry)
        print(f"  {entry.name}  ->  {proposed}")

# Dry run of the date-prefix pattern: prints each "old -> new" pair for the
# .jpg files in `folder`; preview_rename() only prints and renames nothing.
preview_rename(folder, "*.jpg", lambda f: f"{today}_{f.name}")

Production best practice: preview first, execute second. Batch rename is irreversible without a backup. Always print an "old -> new" preview list first, verify that it looks correct, and only then run the actual rename.

Archive Files by Type / Date / Size

Auto-categorize files into subdirectories by type

from pathlib import Path
import shutil

# Category folder name -> file extensions (lowercase, with leading dot) that
# belong in it. Anything not listed here ends up in "Other".
CATEGORY_MAP = {
    "Images":    {".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic"},
    "Documents": {".pdf", ".docx", ".doc", ".txt", ".md"},
    "Sheets":    {".xlsx", ".xls", ".csv"},
    "Videos":    {".mp4", ".mov", ".avi", ".mkv"},
    "Audio":     {".mp3", ".wav", ".flac", ".aac"},
    "Archives":  {".zip", ".tar", ".gz", ".rar", ".7z"},
}

def categorize_by_type(source_dir: Path, dry_run: bool = True) -> dict:
    """Sort the files directly inside *source_dir* into per-category subfolders.

    Only regular files at the top level are considered; subdirectories are
    left alone. The category is chosen from the file's last extension
    (case-insensitive) via CATEGORY_MAP, defaulting to "Other". If the
    destination name is already taken, "_1", "_2", ... is appended to the stem.

    Args:
        source_dir: Directory to organize; category folders are created in it.
        dry_run: When True (default) nothing is moved; the planned moves are
            printed instead, including any collision suffix that would be used.

    Returns:
        Mapping of category name to the original names of the files assigned
        to it, in sorted order.
    """
    ext_to_category = {ext: cat for cat, exts in CATEGORY_MAP.items() for ext in exts}
    stats = {}

    # Snapshot and sort the listing up front: the loop creates folders and
    # moves files inside source_dir, and sorted order makes runs reproducible.
    files = sorted(f for f in source_dir.iterdir() if f.is_file())

    for f in files:
        category = ext_to_category.get(f.suffix.lower(), "Other")
        dest_dir = source_dir / category

        # A regular file occupying the category folder's name (e.g. a file
        # called "Other") would make mkdir() fail; skip instead of crashing.
        if dest_dir.exists() and not dest_dir.is_dir():
            print(f"  [skip] {f.name}: '{category}' exists and is not a directory")
            continue

        # Resolve name collisions in both modes so the preview shows the
        # name the file would really get.
        dest = dest_dir / f.name
        counter = 1
        while dest.exists():
            dest = dest_dir / f"{f.stem}_{counter}{f.suffix}"
            counter += 1

        stats.setdefault(category, []).append(f.name)

        if dry_run:
            print(f"  [preview] {f.name}  ->  {category}/{dest.name}")
        else:
            dest_dir.mkdir(exist_ok=True)
            shutil.move(str(f), str(dest))

    return stats

File Content Processing

Batch Search and Replace

Find and replace text across multiple files

from pathlib import Path
import re

def batch_replace(folder: Path, pattern: str, replacement: str,
                  glob: str = "*.txt", encoding: str = "utf-8",
                  dry_run: bool = True) -> int:
    """
    Replace regex pattern with replacement in all files matching glob.

    Files under *folder* are searched recursively. Files that cannot be
    decoded with *encoding* are reported and skipped. A file counts as
    modified only if the substitution actually changes its content.

    With dry_run=True (default) the affected files are only listed. With
    dry_run=False each affected file is first copied byte-for-byte to
    "<name>.bak" next to it and then rewritten. The rewrite goes through
    text mode, so line endings are normalized to the platform default; the
    .bak copy keeps the exact original bytes.

    Returns count of modified files.
    """
    modified_count = 0
    regex = re.compile(pattern)

    # sorted() materializes the listing before anything is written, so .bak
    # files created during this run are never picked up by a broad glob.
    for f in sorted(folder.rglob(glob)):
        if not f.is_file():
            continue
        try:
            original = f.read_text(encoding=encoding)
        except UnicodeDecodeError:
            print(f"  [skip] encoding error: {f}")
            continue

        # subn() returns the new text and the number of substitutions in one pass.
        new_content, count = regex.subn(replacement, original)
        if new_content == original:
            continue

        modified_count += 1
        if dry_run:
            print(f"  [preview] {f.relative_to(folder)}  ({count} replacements)")
        else:
            # Copy raw bytes: writing the decoded text back out would alter
            # line endings, so the "backup" would not match the original.
            f.with_suffix(f.suffix + ".bak").write_bytes(f.read_bytes())
            f.write_text(new_content, encoding=encoding)
            print(f"  [done] {f.relative_to(folder)}  ({count} replacements)")

    return modified_count

Encoding Detection and Bulk Conversion

Detect encoding with chardet and convert to UTF-8

from pathlib import Path
import chardet  # pip install chardet

def detect_encoding(filepath: Path) -> tuple[str, float]:
    """Guess the text encoding of *filepath* using chardet.

    The whole file is read into memory and handed to chardet.detect().

    Returns:
        (encoding, confidence). The encoding falls back to "utf-8" when
        chardet reports none; the confidence falls back to 0 when the
        result carries no "confidence" entry.
    """
    guess = chardet.detect(filepath.read_bytes())
    encoding = guess.get("encoding") or "utf-8"
    confidence = guess.get("confidence", 0)
    return encoding, confidence

def convert_to_utf8(folder: Path, glob: str = "*.txt", dry_run: bool = True) -> None:
    """Re-encode every non-UTF-8 text file under *folder* as UTF-8.

    Files matching *glob* are searched recursively and their encoding is
    guessed with detect_encoding(). Files detected as UTF-8 or ASCII are
    skipped; every other file is reported together with the detected
    encoding and confidence.

    With dry_run=False each reported file is decoded with the detected
    encoding (undecodable bytes become U+FFFD), the original bytes are saved
    next to it as "<name>.orig", and the file is rewritten as UTF-8. The
    conversion works on raw bytes, so line endings are left untouched.
    Failures are reported per file and do not stop the run.
    """
    # NOTE(review): chardet appears to report BOM-prefixed UTF-8 as
    # "UTF-8-SIG", which normalizes to "utf8sig" rather than "utf8bom", so
    # such files are still converted (BOM stripped). Confirm that is intended.
    already_utf8 = ("utf8", "utf8bom", "ascii")

    # sorted() materializes the listing so .orig files written during the
    # run are never revisited by a broad glob.
    for f in sorted(folder.rglob(glob)):
        if not f.is_file():
            continue
        encoding, confidence = detect_encoding(f)
        if encoding.lower().replace("-", "") in already_utf8:
            continue
        print(f"  {f.name}  detected: {encoding} ({confidence:.0%})")
        if dry_run:
            continue
        try:
            # Read once; decode before writing anything so an unknown codec
            # leaves the file system untouched.
            raw = f.read_bytes()
            text = raw.decode(encoding, errors="replace")
            f.with_suffix(f.suffix + ".orig").write_bytes(raw)
            f.write_bytes(text.encode("utf-8"))
            print("    -> converted to UTF-8")
        except (LookupError, UnicodeError, OSError) as e:
            # LookupError: codec name unknown to Python; OSError: I/O failure.
            print(f"    -> failed: {e}")

File Hashing and Deduplication

Find duplicate files by MD5 hash

import hashlib
from pathlib import Path
from collections import defaultdict

def file_md5(filepath: Path, chunk_size: int = 65536) -> str:
    """Return the hexadecimal MD5 digest of the file at *filepath*.

    The file is read in pieces of *chunk_size* bytes so that large files
    never have to fit into memory at once.
    """
    digest = hashlib.md5()
    with filepath.open("rb") as stream:
        # Two-argument iter() keeps calling read() until it returns b"" (EOF).
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()

def find_duplicates(folder: Path) -> dict[str, list[Path]]:
    """Find files under *folder* (recursively) that have identical content.

    Files are first grouped by size, and only files that share their size
    with at least one other file are hashed with file_md5(): a file with a
    unique size cannot have a duplicate, so reading it would be wasted I/O.
    Files that cannot be inspected or read are reported and skipped.

    Returns:
        Mapping of MD5 hex digest to the list of paths with that digest,
        containing only digests shared by two or more files.
    """
    # Pass 1: cheap metadata lookup, no file content is read.
    by_size: dict[int, list[Path]] = defaultdict(list)
    for f in folder.rglob("*"):
        try:
            if f.is_file():
                by_size[f.stat().st_size].append(f)
        except OSError as e:  # PermissionError is a subclass of OSError
            print(f"  [skip] cannot inspect: {f} ({e})")

    # Pass 2: hash only the candidates that share a size.
    hash_map: dict[str, list[Path]] = defaultdict(list)
    for same_size in by_size.values():
        if len(same_size) < 2:
            continue
        for f in same_size:
            try:
                hash_map[file_md5(f)].append(f)
            except OSError as e:
                print(f"  [skip] cannot read: {f} ({e})")

    return {h: paths for h, paths in hash_map.items() if len(paths) > 1}

Directory Monitoring with watchdog

watchdog is a cross-platform library that triggers callbacks when files are created, modified, deleted, or moved.

Basic watchdog usage

import time
from watchdog.observers import Observer          # pip install watchdog
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
    """Event handler that prints one line per file-system event.

    Creation and modification notices are printed for files only; deletion
    and move notices are printed for files and directories alike.
    """

    def on_created(self, event):
        """Report a newly created file; directory events are ignored."""
        if event.is_directory:
            return
        print(f"[new]     {event.src_path}")

    def on_modified(self, event):
        """Report a changed file; directory events are ignored."""
        if event.is_directory:
            return
        print(f"[changed] {event.src_path}")

    def on_deleted(self, event):
        """Report a deleted path."""
        removed = event.src_path
        print(f"[deleted] {removed}")

    def on_moved(self, event):
        """Report a moved or renamed path with its old and new location."""
        origin, target = event.src_path, event.dest_path
        print(f"[moved]   {origin}  ->  {target}")

def start_monitor(watch_dir: str) -> None:
    """Watch *watch_dir* recursively and print events until interrupted.

    A MyHandler instance is attached to a watchdog Observer, which delivers
    events on its own thread. The calling thread just sleeps in a loop until
    Ctrl+C is pressed; the observer is then stopped and joined.
    """
    handler = MyHandler()
    observer = Observer()
    observer.schedule(handler, watch_dir, recursive=True)
    observer.start()
    print(f"Monitoring: {watch_dir}  (Ctrl+C to stop)")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to end monitoring
    finally:
        # Always shut the observer down, whatever ended the loop, so its
        # thread is not left running.
        observer.stop()
        observer.join()

Compression and Archiving

ZIP operations with zipfile

import zipfile
from pathlib import Path

def zip_folder(source_dir: Path, output_zip: Path) -> None:
    """Compress every file under *source_dir* into *output_zip*.

    Files are stored with paths relative to the parent of *source_dir*, so
    the archive contains the folder itself as its top-level entry. Files are
    added in sorted order. Empty directories are not stored.

    *output_zip* may be located inside *source_dir*; the archive being
    written is excluded so that it never tries to add itself.
    """
    archive_path = output_zip.resolve()
    with zipfile.ZipFile(output_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        # The zip file already exists on disk at this point, so rglob() will
        # list it whenever it lives below source_dir.
        for f in sorted(source_dir.rglob("*")):
            if not f.is_file():
                continue
            if f.resolve() == archive_path:
                continue
            zf.write(f, arcname=f.relative_to(source_dir.parent))

def extract_zip(zip_path: Path, dest_dir: Path) -> None:
    """Unpack all members of *zip_path* into *dest_dir*.

    *dest_dir* and any missing parent directories are created first.
    """
    dest_dir.mkdir(exist_ok=True, parents=True)
    archive = zipfile.ZipFile(zip_path, mode="r")
    with archive:
        archive.extractall(path=dest_dir)

Auto-backup: keep last N copies, delete old ones

import zipfile
import datetime
from pathlib import Path

# Directory whose files main() packs into each backup archive.
SOURCE_DIR = Path("/home/user/important_data")
# Directory that receives the timestamped .zip archives.
BACKUP_DIR = Path("/home/user/backups")
# Value main() passes to cleanup_old_backups() as keep_n.
KEEP_LAST_N = 5

def create_backup(source: Path, backup_dir: Path) -> Path:
    """Zip every file under *source* into a new timestamped archive.

    The archive is written to *backup_dir* (created if missing) as
    "<source name>_<YYYYmmdd_HHMMSS>.zip". Members are stored relative to
    the parent of *source*, so the archive contains the source folder itself
    as its top-level entry.

    *backup_dir* may live inside *source*: everything below *backup_dir* is
    excluded, so neither the archive being written nor earlier backups are
    packed into the new one.

    Returns:
        Path of the archive that was created.
    """
    backup_dir.mkdir(parents=True, exist_ok=True)
    backup_root = backup_dir.resolve()
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = backup_dir / f"{source.name}_{timestamp}.zip"
    with zipfile.ZipFile(backup_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for f in sorted(source.rglob("*")):
            if not f.is_file():
                continue
            # Skip anything stored under the backup directory (see docstring).
            if backup_root in f.resolve().parents:
                continue
            zf.write(f, arcname=f.relative_to(source.parent))
    print(f"Backup created: {backup_path.name}  ({backup_path.stat().st_size/1024/1024:.2f} MB)")
    return backup_path

def cleanup_old_backups(backup_dir: Path, prefix: str, keep_n: int) -> None:
    """Delete all but the newest *keep_n* archives named "<prefix>_*.zip".

    Archives are ordered by file name; with a "%Y%m%d_%H%M%S" timestamp in
    the name, as written by create_backup(), that is chronological order.

    Args:
        backup_dir: Directory containing the archives.
        prefix: Name prefix of the archives to consider.
        keep_n: Number of most recent archives to keep; 0 deletes them all.

    Raises:
        ValueError: If *keep_n* is negative.
    """
    if keep_n < 0:
        raise ValueError(f"keep_n must be >= 0, got {keep_n}")
    backups = sorted(backup_dir.glob(f"{prefix}_*.zip"))
    # Compute the cut-off explicitly: backups[:-keep_n] would be an empty
    # slice for keep_n == 0 and keep every archive instead of none.
    expired = backups[:max(len(backups) - keep_n, 0)]
    for old in expired:
        old.unlink()
        print(f"Deleted old backup: {old.name}")

def main():
    """Create a new backup of SOURCE_DIR, then remove old archives from BACKUP_DIR."""
    create_backup(SOURCE_DIR, BACKUP_DIR)
    # SOURCE_DIR.name is the prefix create_backup() puts in front of the
    # timestamp, so the cleanup only matches archives of this source.
    cleanup_old_backups(BACKUP_DIR, SOURCE_DIR.name, KEEP_LAST_N)

if __name__ == "__main__":
    main()

Project: Enterprise Document Archiving System

This project integrates all techniques from the chapter into a production-ready document archiving system: it automatically sorts files by year/month, department, and file type, and generates a CSV index report of everything it archived.

enterprise_archiver.py — complete, runnable project

"""
enterprise_archiver.py
Usage: python enterprise_archiver.py --source /share/incoming --dest /archive --execute
Without --execute, runs in preview mode only.
"""
import argparse
import csv
import datetime
import shutil
from pathlib import Path
from collections import Counter

# Filename prefix -> department folder name.
# get_dept() compares these prefixes case-insensitively against the start of
# the file name and falls back to "General" when none matches.
DEPT_PREFIX_MAP = {
    "HR_":    "HR",
    "FIN_":   "Finance",
    "IT_":    "IT",
    "MKT_":   "Marketing",
    "SALES_": "Sales",
    "LEGAL_": "Legal",
}

# Category folder name -> file extensions (lowercase, with leading dot).
# get_category() lowercases the suffix before the lookup and falls back to
# "Other" for extensions that are not listed here.
CATEGORY_MAP = {
    "Docs":     {".pdf", ".docx", ".doc", ".txt", ".pptx"},
    "Sheets":   {".xlsx", ".xls", ".csv"},
    "Images":   {".jpg", ".jpeg", ".png", ".gif"},
    "Archives": {".zip", ".rar", ".7z", ".tar", ".gz"},
}

def get_dept(filename: str) -> str:
    """Return the department for *filename* based on its prefix.

    The prefixes in DEPT_PREFIX_MAP are compared case-insensitively against
    the start of the name, in the map's order; the first match wins.
    "General" is returned when no prefix matches.
    """
    upper_name = filename.upper()
    matching_depts = (
        dept
        for prefix, dept in DEPT_PREFIX_MAP.items()
        if upper_name.startswith(prefix.upper())
    )
    return next(matching_depts, "General")

def get_category(suffix: str) -> str:
    """Return the CATEGORY_MAP category for a file extension.

    *suffix* is matched case-insensitively (e.g. ".PDF" -> "Docs");
    "Other" is returned for extensions that no category lists.
    """
    wanted = suffix.lower()
    matching_categories = (
        name for name, extensions in CATEGORY_MAP.items() if wanted in extensions
    )
    return next(matching_categories, "Other")

def safe_dest(dest: Path) -> Path:
    """Return a path derived from *dest* that does not exist yet.

    *dest* itself is returned when it is free. Otherwise "_1", "_2", ... is
    inserted between stem and suffix until an unused name is found.
    """
    candidate = dest
    attempt = 0
    while candidate.exists():
        attempt += 1
        candidate = dest.parent / f"{dest.stem}_{attempt}{dest.suffix}"
    return candidate

def archive(source_dir: Path, dest_dir: Path, execute: bool) -> list[dict]:
    """Copy (or preview copying) every file under *source_dir* into the archive.

    Each file is placed at "<dest_dir>/<YYYY>/<MM>/<department>/<category>/",
    where year/month come from the file's modification time, the department
    from get_dept() and the category from get_category(). Name clashes are
    resolved with the same "_1", "_2", ... scheme as safe_dest().

    Destinations chosen earlier in the same run are remembered, so preview
    mode, where nothing is written to disk, shows the same numbered names
    that an execute run would produce. Files already located under
    *dest_dir* are skipped, so an archive nested inside the source tree is
    not re-archived.

    Args:
        source_dir: Root of the tree to archive (searched recursively).
        dest_dir: Root of the archive.
        execute: If True, files are copied with their metadata; if False,
            the planned copies are only printed.

    Returns:
        One record per file with the keys source, dest, dept, type,
        year_month, size_kb, modified and status ("archived" or "pending").
    """
    records = []
    dest_root = dest_dir.resolve()
    claimed: set[Path] = set()  # destinations already handed out in this run

    for f in sorted(source_dir.rglob("*")):
        if not f.is_file():
            continue
        if dest_root in f.resolve().parents:
            continue  # already part of the archive

        info = f.stat()  # one stat() call serves both mtime and size
        mtime = datetime.datetime.fromtimestamp(info.st_mtime)
        year_month = f"{mtime.year}/{mtime.month:02d}"
        dept = get_dept(f.name)
        category = get_category(f.suffix)
        dest_path = dest_dir / year_month / dept / category

        # Same numbering as safe_dest(), but a name also counts as taken
        # when an earlier file of this run has claimed it.
        target = dest_path / f.name
        final = target
        counter = 1
        while final in claimed or final.exists():
            final = dest_path / f"{target.stem}_{counter}{target.suffix}"
            counter += 1
        claimed.add(final)

        record = {
            "source": str(f),
            "dest": str(final.relative_to(dest_dir)),
            "dept": dept,
            "type": category,
            "year_month": year_month,
            "size_kb": round(info.st_size / 1024, 1),
            "modified": mtime.strftime("%Y-%m-%d %H:%M"),
            "status": "pending",
        }
        if execute:
            dest_path.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(f), str(final))
            record["status"] = "archived"
            print(f"  [archived] {f.name}  ->  {final.relative_to(dest_dir)}")
        else:
            print(f"  [preview]  {f.name}  ->  {final.relative_to(dest_dir)}")

        records.append(record)
    return records

def generate_report(records: list[dict], report_path: Path) -> None:
    """Write *records* to *report_path* as a CSV file with a header row.

    The column order is taken from the keys of the first record. The file
    is encoded as UTF-8 with a BOM ("utf-8-sig"). Nothing is written, and
    nothing is printed, when *records* is empty.
    """
    if not records:
        return
    columns = list(records[0])
    with report_path.open("w", newline="", encoding="utf-8-sig") as handle:
        report = csv.DictWriter(handle, fieldnames=columns)
        report.writeheader()
        for row in records:
            report.writerow(row)
    print(f"\nReport written: {report_path}")

def main():
    """Command-line entry point of the archiver.

    Parses --source, --dest and --execute, runs archive() in execute or
    preview mode, prints a per-department summary and writes a CSV report.
    The report goes into the archive root in execute mode and into the
    current directory in preview mode.

    Exits with a non-zero status and an error message on stderr when the
    source directory does not exist, so shell scripts and schedulers can
    detect the failure.
    """
    parser = argparse.ArgumentParser(description="Enterprise Document Archiver")
    parser.add_argument("--source",  required=True)
    parser.add_argument("--dest",    required=True)
    parser.add_argument("--execute", action="store_true")
    args = parser.parse_args()

    source_dir = Path(args.source)
    dest_dir   = Path(args.dest)

    if not source_dir.is_dir():
        # SystemExit with a string prints it to stderr and exits with status 1;
        # a plain return would report success (status 0) to the caller.
        raise SystemExit(f"Error: source not found: {source_dir}")

    mode = "EXECUTE" if args.execute else "PREVIEW (no files moved)"
    print(f"Mode: {mode}\nSource: {source_dir}\nDest: {dest_dir}\n")

    records = archive(source_dir, dest_dir, execute=args.execute)

    # Summary
    total_kb = sum(r["size_kb"] for r in records)
    dept_counts = Counter(r["dept"] for r in records)
    print(f"\nTotal: {len(records)} files  ({total_kb:.1f} KB)")
    for dept, count in dept_counts.most_common():
        print(f"  {dept}: {count}")

    # Report
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = (dest_dir if args.execute else Path(".")) / f"archive_report_{ts}.csv"
    generate_report(records, report_path)

if __name__ == "__main__":
    main()

Chapter summary: pathlib is the modern standard for Python file operations — the / operator makes path construction cross-platform and readable. Always use dry_run/preview before executing destructive batch operations. watchdog enables real-time reactive file processing. zipfile handles compression, and a few lines of pathlib code on top of it handle backup rotation. Together these tools cover the vast majority of enterprise file automation needs.

Previous

Next
Chapter 6: Python + Excel
Rate this chapter
4.9  / 5  (57 ratings)

💬 Comments