#!/usr/bin/env python3
"""Screenshot-rename pipeline.

Three subcommands:
  prep     — extract frames, resize, build manifest, split into batches
  plan     — aggregate desc-*.tsv files, validate, write rename plan
  execute  — apply the plan with safety checks

The Haiku-subagent dispatch step happens between `prep` and `plan` and is
performed by Claude Code in-session, not by this script.
"""
import argparse
import os
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Optional

WORK = Path("/tmp/screenshot-rename")
FRAMES = WORK / "frames"
SMALL = WORK / "small"

DEFAULT_PREFIX = "CleanShot"
# A description needs at least this many words, before and after sanitizing.
MIN_WORDS = 6

VIDEO_EXTS = (".mp4", ".mov")
IMAGE_EXTS = (".png", ".gif", ".jpg", ".jpeg", ".webp")
# Extensions tried, in order, when a described filename is not found verbatim.
FALLBACK_EXTS = (".png", ".gif", ".mp4", ".mov", ".pdf", ".jpg", ".jpeg", ".webp")


def run(cmd, **kw):
    """Run *cmd* (an argv list), capturing stdout/stderr as text.

    Returns the ``subprocess.CompletedProcess``; the exit status is not
    checked here, callers verify success by looking for the output file.
    """
    return subprocess.run(cmd, capture_output=True, text=True, **kw)


def title_case(s: str) -> str:
    """Capitalize each whitespace-separated word of *s*, joined by one space.

    Uses ``str.capitalize``, so the rest of each word is lower-cased.
    """
    return " ".join(w.capitalize() for w in s.split())


# ---------- prep ----------

def _vision_source(f: Path) -> Optional[Path]:
    """Return an image path representing *f*, or None if *f* must be skipped.

    Images are used as-is; videos and PDFs are rendered to a JPEG in FRAMES
    (an already existing JPEG there is reused).
    """
    ext = f.suffix.lower()
    if ext in IMAGE_EXTS:
        return f

    frame = FRAMES / f"{f.stem}.jpg"
    if ext in VIDEO_EXTS:
        if not frame.exists():
            # Grab a single frame one second in.
            run(["ffmpeg", "-y", "-ss", "1", "-i", str(f),
                 "-frames:v", "1", "-q:v", "3", str(frame)])
        if not frame.exists():
            print(f"WARN ffmpeg failed: {f.name}", file=sys.stderr)
            return None
        return frame

    if ext == ".pdf":
        if not frame.exists():
            run(["sips", "-s", "format", "jpeg", str(f), "--out", str(frame)])
        if not frame.exists():
            print(f"WARN sips failed on pdf: {f.name}", file=sys.stderr)
            return None
        return frame

    print(f"SKIP unknown ext: {f.name}", file=sys.stderr)
    return None


def prep(src: Path, batch_size: int, prefix: str) -> None:
    """Build resized JPEGs, a manifest and per-batch files for *src*.

    Files in *src* named ``<prefix> YYYY-MM-DD...`` are converted to a JPEG
    in SMALL, listed in ``WORK/all.tsv`` as ``<small path>\\t<original name>``
    and split into ``WORK/full-batch-NN`` files of *batch_size* lines each.

    Exits via ``sys.exit`` when *src* is not a directory, *batch_size* is
    below 1, nothing matches, or no file could be converted.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    if batch_size < 1:
        sys.exit(f"batch size must be >= 1, got {batch_size}")

    WORK.mkdir(parents=True, exist_ok=True)
    FRAMES.mkdir(exist_ok=True)
    SMALL.mkdir(exist_ok=True)

    pattern = re.compile(rf"^{re.escape(prefix)}\s+\d{{4}}-\d{{2}}-\d{{2}}.*$")
    files = sorted(p for p in src.iterdir() if p.is_file() and pattern.match(p.name))
    if not files:
        sys.exit(f"no matching files (prefix='{prefix}') in {src}")
    print(f"found {len(files)} source files")

    manifest = WORK / "all.tsv"
    with manifest.open("w", encoding="utf-8") as out:
        for f in files:
            vision_src = _vision_source(f)
            if vision_src is None:
                continue
            small = SMALL / f"{f.stem}.jpg"
            if not small.exists():
                run(["sips", "-Z", "1568", "-s", "format", "jpeg",
                     str(vision_src), "--out", str(small)])
            if not small.exists():
                print(f"WARN resize failed: {f.name}", file=sys.stderr)
                continue
            out.write(f"{small}\t{f.name}\n")

    # split into batches
    for old in WORK.glob("full-batch-*"):
        old.unlink()
    lines = manifest.read_text(encoding="utf-8").splitlines()
    if not lines:
        # Avoid writing (and dispatching) a batch that contains nothing.
        sys.exit("no files could be prepared; nothing to batch")
    n_batches = (len(lines) + batch_size - 1) // batch_size
    for i in range(n_batches):
        chunk = lines[i * batch_size:(i + 1) * batch_size]
        (WORK / f"full-batch-{i+1:02d}").write_text(
            "\n".join(chunk) + "\n", encoding="utf-8")

    print(f"prepped {len(lines)} files into {n_batches} batches in {WORK}")
    print(f"\nDispatch {n_batches} Haiku subagents (one per batch).")
    # `plan` validates against its own --prefix, so repeat a non-default one.
    prefix_arg = "" if prefix == DEFAULT_PREFIX else f" --prefix '{prefix}'"
    print("After all desc-full-NN.tsv files exist, run: "
          f"pipeline.py plan --src '{src}'{prefix_arg}")


# ---------- plan ----------

def _resolve_source(claimed: str, existing: set) -> Optional[str]:
    """Map a described filename to a real entry of *existing*, or None.

    Tries the name verbatim, then the same stem with each FALLBACK_EXTS
    extension (Haiku occasionally returns .jpg instead of .png).
    """
    if claimed in existing:
        return claimed
    base = os.path.splitext(claimed)[0]
    for ext in FALLBACK_EXTS:
        cand = base + ext
        if cand in existing:
            return cand
    return None


def _sanitize_words(desc: str) -> list:
    """Split *desc* on whitespace, keep only alphanumerics, drop empty words."""
    stripped = ("".join(c for c in w if c.isalnum()) for w in desc.split())
    return [w for w in stripped if w]


def plan(src: Path, prefix: str, max_words: int) -> None:
    """Aggregate ``WORK/desc-full-*.tsv`` into a validated rename plan.

    Each input line is ``<original name>\\t<description>``. Valid rows become
    ``<prefix> - <Title Cased Description> - <rest of original name>`` and are
    written to ``WORK/plan-full.tsv`` as ``<orig>\\t<new>``; invalid rows are
    reported (first 30) and left out of the plan.

    Exits via ``sys.exit`` when *src* is not a directory, *max_words* is
    below MIN_WORDS, or no description files exist.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    if max_words < MIN_WORDS:
        # Otherwise every row would fail the minimum-word check below.
        sys.exit(f"max words must be >= {MIN_WORDS}, got {max_words}")

    descs = sorted(WORK.glob("desc-full-*.tsv"))
    if not descs:
        sys.exit(f"no desc-full-*.tsv files found in {WORK}")

    all_lines = []
    for p in descs:
        all_lines.extend(p.read_text(encoding="utf-8").splitlines())
    print(f"aggregated {len(all_lines)} description lines from {len(descs)} batches")

    existing = set(os.listdir(src))
    plan_rows = []
    errors = []
    seen = {}             # new name -> source that claimed it
    used_sources = set()  # sources that already have a planned rename

    for lineno, line in enumerate(all_lines, 1):
        line = line.rstrip()
        if not line:
            continue
        parts = line.split("\t", 1)
        if len(parts) != 2:
            errors.append(f"L{lineno}: bad split: {line!r}")
            continue
        orig_claimed, desc = parts
        if not orig_claimed.startswith(prefix + " "):
            errors.append(f"L{lineno}: prefix: {orig_claimed!r}")
            continue

        orig = _resolve_source(orig_claimed, existing)
        if orig is None:
            errors.append(f"L{lineno}: source not found: {orig_claimed!r}")
            continue
        if orig in used_sources:
            # A second rename of the same file could never succeed in execute.
            errors.append(f"L{lineno}: duplicate source: {orig!r}")
            continue

        if len(desc.split()) < MIN_WORDS:
            errors.append(f"L{lineno}: <{MIN_WORDS} words: {orig!r} -> {desc!r}")
            continue
        # Sanitize before truncating so punctuation-only tokens do not
        # consume the max_words budget.
        cleaned = _sanitize_words(desc)
        if len(cleaned) < MIN_WORDS:
            errors.append(f"L{lineno}: <{MIN_WORDS} after sanitize: {desc!r}")
            continue
        titled = title_case(" ".join(cleaned[:max_words]))

        rest = orig[len(prefix) + 1:]  # everything after "Prefix "
        new = f"{prefix} - {titled} - {rest}"

        if new == orig:
            errors.append(f"L{lineno}: same: {orig!r}")
            continue
        if new in existing:
            errors.append(f"L{lineno}: target exists in DEST: {new!r}")
            continue
        if new in seen:
            errors.append(
                f"L{lineno}: plan collision: {new!r} from {orig!r} and {seen[new]!r}")
            continue

        seen[new] = orig
        used_sources.add(orig)
        plan_rows.append((orig, new))

    print(f"plan: {len(plan_rows)} renames, {len(errors)} errors")
    if errors:
        print("\nERRORS:")
        for e in errors[:30]:
            print(f" {e}")
        if len(errors) > 30:
            print(f" ... and {len(errors) - 30} more")

    plan_path = WORK / "plan-full.tsv"
    with plan_path.open("w", encoding="utf-8") as f:
        for orig, new in plan_rows:
            f.write(f"{orig}\t{new}\n")
    print(f"\nplan saved: {plan_path}")

    step = max(1, len(plan_rows) // 6)
    print(f"sample (every {step}th row):")
    for i in range(0, len(plan_rows), step):
        orig, new = plan_rows[i]
        print(f" {orig}\n → {new}\n")
    print(f"if plan looks good: pipeline.py execute --src '{src}'")


# ---------- execute ----------

def execute(src: Path) -> None:
    """Apply ``WORK/plan-full.tsv`` to *src*, renaming files in place.

    A row is skipped (and recorded as a failure) when it is malformed, its
    source is missing, or its target already exists. Failures are written to
    ``WORK/rename-fails.txt``. Exits with status 2 if the number of entries in
    *src* differs before and after, and via ``sys.exit`` with a message when
    *src* is not a directory or no plan exists.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    plan_path = WORK / "plan-full.tsv"
    if not plan_path.exists():
        sys.exit(f"no plan: {plan_path} (run `pipeline.py plan` first)")

    before = len(os.listdir(src))
    ok = 0
    fails = []

    with plan_path.open(encoding="utf-8") as f:
        for line in f:
            line = line.rstrip()
            if not line:
                continue
            orig, sep, new = line.partition("\t")
            if not sep or not orig or not new:
                # Record instead of crashing halfway through the renames.
                fails.append(f"malformed plan line: {line!r}")
                continue
            srcp = src / orig
            dstp = src / new
            if not srcp.exists():
                fails.append(f"src missing: {orig}")
                continue
            if dstp.exists():
                fails.append(f"target exists: {new}")
                continue
            try:
                os.rename(srcp, dstp)
            except OSError as e:
                fails.append(f"rename error {orig}: {e}")
                continue
            if dstp.exists() and not srcp.exists():
                ok += 1
            else:
                fails.append(f"post-check failed: {orig}")

    after = len(os.listdir(src))
    print(f"ok={ok} fail={len(fails)} before={before} after={after}")

    # Log failures first: they matter most when the count check below trips.
    if fails:
        fails_path = WORK / "rename-fails.txt"
        fails_path.write_text("\n".join(fails), encoding="utf-8")
        print(f"failures logged: {fails_path}")
        for x in fails[:5]:
            print(f" {x}")

    if before != after:
        print("⚠ FILE COUNT CHANGED — investigate immediately")
        sys.exit(2)
    print("file count unchanged ✓")


# ---------- main ----------

def main() -> None:
    """Parse the command line and dispatch to prep, plan or execute."""
    p = argparse.ArgumentParser(
        description=__doc__,
        # Keep the subcommand table of the module docstring intact in --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    sub = p.add_subparsers(dest="cmd", required=True)

    p_prep = sub.add_parser("prep", help="extract frames, resize, build batches")
    p_prep.add_argument("--src", type=Path, required=True)
    p_prep.add_argument("--batch-size", type=int, default=19)
    p_prep.add_argument("--prefix", default=DEFAULT_PREFIX,
                        help="filename prefix to match (default CleanShot)")

    p_plan = sub.add_parser("plan", help="build & validate rename plan")
    p_plan.add_argument("--src", type=Path, required=True)
    p_plan.add_argument("--prefix", default=DEFAULT_PREFIX)
    p_plan.add_argument("--max-words", type=int, default=8)

    p_exec = sub.add_parser("execute", help="apply rename plan with safety checks")
    p_exec.add_argument("--src", type=Path, required=True)

    args = p.parse_args()
    if args.cmd == "prep":
        prep(args.src, args.batch_size, args.prefix)
    elif args.cmd == "plan":
        plan(args.src, args.prefix, args.max_words)
    elif args.cmd == "execute":
        execute(args.src)


if __name__ == "__main__":
    main()