Files
screenshot-rename/pipeline.py
T
Anthony Cardinale 3a9997e990 sync: dual-prefix support, U+202F handling, keyword preservation
Brings repo HEAD up to current live skill state in ~/.claude/skills/screenshot-rename/.
- recognize CleanShot AND Apple Screenshot filenames in one pass
- normalize U+202F (NARROW NO-BREAK SPACE) before AM/PM in Apple Screenshot names
- preserve user-typed keyword prefix and merge into description
- skip files already in renamed form (idempotent re-run)
- gotchas #11-13 added to SKILL.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 11:09:54 -04:00

372 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Screenshot-rename pipeline.
Three subcommands:
prep — extract frames, resize, build manifest, split into batches
plan — aggregate desc-*.tsv files, validate, write rename plan
execute — apply the plan with safety checks
The Haiku-subagent dispatch step happens between `prep` and `plan` and is
performed by Claude Code in-session, not by this script.
Recognizes both `CleanShot ...` and Apple `Screenshot ...` filenames in one
pass, preserves any leading user-typed keyword prefix, and skips files that
are already in the renamed `App - Description - timestamp.ext` form.
"""
import argparse
import os
import re
import subprocess
import sys
from pathlib import Path
# Scratch directory shared by the prep, plan and execute subcommands.
WORK = Path("/tmp/screenshot-rename")
# Still frames extracted from videos and PDFs.
FRAMES = WORK.joinpath("frames")
# Downsized JPEG copies listed in the manifest.
SMALL = WORK.joinpath("small")
# Apple's Screenshot tool inserts U+202F (narrow no-break space) before AM/PM.
# Haiku normalizes it to ASCII space when echoing the filename, so desc-dict
# lookups fail silently. Normalize on both sides AND emit ASCII space.
#
# Spelled as an escape sequence rather than a literal character: the literal
# is invisible in editors and diffs and is easily dropped by copy/paste or web
# viewers. An empty string here would make str.replace() insert a space
# between every character of the filename.
NNBSP = "\u202f"


def norm_ws(s: str) -> str:
    """Return *s* with every U+202F replaced by a plain ASCII space."""
    return s.replace(NNBSP, " ")
# Parser for filenames that still need renaming. Named groups:
#   keywords - optional leading user-typed prefix (e.g. "jojo travel flight")
#   app      - CleanShot | Screenshot
#   ts       - "YYYY-MM-DD at H.MM.SS" (1-2 digit hour), optionally followed
#              by AM or PM
#   dup      - optional duplicate marker, either "(2)" or " 2"
#   ext      - file extension (text after the final dot)
#
# Always run norm_ws() on the filename BEFORE matching so U+202F never reaches
# the meridiem branch.
APP_PATTERN = re.compile(
    r"^"
    r"(?:(?P<keywords>.+?)\s+)?"
    r"(?P<app>CleanShot|Screenshot)\s+"
    r"(?P<ts>"
    r"\d{4}-\d{2}-\d{2}"
    r"\s+at\s+"
    r"\d{1,2}\.\d{2}\.\d{2}"
    r"(?:\s*[AP]M)?"
    r")"
    r"(?P<dup>\(\d+\)|\s+\d+)?"
    r"\."
    r"(?P<ext>[^.]+)"
    r"$"
)
# Recognizes names already in the output form, so re-runs leave them alone:
#   "App - <description> - <timestamp>(<dup>)?.<ext>"
ALREADY_RENAMED = re.compile(
    r"^(?:CleanShot|Screenshot)"
    r"\s+-\s+"
    r".+?"
    r"\s+-\s+"
    r"\d{4}-\d{2}-\d{2}\s+at\s+\d{1,2}\.\d{2}\.\d{2}"
    r"(?:\s*[AP]M)?"
    r"(?:\(\d+\))?"
    r"\.[^.]+$"
)
def title_case(s: str) -> str:
    """Collapse whitespace runs and upper-case the first character of each word.

    Only the leading character of a word is changed; the remaining characters
    keep their original casing (so "iOS" becomes "IOS" and "NASA" stays "NASA").
    """
    collapsed = re.sub(r"\s+", " ", s.strip())
    capitalized = []
    for word in collapsed.split(" "):
        if word:
            word = word[:1].upper() + word[1:]
        capitalized.append(word)
    return " ".join(capitalized)
def parse_filename(name: str):
    """Split a screenshot filename into its parts, or return None to skip it.

    The name is passed through norm_ws() first. None is returned when the
    name is already in renamed form or does not match APP_PATTERN; callers
    should skip such files.

    On success the result is a dict with the keys ``keywords``, ``app``,
    ``ts``, ``dup`` and ``ext``. ``keywords`` and ``dup`` are empty strings
    when absent from the name.
    """
    normalized = norm_ws(name)
    if ALREADY_RENAMED.match(normalized):
        return None
    match = APP_PATTERN.match(normalized)
    if match is None:
        return None
    # Groups that did not participate in the match come back as "".
    fields = match.groupdict(default="")
    fields["keywords"] = fields["keywords"].strip()
    return fields
def build_new_name(parts: dict, ai_desc: str, max_words: int) -> str:
    """Compose the final ``App - Description - timestamp.ext`` filename.

    Args:
        parts: dict with the keys ``keywords``, ``app``, ``ts``, ``dup`` and
            ``ext`` (the shape returned by parse_filename()).
        ai_desc: free-text description of the screenshot.
        max_words: maximum number of description words to keep.

    Returns:
        The new filename. Any keyword prefix is title-cased and placed in
        front of the title-cased description; a bare numeric duplicate marker
        such as " 2" is rewritten as "(2)".

    Raises:
        ValueError: if fewer than 6 usable words remain after sanitizing.
    """
    # Sanitize first, truncate second. Truncating the raw tokens first lets
    # punctuation-only tokens ("-", "&", ...) use up word slots and then
    # vanish, which shortens the result and can spuriously trip the
    # 6-word minimum below.
    cleaned = []
    for token in ai_desc.split():
        word = "".join(ch for ch in token if ch.isalnum())
        if word:
            cleaned.append(word)
    cleaned = cleaned[:max_words]
    if len(cleaned) < 6:
        raise ValueError(f"<6 words after sanitize: {ai_desc!r}")
    titled = title_case(" ".join(cleaned))

    pieces = []
    if parts["keywords"]:
        pieces.append(title_case(parts["keywords"]))
    pieces.append(titled)
    full_desc = " ".join(pieces)

    # Normalize " 2" style duplicate markers to the "(2)" form.
    dup = parts["dup"]
    if dup and not dup.startswith("("):
        dup = "(" + dup.strip() + ")"
    return f'{parts["app"]} - {full_desc} - {parts["ts"]}{dup}.{parts["ext"]}'
def run(cmd, **kw):
    """Run *cmd* with subprocess.run, capturing stdout and stderr as text.

    Extra keyword arguments are forwarded unchanged. The exit status is not
    checked here; the CompletedProcess is returned for the caller to inspect.
    """
    completed = subprocess.run(cmd, text=True, capture_output=True, **kw)
    return completed
# ---------- prep ----------
def prep(src: Path, batch_size: int) -> None:
    """Prepare vision inputs for every eligible screenshot in *src*.

    Steps:
      1. Select the regular files that parse_filename() accepts, counting
         already-renamed and unrecognized names separately for the report.
      2. Produce a JPEG for each one under SMALL: .mp4/.mov files go through
         ffmpeg for a single frame, .pdf files through sips, and still images
         are used as-is; the result is then resized with ``sips -Z 1568``.
         Frames and resized images already on disk are reused.
      3. Write WORK/all.tsv with one "<small jpeg path><TAB><original name>"
         row per file that converted successfully.
      4. Remove old WORK/full-batch-* files and split the manifest into
         full-batch-NN files of at most *batch_size* rows each.

    Args:
        src: directory containing the screenshots.
        batch_size: maximum number of manifest rows per batch file (>= 1).

    Exits via sys.exit() when *src* is not a directory, *batch_size* is not
    positive, or no eligible file is found.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    if batch_size < 1:
        # Fail before any ffmpeg/sips work: the batch arithmetic at the end
        # would otherwise divide by zero (or misbehave for negative sizes).
        sys.exit(f"batch size must be >= 1: {batch_size}")
    WORK.mkdir(parents=True, exist_ok=True)
    FRAMES.mkdir(exist_ok=True)
    SMALL.mkdir(exist_ok=True)

    eligible = []
    skipped_already = 0
    skipped_other = 0
    for p in sorted(src.iterdir()):
        if not p.is_file():
            continue
        if parse_filename(p.name) is None:
            # parse_filename() does not say why it rejected the name, so
            # re-check the already-renamed pattern for the summary counts.
            if ALREADY_RENAMED.match(norm_ws(p.name)):
                skipped_already += 1
            else:
                skipped_other += 1
            continue
        eligible.append(p)

    skipped = f"(skipped: {skipped_already} already-renamed, {skipped_other} other)"
    if not eligible:
        sys.exit(f"no eligible files in {src} " + skipped)
    print(f"found {len(eligible)} eligible files " + skipped)

    manifest = WORK / "all.tsv"
    # Filenames can contain non-ASCII characters (e.g. U+202F), so never rely
    # on the locale's default encoding for the manifest or the batch files.
    with manifest.open("w", encoding="utf-8") as out:
        for f in eligible:
            # NOTE(review): frame/small names are keyed by stem only, so two
            # eligible files that differ only in extension share one JPEG.
            base = f.stem
            ext = f.suffix.lower()
            if ext in (".mp4", ".mov"):
                frame = FRAMES / f"{base}.jpg"
                if not frame.exists():
                    run(
                        [
                            "ffmpeg", "-y", "-ss", "1", "-i", str(f),
                            "-frames:v", "1", "-q:v", "3", str(frame),
                        ]
                    )
                if not frame.exists():
                    print(f"WARN ffmpeg failed: {f.name}", file=sys.stderr)
                    continue
                vision_src = frame
            elif ext == ".pdf":
                frame = FRAMES / f"{base}.jpg"
                if not frame.exists():
                    run(["sips", "-s", "format", "jpeg", str(f), "--out", str(frame)])
                if not frame.exists():
                    print(f"WARN sips failed on pdf: {f.name}", file=sys.stderr)
                    continue
                vision_src = frame
            elif ext in (".png", ".gif", ".jpg", ".jpeg", ".webp"):
                vision_src = f
            else:
                print(f"SKIP unknown ext: {f.name}", file=sys.stderr)
                continue

            small = SMALL / f"{base}.jpg"
            if not small.exists():
                run(
                    [
                        "sips", "-Z", "1568", "-s", "format", "jpeg",
                        str(vision_src), "--out", str(small),
                    ]
                )
            if not small.exists():
                print(f"WARN resize failed: {f.name}", file=sys.stderr)
                continue
            out.write(f"{small}\t{f.name}\n")

    # Drop batches from a previous run so stale rows are never dispatched.
    for old in WORK.glob("full-batch-*"):
        old.unlink()

    lines = manifest.read_text(encoding="utf-8").splitlines()
    n_batches = max(1, (len(lines) + batch_size - 1) // batch_size)
    for i in range(n_batches):
        chunk = lines[i * batch_size : (i + 1) * batch_size]
        (WORK / f"full-batch-{i+1:02d}").write_text(
            "\n".join(chunk) + "\n", encoding="utf-8"
        )

    print(f"prepped {len(lines)} files into {n_batches} batches in {WORK}")
    print(f"\nDispatch {n_batches} Haiku subagents (one per batch).")
    print(f"After all desc-full-NN.tsv files exist, run: pipeline.py plan --src '{src}'")
# ---------- plan ----------
def plan(src: Path, max_words: int) -> None:
    """Turn the desc-full-*.tsv files into a validated rename plan.

    Every WORK/desc-full-*.tsv row ("<filename><TAB><description>") is
    collected into a lookup keyed by the norm_ws()-normalized filename. Each
    regular file in *src* that parse_filename() accepts is then paired with
    its description and run through build_new_name(). A rename is planned only
    if the new name differs from the current one, does not already exist in
    *src*, and is not claimed by another planned rename; everything else is
    reported as an error. The accepted renames are written to
    WORK/plan-full.tsv as "<old><TAB><new>" rows, even when errors occurred.

    Args:
        src: directory containing the screenshots.
        max_words: maximum number of description words per filename.

    Exits via sys.exit() when *src* is not a directory or no description
    files are present.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    descs_paths = sorted(WORK.glob("desc-full-*.tsv"))
    if not descs_paths:
        sys.exit(f"no desc-full-*.tsv files found in {WORK}")

    # Map normalized-filename -> AI description. Haiku may write the filename
    # with or without U+202F; normalize on both sides.
    descs = {}
    bad_split = []
    for p in descs_paths:
        # Explicit UTF-8: the rows carry filenames that may be non-ASCII.
        text = p.read_text(encoding="utf-8")
        for lineno, line in enumerate(text.splitlines(), 1):
            line = line.rstrip()
            if not line:
                continue
            cols = line.split("\t", 1)
            if len(cols) != 2:
                bad_split.append(f"{p.name}:L{lineno}: {line!r}")
                continue
            descs[norm_ws(cols[0])] = cols[1].strip()
    print(f"aggregated {len(descs)} description rows from {len(descs_paths)} batches")

    existing = set(os.listdir(src))
    plan_rows = []
    errors = list(bad_split)
    seen = {}  # new name -> original name that already claimed it
    for actual in sorted(existing):
        parts = parse_filename(actual)
        if parts is None:
            continue
        if not (src / actual).is_file():
            # Mirror prep(), which only puts regular files in the manifest;
            # a directory with a screenshot-like name is not a rename target.
            continue
        desc = descs.get(norm_ws(actual))
        if not desc:
            errors.append(f"no desc for: {actual!r}")
            continue
        try:
            new = build_new_name(parts, desc, max_words)
        except ValueError as e:
            errors.append(f"{actual!r}: {e}")
            continue
        if new == actual:
            errors.append(f"same: {actual!r}")
            continue
        if new in existing:
            errors.append(f"target exists in DEST: {new!r}")
            continue
        if new in seen:
            errors.append(f"plan collision: {new!r} from {actual!r} and {seen[new]!r}")
            continue
        seen[new] = actual
        plan_rows.append((actual, new))

    print(f"plan: {len(plan_rows)} renames, {len(errors)} errors")
    if errors:
        print("\nERRORS:")
        for e in errors[:30]:
            print(f" {e}")
        if len(errors) > 30:
            print(f" ... and {len(errors) - 30} more")

    plan_path = WORK / "plan-full.tsv"
    with plan_path.open("w", encoding="utf-8") as f:
        for orig, new in plan_rows:
            f.write(f"{orig}\t{new}\n")
    print(f"\nplan saved: {plan_path}")

    if plan_rows:
        # Show roughly six evenly spaced rows as a sanity check.
        step = max(1, len(plan_rows) // 6)
        print(f"sample (every {max(1, len(plan_rows)//6)}th row):")
        for i in range(0, len(plan_rows), step):
            orig, new = plan_rows[i]
            print(f" {orig}\n{new}\n")
        # Only offered when there is something to execute.
        print(f"if plan looks good: pipeline.py execute --src '{src}'")
# ---------- execute ----------
def execute(src: Path) -> None:
    """Apply the renames listed in WORK/plan-full.tsv inside *src*.

    Each plan row is "<old name><TAB><new name>". A row is skipped and
    recorded as a failure when it is malformed, its source file is missing,
    or its target already exists. After every os.rename() the result is
    re-checked on disk. Once all rows are processed, the number of directory
    entries in *src* is compared with the count taken before the run.

    Failures are written to WORK/rename-fails.txt and the first five are
    printed.

    Args:
        src: directory containing the files to rename.

    Exits via sys.exit() when *src* is not a directory or the plan file is
    missing, and with status 2 when the entry count of *src* changed.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    plan_path = WORK / "plan-full.tsv"
    if not plan_path.exists():
        sys.exit(f"no plan: {plan_path} (run `pipeline.py plan` first)")

    before = len(os.listdir(src))
    ok = 0
    fails = []
    # Explicit UTF-8: plan rows carry filenames that may be non-ASCII.
    with plan_path.open(encoding="utf-8") as f:
        for line in f:
            line = line.rstrip()
            if not line:
                continue
            orig, sep, new = line.partition("\t")
            if not (sep and orig and new):
                # Record the bad row instead of raising mid-run, which would
                # abandon the remaining renames without any failure log.
                fails.append(f"malformed plan line: {line!r}")
                continue
            srcp = src / orig
            dstp = src / new
            if not srcp.exists():
                fails.append(f"src missing: {orig}")
                continue
            if dstp.exists():
                # os.rename() can silently replace an existing file.
                fails.append(f"target exists: {new}")
                continue
            try:
                os.rename(srcp, dstp)
                if dstp.exists() and not srcp.exists():
                    ok += 1
                else:
                    fails.append(f"post-check failed: {orig}")
            except OSError as e:
                fails.append(f"rename error {orig}: {e}")

    fail = len(fails)
    after = len(os.listdir(src))
    print(f"ok={ok} fail={fail} before={before} after={after}")
    count_changed = before != after
    if count_changed:
        print("⚠ FILE COUNT CHANGED — investigate immediately")
    else:
        print("file count unchanged ✓")

    # Write the failure log before any exit so it is available exactly when
    # something went wrong.
    if fails:
        fails_path = WORK / "rename-fails.txt"
        fails_path.write_text("\n".join(fails), encoding="utf-8")
        print(f"failures logged: {fails_path}")
        for x in fails[:5]:
            print(f" {x}")
    if count_changed:
        sys.exit(2)
# ---------- main ----------
def main() -> None:
    """Parse the command line and dispatch to prep, plan or execute.

    All three subcommands require ``--src``. ``prep`` also accepts
    ``--batch-size`` (default 19) and ``plan`` accepts ``--max-words``
    (default 8).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    commands = parser.add_subparsers(dest="cmd", required=True)

    prep_cmd = commands.add_parser("prep", help="extract frames, resize, build batches")
    prep_cmd.add_argument("--src", type=Path, required=True)
    prep_cmd.add_argument("--batch-size", type=int, default=19)

    plan_cmd = commands.add_parser("plan", help="build & validate rename plan")
    plan_cmd.add_argument("--src", type=Path, required=True)
    plan_cmd.add_argument("--max-words", type=int, default=8)

    exec_cmd = commands.add_parser("execute", help="apply rename plan with safety checks")
    exec_cmd.add_argument("--src", type=Path, required=True)

    args = parser.parse_args()

    # Lambdas keep the per-command argument lookup lazy: only the attributes
    # of the chosen subcommand exist on the namespace.
    dispatch = {
        "prep": lambda: prep(args.src, args.batch_size),
        "plan": lambda: plan(args.src, args.max_words),
        "execute": lambda: execute(args.src),
    }
    handler = dispatch.get(args.cmd)
    if handler is not None:
        handler()


if __name__ == "__main__":
    main()