Files
screenshot-rename/pipeline.py
T
Anthony Cardinale 63edc33fc4 Initial commit: skill files, docs site, README
- SKILL.md and pipeline.py from ~/.claude/skills/screenshot-rename/
- docs/index.html — archival/typewriter aesthetic homepage with hero
  monument, problem, 4-stage pipeline, before/after split, run-log
  receipt, ten gotchas, four use cases, install snippets
- MIT license

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 09:23:02 -04:00

281 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""Screenshot-rename pipeline.
Three subcommands:
prep — extract frames, resize, build manifest, split into batches
plan — aggregate desc-*.tsv files, validate, write rename plan
execute — apply the plan with safety checks
The Haiku-subagent dispatch step happens between `prep` and `plan` and is
performed by Claude Code in-session, not by this script.
"""
import argparse
import os
import re
import shutil
import subprocess
import sys
from pathlib import Path
# Scratch directory shared by all three subcommands (manifests, batches,
# description files, and the rename plan all live here).
WORK = Path("/tmp") / "screenshot-rename"
# Single frames extracted from videos / PDFs before resizing.
FRAMES = WORK.joinpath("frames")
# Down-scaled JPEGs that are handed to the vision subagents.
SMALL = WORK.joinpath("small")
def run(cmd, **kw):
    """Run *cmd* with ``subprocess.run`` and return the ``CompletedProcess``.

    By default stdout/stderr are captured and decoded as text.  Any extra
    keyword arguments are forwarded to ``subprocess.run`` and may override
    those defaults (e.g. ``text=False`` for raw bytes, or an explicit
    ``stdout=``/``stderr=`` redirection) instead of raising for a duplicated
    or conflicting keyword.
    """
    # subprocess.run refuses capture_output together with explicit
    # stdout/stderr, so only default to capturing when neither was supplied.
    if "stdout" not in kw and "stderr" not in kw:
        kw.setdefault("capture_output", True)
    kw.setdefault("text", True)
    return subprocess.run(cmd, **kw)
def title_case(s: str) -> str:
    """Capitalize every whitespace-separated word of *s*.

    Each word goes through ``str.capitalize`` (first character upper-cased,
    the remainder lower-cased), and the words are re-joined with single
    spaces, so runs of whitespace collapse and surrounding whitespace is
    dropped.
    """
    tokens = s.split()
    return " ".join(map(str.capitalize, tokens))
# ---------- prep ----------
def prep(src: Path, batch_size: int, prefix: str) -> None:
    """Prepare vision-ready thumbnails and batch manifests for *src*.

    Files in *src* whose name looks like ``"<prefix> YYYY-MM-DD..."`` are
    collected.  For each one a JPEG thumbnail is produced under ``SMALL``
    (``sips -Z 1568``); videos (.mp4/.mov) are first reduced to one frame
    with ``ffmpeg`` and PDFs are converted with ``sips``, both under
    ``FRAMES``.  Already existing frames/thumbnails are reused.

    A manifest ``all.tsv`` (thumbnail path, TAB, original file name) is
    written to ``WORK`` and then split into ``full-batch-NN`` files holding
    at most *batch_size* rows each.

    Exits via ``sys.exit`` when *src* is not a directory, *batch_size* is not
    positive, no file matches, or no file could be prepared.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    if batch_size < 1:
        # Guards the ceiling division below; 0 would raise ZeroDivisionError.
        sys.exit(f"batch size must be >= 1: {batch_size}")
    WORK.mkdir(parents=True, exist_ok=True)
    FRAMES.mkdir(exist_ok=True)
    SMALL.mkdir(exist_ok=True)

    pattern = re.compile(rf"^{re.escape(prefix)}\s+\d{{4}}-\d{{2}}-\d{{2}}.*$")
    files = sorted(p for p in src.iterdir() if p.is_file() and pattern.match(p.name))
    if not files:
        sys.exit(f"no matching files (prefix='{prefix}') in {src}")
    print(f"found {len(files)} source files")

    video_exts = (".mp4", ".mov")
    image_exts = (".png", ".gif", ".jpg", ".jpeg", ".webp")
    known_exts = video_exts + (".pdf",) + image_exts

    manifest = WORK / "all.tsv"
    # Names already used for derived frame/thumbnail files in this run.
    used_keys = set()
    with manifest.open("w") as out:
        for f in files:
            ext = f.suffix.lower()
            if ext not in known_exts:
                print(f"SKIP unknown ext: {f.name}", file=sys.stderr)
                continue

            # Derived files are normally named after the stem.  When two
            # sources share a stem (e.g. "X.png" and "X.gif") the later one
            # is keyed by its full name instead, so it does not silently
            # reuse the other file's thumbnail.
            key = f.stem if f.stem not in used_keys else f.name
            used_keys.add(key)

            if ext in video_exts:
                frame = FRAMES / f"{key}.jpg"
                if not frame.exists():
                    # Grab a single frame one second into the clip.
                    run(["ffmpeg", "-y", "-ss", "1", "-i", str(f),
                         "-frames:v", "1", "-q:v", "3", str(frame)])
                if not frame.exists():
                    print(f"WARN ffmpeg failed: {f.name}", file=sys.stderr)
                    continue
                vision_src = frame
            elif ext == ".pdf":
                frame = FRAMES / f"{key}.jpg"
                if not frame.exists():
                    run(["sips", "-s", "format", "jpeg", str(f), "--out", str(frame)])
                if not frame.exists():
                    print(f"WARN sips failed on pdf: {f.name}", file=sys.stderr)
                    continue
                vision_src = frame
            else:
                vision_src = f

            small = SMALL / f"{key}.jpg"
            if not small.exists():
                run(["sips", "-Z", "1568", "-s", "format", "jpeg",
                     str(vision_src), "--out", str(small)])
            if not small.exists():
                print(f"WARN resize failed: {f.name}", file=sys.stderr)
                continue
            out.write(f"{small}\t{f.name}\n")

    # split into batches (drop batch files left over from an earlier run)
    for old in WORK.glob("full-batch-*"):
        old.unlink()
    lines = manifest.read_text().splitlines()
    if not lines:
        # Every candidate failed conversion; an empty batch would be useless.
        sys.exit("no files could be prepared (see warnings above)")
    n_batches = (len(lines) + batch_size - 1) // batch_size
    for i in range(n_batches):
        chunk = lines[i * batch_size:(i + 1) * batch_size]
        (WORK / f"full-batch-{i+1:02d}").write_text("\n".join(chunk) + "\n")

    print(f"prepped {len(lines)} files into {n_batches} batches in {WORK}")
    print(f"\nDispatch {n_batches} Haiku subagents (one per batch).")
    # Include the prefix so the suggested command also works for a
    # non-default prefix (plan validates every row against it).
    print(f"After all desc-full-NN.tsv files exist, run: "
          f"pipeline.py plan --src '{src}' --prefix '{prefix}'")
# ---------- plan ----------
def plan(src: Path, prefix: str, max_words: int) -> None:
    """Turn the subagents' descriptions into a validated rename plan.

    Reads every ``desc-full-*.tsv`` in ``WORK`` (rows of original file name,
    TAB, description), validates each row against the files currently in
    *src* and writes ``plan-full.tsv`` (old name, TAB, new name) to ``WORK``.
    New names have the form
    ``"<prefix> - <Title Cased Words> - <rest of old name>"`` where at most
    *max_words* sanitised (alphanumeric-only) words are kept.

    A row is reported as an error and left out of the plan when it is
    malformed, does not start with ``"<prefix> "``, names a file that cannot
    be found in *src* (also trying the known extensions), has fewer than six
    usable words, repeats a source that is already planned, or would
    produce a name that already exists or collides with another planned
    name.

    Exits via ``sys.exit`` when *src* is not a directory, *max_words* is not
    positive, or no description files exist.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    if max_words < 1:
        # A non-positive limit would slice the word list to nothing (or drop
        # words from the end) and produce garbled names.
        sys.exit(f"max words must be >= 1: {max_words}")
    descs = sorted(WORK.glob("desc-full-*.tsv"))
    if not descs:
        sys.exit(f"no desc-full-*.tsv files found in {WORK}")

    all_lines = []
    for p in descs:
        all_lines.extend(p.read_text().splitlines())
    print(f"aggregated {len(all_lines)} description lines from {len(descs)} batches")

    existing = set(os.listdir(src))
    plan_rows = []
    errors = []
    seen = {}                # planned new name -> its source name
    planned_sources = set()  # source names that already have a plan row

    for lineno, line in enumerate(all_lines, 1):
        line = line.rstrip()
        if not line:
            continue
        parts = line.split("\t", 1)
        if len(parts) != 2:
            errors.append(f"L{lineno}: bad split: {line!r}")
            continue
        orig_claimed, desc = parts
        if not orig_claimed.startswith(prefix + " "):
            errors.append(f"L{lineno}: prefix: {orig_claimed!r}")
            continue

        # Find the actual file — Haiku occasionally returns .jpg instead of .png
        orig = orig_claimed
        if orig not in existing:
            base = os.path.splitext(orig_claimed)[0]
            for ext in (".png", ".gif", ".mp4", ".pdf", ".jpg", ".jpeg", ".webp"):
                cand = base + ext
                if cand in existing:
                    orig = cand
                    break
            else:
                errors.append(f"L{lineno}: source not found: {orig_claimed!r}")
                continue

        # The same file described twice would yield two rows of which only
        # the first could ever be executed; report it instead.
        if orig in planned_sources:
            errors.append(f"L{lineno}: duplicate source: {orig!r}")
            continue

        words = desc.split()
        if len(words) < 6:
            errors.append(f"L{lineno}: <6 words: {orig!r} -> {desc!r}")
            continue
        # Sanitise first, truncate afterwards: tokens that are pure
        # punctuation must not use up the max_words budget.
        cleaned = ["".join(c for c in w if c.isalnum()) for w in words]
        cleaned = [w for w in cleaned if w]
        if len(cleaned) < 6:
            errors.append(f"L{lineno}: <6 after sanitize: {desc!r}")
            continue
        cleaned = cleaned[:max_words]

        titled = title_case(" ".join(cleaned))
        rest = orig[len(prefix) + 1:]  # everything after "Prefix "
        new = f"{prefix} - {titled} - {rest}"
        if new == orig:
            errors.append(f"L{lineno}: same: {orig!r}")
            continue
        if new in existing:
            errors.append(f"L{lineno}: target exists in DEST: {new!r}")
            continue
        if new in seen:
            errors.append(f"L{lineno}: plan collision: {new!r} from {orig!r} and {seen[new]!r}")
            continue
        seen[new] = orig
        planned_sources.add(orig)
        plan_rows.append((orig, new))

    print(f"plan: {len(plan_rows)} renames, {len(errors)} errors")
    if errors:
        print("\nERRORS:")
        for e in errors[:30]:
            print(f" {e}")
        if len(errors) > 30:
            print(f" ... and {len(errors) - 30} more")

    plan_path = WORK / "plan-full.tsv"
    with plan_path.open("w") as f:
        for orig, new in plan_rows:
            f.write(f"{orig}\t{new}\n")
    print(f"\nplan saved: {plan_path}")

    # Show roughly six evenly spaced rows as a sanity check.
    step = max(1, len(plan_rows) // 6)
    print(f"sample (every {step}th row):")
    for i in range(0, len(plan_rows), step):
        orig, new = plan_rows[i]
        print(f" {orig}\n{new}\n")
    print(f"if plan looks good: pipeline.py execute --src '{src}'")
# ---------- execute ----------
def execute(src: Path) -> None:
    """Apply the rename plan in ``WORK / "plan-full.tsv"`` to files in *src*.

    Each plan row (old name, TAB, new name) is renamed inside *src* unless
    the row is malformed, either name is not a plain file name, the source is
    missing, or the target already exists.  Every skipped or failed row is
    collected and, if there are any, written to ``WORK / "rename-fails.txt"``.

    The number of directory entries in *src* is compared before and after;
    a difference is reported and the process exits with status 2 (after the
    failure log has been written).  Also exits via ``sys.exit`` when *src* is
    not a directory or no plan file exists.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    plan_path = WORK / "plan-full.tsv"
    if not plan_path.exists():
        sys.exit(f"no plan: {plan_path} (run `pipeline.py plan` first)")

    before = len(os.listdir(src))
    ok = 0
    fails = []
    with plan_path.open() as f:
        for line in f:
            line = line.rstrip()
            if not line:
                continue
            orig, sep, new = line.partition("\t")
            if not sep or not orig or not new:
                # A damaged row must not abort the remaining renames.
                fails.append(f"bad plan line: {line!r}")
                continue
            if os.path.basename(orig) != orig or os.path.basename(new) != new:
                # Plan entries are names inside src; anything carrying a
                # directory component could move files elsewhere.
                fails.append(f"not a plain filename: {orig!r} -> {new!r}")
                continue
            srcp = src / orig
            dstp = src / new
            if not srcp.exists():
                fails.append(f"src missing: {orig}")
                continue
            # os.rename can silently replace an existing target, hence the
            # explicit check first.
            if dstp.exists():
                fails.append(f"target exists: {new}")
                continue
            try:
                os.rename(srcp, dstp)
            except OSError as e:
                fails.append(f"rename error {orig}: {e}")
                continue
            if dstp.exists() and not srcp.exists():
                ok += 1
            else:
                fails.append(f"post-check failed: {orig}")

    after = len(os.listdir(src))
    print(f"ok={ok} fail={len(fails)} before={before} after={after}")
    count_changed = before != after
    if count_changed:
        print("⚠ FILE COUNT CHANGED — investigate immediately")
    else:
        print("file count unchanged ✓")

    # Write the failure log before any early exit: it is most needed
    # precisely when the file count looks wrong.
    if fails:
        fails_path = WORK / "rename-fails.txt"
        fails_path.write_text("\n".join(fails))
        print(f"failures logged: {fails_path}")
        for x in fails[:5]:
            print(f" {x}")
    if count_changed:
        sys.exit(2)
# ---------- main ----------
def main() -> None:
    """Parse the command line and run the selected subcommand.

    Exactly one of ``prep``, ``plan`` or ``execute`` is required; each takes
    a mandatory ``--src`` directory plus its own options.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    commands = parser.add_subparsers(dest="cmd", required=True)

    prep_cmd = commands.add_parser("prep", help="extract frames, resize, build batches")
    prep_cmd.add_argument("--src", type=Path, required=True)
    prep_cmd.add_argument("--batch-size", type=int, default=19)
    prep_cmd.add_argument(
        "--prefix",
        default="CleanShot",
        help="filename prefix to match (default CleanShot)",
    )

    plan_cmd = commands.add_parser("plan", help="build & validate rename plan")
    plan_cmd.add_argument("--src", type=Path, required=True)
    plan_cmd.add_argument("--prefix", default="CleanShot")
    plan_cmd.add_argument("--max-words", type=int, default=8)

    exec_cmd = commands.add_parser("execute", help="apply rename plan with safety checks")
    exec_cmd.add_argument("--src", type=Path, required=True)

    ns = parser.parse_args()
    # One thunk per subcommand; argparse guarantees ns.cmd is one of the keys.
    handlers = {
        "prep": lambda: prep(ns.src, ns.batch_size, ns.prefix),
        "plan": lambda: plan(ns.src, ns.prefix, ns.max_words),
        "execute": lambda: execute(ns.src),
    }
    handlers[ns.cmd]()


if __name__ == "__main__":
    main()