Files
Anthony Cardinale 030a40aa4b add btime fallback, app-library exclusion, --year, --include-untagged
Behavior changes (all opt-in or safety-first):
- prep refuses to operate inside .photoslibrary, .lrlibrary, .aplibrary,
  .fcpbundle, .band, .logicx, .app, etc. unless --allow-app-libraries
- --year YYYY restricts to files whose embedded ts (or btime) starts with YYYY
- --include-untagged accepts hand-named image files (no CleanShot/Screenshot
  prefix) and dates them via stat btime → mtime fallback. Gated on the folder
  containing ≥10 tagged matches to prevent sweeping ~/Pictures or similar
- prep pre-pass auto-normalizes the missing-space typo
  ('foo barCleanShot 2026-...' → 'foo bar CleanShot 2026-...') by os.rename
- plan now iterates the desc-tsv contents instead of the full src dir, with
  alt-extension fallback for Haiku's occasional .jpg-instead-of-.png echo
- build_new_name supports app=None (untagged) — emits
  '<keywords> - <Description> - YYYY-MM-DD.ext'

SKILL.md: gotchas #14-17 documenting each new guard, run-order updated
with the new flags, common-mistakes table extended.

Verified by smoke test with seeded files: --year filter, --include-untagged
threshold gate, app-library refusal, and typo normalization all behave.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 11:14:55 -04:00

627 lines
21 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Screenshot-rename pipeline.
Three subcommands:
prep — extract frames, resize, build manifest, split into batches
plan — aggregate desc-*.tsv files, validate, write rename plan
execute — apply the plan with safety checks
The Haiku-subagent dispatch step happens between `prep` and `plan` and is
performed by Claude Code in-session, not by this script.
Recognizes both `CleanShot ...` and Apple `Screenshot ...` filenames in one
pass, preserves any leading user-typed keyword prefix, and skips files that
are already in the renamed `App - Description - timestamp.ext` form.
Also handles, behind opt-in flags:
--year YYYY restrict to files whose embedded ts (or file btime)
starts with YYYY
--include-untagged include image files that lack any CleanShot/Screenshot
prefix, dating them from filesystem btime/mtime;
requires the folder to look like a screenshot dump
(≥10 tagged matches) so we don't sweep up arbitrary
photos.
Refuses to operate on paths inside known app library packages
(.photoslibrary, .aplibrary, .lrlibrary, etc.) unless --allow-app-libraries
is passed — guards against accidental runs over Apple Photos / Lightroom
catalogs when invoked on a parent dir.
"""
import argparse
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
# Scratch directory shared by all three subcommands: `prep` writes the manifest
# and batch files here, `plan` reads desc-full-*.tsv from it and writes the
# rename plan, and `execute` reads that plan back.
WORK = Path("/tmp/screenshot-rename")
# Single still frames extracted from videos and PDFs during `prep`.
FRAMES = WORK / "frames"
# Downscaled JPEG copies whose paths are listed in the manifest.
SMALL = WORK / "small"
# Apple's Screenshot tool inserts U+202F (narrow no-break space) before AM/PM.
# Haiku normalizes it to ASCII space when echoing the filename, so desc-dict
# lookups fail silently. Normalize on both sides AND emit ASCII space.
#
# The character is spelled as an escape on purpose: a literal U+202F is
# invisible in editors and web views and is easily lost in copy/paste. If the
# literal ever degrades to an empty string, str.replace("", " ") would insert
# a space between every character of every filename.
NNBSP = "\u202f"


def norm_ws(s: str) -> str:
    """Return *s* with every U+202F (narrow no-break space) replaced by " "."""
    return s.replace(NNBSP, " ")
# Filename parser. Captures:
#   keywords — optional leading user-typed prefix (e.g. "jojo travel flight")
#   app      — CleanShot | Screenshot
#   ts       — "YYYY-MM-DD at H.MM.SS" (1- or 2-digit hour), optionally
#              followed by "AM"/"PM" with or without a separating space
#   dup      — optional "(2)" or " 2" duplicate marker
#   ext      — file extension (everything after the last dot)
#
# Run norm_ws() on the filename BEFORE matching so U+202F doesn't break the
# meridiem branch.
APP_PATTERN = re.compile(
    r"^(?:(?P<keywords>.+?)\s+)?"
    r"(?P<app>CleanShot|Screenshot)\s+"
    r"(?P<ts>\d{4}-\d{2}-\d{2}\s+at\s+\d{1,2}\.\d{2}\.\d{2}(?:\s*[AP]M)?)"
    r"(?P<dup>\(\d+\)|\s+\d+)?"
    r"\.(?P<ext>[^.]+)$"
)
# Already-renamed: "App - <description> - <timestamp>(<dup>)?.<ext>"
# parse_filename() checks this first so output of a previous run is skipped.
ALREADY_RENAMED = re.compile(
    r"^(?:CleanShot|Screenshot)\s+-\s+.+?\s+-\s+"
    r"\d{4}-\d{2}-\d{2}\s+at\s+\d{1,2}\.\d{2}\.\d{2}(?:\s*[AP]M)?"
    r"(?:\(\d+\))?\.[^.]+$"
)
# Untagged-already-renamed: "<keywords> - <description> - YYYY-MM-DD.<ext>"
# We use this to skip the result of a previous --include-untagged run.
UNTAGGED_RENAMED = re.compile(
    r"^.+?\s+-\s+.+?\s+-\s+\d{4}-\d{2}-\d{2}(?:\(\d+\))?\.[^.]+$"
)
# User keyword abutting CleanShot/Screenshot with no space.
# e.g. "weird hightlighted tabCleanShot 2026-..." → insert space.
# `pre` is the single non-space character right before the app name and `post`
# is the whitespace plus the first five characters of the date ("YYYY-"); both
# are re-emitted unchanged by normalize_typo_filename().
MISSING_SPACE_PATTERN = re.compile(
    r"(?P<pre>\S)(?P<app>CleanShot|Screenshot)(?P<post>\s+\d{4}-)"
)
# Folder-name patterns we refuse to walk into. Apple Photos packages, Lightroom
# catalogs, Aperture, Final Cut, etc. — these contain images managed by other
# apps and should never be renamed by this skill.
# Matched as a case-sensitive suffix of each path segment (see
# is_in_app_library).
APP_LIB_SUFFIXES = (
    ".photoslibrary",
    ".aplibrary",
    ".lrlibrary",
    ".lrcat",
    ".lrcat-data",
    ".tvlibrary",
    ".tvprojcache",
    ".fcpbundle",
    ".band",
    ".logicx",
    ".app",
)
# Path segments refused by exact name rather than by suffix.
APP_LIB_NAMES = ("Photo Booth Library", "Photos Library")
# Lower-case extensions, leading dot included; callers compare against
# `Path.suffix.lower()`. All three are tuples because they are concatenated
# with `+` elsewhere in this file.
IMAGE_EXTS = (".png", ".gif", ".jpg", ".jpeg", ".webp", ".heic")
VIDEO_EXTS = (".mp4", ".mov")
PDF_EXTS = (".pdf",)
def is_in_app_library(p: Path) -> bool:
    """Report whether any segment of *p* names an app library package.

    The path is resolved first; if resolving raises OSError the path is
    inspected exactly as given. A segment counts as a library when it ends
    with one of APP_LIB_SUFFIXES or equals one of APP_LIB_NAMES.
    """
    try:
        resolved = p.resolve()
    except OSError:
        resolved = p
    # str.endswith accepts the whole tuple of suffixes in a single call.
    return any(
        part.endswith(APP_LIB_SUFFIXES) or part in APP_LIB_NAMES
        for part in resolved.parts
    )
def file_date(p: Path) -> str:
    """Return the file's date as ``YYYY-MM-DD``: birth time when sane, else mtime.

    The birth time is read in-process from ``os.stat_result.st_birthtime``
    rather than by spawning ``stat -f %SB`` once per file. The attribute only
    exists on platforms that record a creation time (e.g. macOS); where it is
    missing, or where it falls outside 1990-2099 (which suggests unset or
    broken metadata), the modification time is used instead.

    Raises:
        OSError: if *p* cannot be stat'ed (e.g. it does not exist).
    """
    st = p.stat()
    birth = getattr(st, "st_birthtime", None)
    if birth is not None:
        try:
            born = datetime.fromtimestamp(birth)
        except (OverflowError, OSError, ValueError):
            # Garbage timestamps can be out of range for the platform's
            # localtime(); treat them the same as "unset".
            born = None
        if born is not None and 1990 <= born.year <= 2099:
            return born.strftime("%Y-%m-%d")
    return datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d")
def title_case(s: str) -> str:
    """Collapse whitespace runs and upper-case the first character of each word.

    Only the first character of a word is changed; the remainder keeps its
    original casing (so "iPhone" becomes "IPhone", not "Iphone").
    """
    collapsed = re.sub(r"\s+", " ", s.strip())
    capped = []
    for word in collapsed.split(" "):
        if word:
            capped.append(word[:1].upper() + word[1:])
        else:
            # Only reachable for empty input, where split(" ") yields [""].
            capped.append(word)
    return " ".join(capped)
def parse_filename(name: str):
    """Split a tagged screenshot filename into its parts.

    Returns a dict with keys ``keywords``, ``app``, ``ts``, ``dup`` and
    ``ext``. ``keywords`` and ``dup`` are "" when absent. Returns None when
    the name is already in renamed form or does not match APP_PATTERN.
    """
    normalized = norm_ws(name)
    if ALREADY_RENAMED.match(normalized) is not None:
        return None
    hit = APP_PATTERN.match(normalized)
    if hit is None:
        return None
    # groupdict() yields the named groups in pattern order; groups that did
    # not take part in the match (keywords, dup) come back as "".
    fields = hit.groupdict(default="")
    fields["keywords"] = fields["keywords"].strip()
    return fields
def synthesize_untagged_parts(p: Path):
    """Build a parts dict for a file with no CleanShot/Screenshot prefix.

    The whole stem becomes ``keywords``, ``app`` is None, and ``ts`` is the
    YYYY-MM-DD date from file_date() because the name carries no timestamp.

    Returns None when *p* is not a regular file, when its name already
    matches the untagged-renamed form, or when it has no extension.
    """
    if not p.is_file():
        return None
    normalized = norm_ws(p.name)
    if UNTAGGED_RENAMED.match(normalized):
        return None
    stem, dotext = os.path.splitext(normalized)
    if dotext == "":
        return None
    return dict(
        keywords=stem,
        app=None,
        ts=file_date(p),
        dup="",
        ext=dotext[1:],  # drop the leading dot
    )
def normalize_typo_filename(name: str) -> str:
    """Insert the missing space before an abutting CleanShot/Screenshot tag.

    'weird tabCleanShot 2026-...' becomes 'weird tab CleanShot 2026-...'.
    Names that do not match MISSING_SPACE_PATTERN are returned unchanged.
    """

    def _spaced(m):
        # Re-emit the three captured pieces with one space before the app.
        return f"{m.group('pre')} {m.group('app')}{m.group('post')}"

    return MISSING_SPACE_PATTERN.sub(_spaced, name)
def build_new_name(parts: dict, ai_desc: str, max_words: int) -> str:
    """Compose the target filename from parsed parts and an AI description.

    Args:
        parts: dict with keys ``keywords``, ``app``, ``ts``, ``dup``, ``ext``
            as produced by parse_filename() or synthesize_untagged_parts().
            ``app`` is None for untagged files.
        ai_desc: free-text description; each word is reduced to its
            alphanumeric characters and words that end up empty are dropped.
        max_words: maximum number of sanitized words kept.

    Returns:
        Tagged:   ``<app> - [<Keywords> ]<Description> - <ts><dup>.<ext>``
        Untagged: ``[<Keywords> - ]<Description> - <ts><dup>.<ext>``

    Raises:
        ValueError: if fewer than 6 words remain after sanitizing.
    """
    # Sanitize first, truncate second. Truncating the raw tokens first lets
    # punctuation-only tokens ("-", "&", "–") use up word slots, which yields
    # shorter names and spurious "<6 words" errors for descriptions that do
    # contain enough real words.
    cleaned = []
    for word in ai_desc.split():
        alnum = "".join(c for c in word if c.isalnum())
        if alnum:
            cleaned.append(alnum)
    cleaned = cleaned[:max_words]
    if len(cleaned) < 6:
        raise ValueError(f"<6 words after sanitize: {ai_desc!r}")
    titled = title_case(" ".join(cleaned))

    # A bare " 2" duplicate marker is rewritten to "(2)".
    dup = parts["dup"]
    if dup and not dup.startswith("("):
        dup = "(" + dup.strip() + ")"

    if parts["app"]:
        pieces = []
        if parts["keywords"]:
            pieces.append(title_case(parts["keywords"]))
        pieces.append(titled)
        full_desc = " ".join(pieces)
        return f'{parts["app"]} - {full_desc} - {parts["ts"]}{dup}.{parts["ext"]}'

    # Untagged: <keywords> - <ai-desc> - <date>.<ext> with explicit separator
    kw = title_case(parts["keywords"]) if parts["keywords"] else ""
    if kw:
        return f"{kw} - {titled} - {parts['ts']}{dup}.{parts['ext']}"
    return f"{titled} - {parts['ts']}{dup}.{parts['ext']}"
def run(cmd, **kw):
    """Run *cmd* with subprocess.run, capturing stdout/stderr as text.

    Extra keyword arguments are forwarded to subprocess.run. The
    CompletedProcess is returned as-is; a non-zero exit status is not raised.
    """
    options = dict(capture_output=True, text=True, **kw)
    return subprocess.run(cmd, **options)
def parts_year(parts) -> str:
    """Return the four leading digits of ``parts["ts"]``, or "" if absent.

    Works for both tagged ("YYYY-MM-DD at ...") and untagged ("YYYY-MM-DD")
    timestamps since both start with the year.
    """
    found = re.match(r"(\d{4})", parts["ts"])
    if found is None:
        return ""
    return found.group(1)
# ---------- prep ----------
def prep(
    src: Path,
    batch_size: int,
    year: str | None = None,
    include_untagged: bool = False,
    allow_app_libraries: bool = False,
    untagged_threshold: int = 10,
) -> None:
    """Select eligible files in *src*, build thumbnails, and write batch files.

    Steps, in order:
      1. Refuse a non-directory, a non-positive batch size, or a source inside
         an app library package (unless *allow_app_libraries*).
      2. Pre-pass: rename files whose keyword abuts CleanShot/Screenshot
         without a space (skipped when the corrected name already exists).
      3. Classify every regular file directly in *src* as tagged,
         already-renamed, untagged candidate, or other.
      4. With *include_untagged*, accept untagged candidates only if at least
         *untagged_threshold* tagged files were seen.
      5. Produce a JPEG in SMALL for each eligible file (videos and PDFs go
         through a single extracted frame in FRAMES first) and record
         ``<small path>\\t<original name>`` in WORK/all.tsv.
      6. Replace any old WORK/full-batch-* files with new chunks of
         *batch_size* manifest lines.

    Args:
        src: directory to scan (not recursive).
        batch_size: manifest lines per batch file; must be >= 1.
        year: when set, keep only files whose timestamp starts with this YYYY.
        include_untagged: also consider files without a CleanShot/Screenshot
            prefix, dated via file_date().
        allow_app_libraries: bypass the app-library guard.
        untagged_threshold: minimum tagged-file count for step 4.

    Exits via sys.exit on invalid input or when nothing is eligible.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    # A zero batch size would divide by zero below; a negative one would
    # produce a nonsensical batch. Fail before touching any file.
    if batch_size < 1:
        sys.exit(f"--batch-size must be >= 1, got {batch_size}")
    if is_in_app_library(src) and not allow_app_libraries:
        sys.exit(
            f"refusing to run inside an app library package: {src}\n"
            f"if intentional, pass --allow-app-libraries"
        )
    WORK.mkdir(parents=True, exist_ok=True)
    FRAMES.mkdir(exist_ok=True)
    SMALL.mkdir(exist_ok=True)

    # Pre-pass: normalize missing-space typos in source filenames.
    typo_renamed = 0
    for p in sorted(src.iterdir()):
        if not p.is_file():
            continue
        n = norm_ws(p.name)
        fixed = normalize_typo_filename(n)
        if fixed != n:
            new_path = src / fixed
            # Never clobber an existing file; the typo'd one is left as-is.
            if not new_path.exists():
                os.rename(p, new_path)
                typo_renamed += 1
                print(f"normalized typo: {p.name!r} → {fixed!r}")
    if typo_renamed:
        print(f"pre-pass: normalized {typo_renamed} missing-space typo(s)\n")

    # Main pass: classify each file.
    tagged_count = 0
    untagged_candidates = []
    eligible = []  # list of (path, parts) tuples
    skipped_already = 0
    skipped_other = 0
    skipped_year = 0
    refused_lib = 0
    for p in sorted(src.iterdir()):
        if not p.is_file():
            continue
        if is_in_app_library(p) and not allow_app_libraries:
            refused_lib += 1
            continue
        parts = parse_filename(p.name)
        if parts is not None:
            # Counted before the year filter: the threshold below measures
            # how many tagged files the folder holds, not how many survive.
            tagged_count += 1
            if year and parts_year(parts) != year:
                skipped_year += 1
                continue
            eligible.append((p, parts))
            continue
        n = norm_ws(p.name)
        if ALREADY_RENAMED.match(n) or UNTAGGED_RENAMED.match(n):
            skipped_already += 1
            continue
        # Untagged candidate — defer until we know whether the folder qualifies
        # as a screenshot dump.
        if p.suffix.lower() in IMAGE_EXTS + VIDEO_EXTS + PDF_EXTS:
            untagged_candidates.append(p)
        else:
            skipped_other += 1

    if include_untagged:
        if tagged_count >= untagged_threshold:
            for p in untagged_candidates:
                parts = synthesize_untagged_parts(p)
                if parts is None:
                    skipped_other += 1
                    continue
                if year and parts_year(parts) != year:
                    skipped_year += 1
                    continue
                eligible.append((p, parts))
        else:
            print(
                f"--include-untagged ignored: only {tagged_count} tagged file(s), "
                f"need ≥{untagged_threshold} for the folder to qualify as a screenshot dump"
            )
            skipped_other += len(untagged_candidates)
    else:
        if untagged_candidates:
            print(
                f"hint: {len(untagged_candidates)} untagged image/video file(s) skipped; "
                f"pass --include-untagged to include them (date from btime/mtime)"
            )
        skipped_other += len(untagged_candidates)

    if not eligible:
        sys.exit(
            f"no eligible files in {src} "
            f"(skipped: {skipped_already} already-renamed, "
            f"{skipped_year} wrong-year, "
            f"{skipped_other} other"
            + (f", {refused_lib} in app libraries" if refused_lib else "")
            + ")"
        )
    summary = (
        f"found {len(eligible)} eligible files "
        f"(skipped: {skipped_already} already-renamed, "
        f"{skipped_year} wrong-year, "
        f"{skipped_other} other"
    )
    if refused_lib:
        summary += f", {refused_lib} in app libraries"
    summary += ")"
    print(summary)

    # Resize/extract for vision and write manifest.
    manifest = WORK / "all.tsv"
    with manifest.open("w") as out:
        for f, _parts in eligible:
            base = f.stem
            ext = f.suffix.lower()
            if ext in VIDEO_EXTS:
                # Grab one frame at the 1-second mark as the video's still.
                frame = FRAMES / f"{base}.jpg"
                if not frame.exists():
                    run([
                        "ffmpeg", "-y", "-ss", "1", "-i", str(f),
                        "-frames:v", "1", "-q:v", "3", str(frame),
                    ])
                if not frame.exists():
                    print(f"WARN ffmpeg failed: {f.name}", file=sys.stderr)
                    continue
                vision_src = frame
            elif ext in PDF_EXTS:
                frame = FRAMES / f"{base}.jpg"
                if not frame.exists():
                    run(["sips", "-s", "format", "jpeg", str(f), "--out", str(frame)])
                if not frame.exists():
                    print(f"WARN sips failed on pdf: {f.name}", file=sys.stderr)
                    continue
                vision_src = frame
            elif ext in IMAGE_EXTS:
                vision_src = f
            else:
                print(f"SKIP unknown ext: {f.name}", file=sys.stderr)
                continue
            # An existing thumbnail is reused rather than regenerated.
            small = SMALL / f"{base}.jpg"
            if not small.exists():
                run([
                    "sips", "-Z", "1568", "-s", "format", "jpeg",
                    str(vision_src), "--out", str(small),
                ])
            if not small.exists():
                print(f"WARN resize failed: {f.name}", file=sys.stderr)
                continue
            out.write(f"{small}\t{f.name}\n")

    # Drop batches from any earlier run before writing the new ones.
    for old in WORK.glob("full-batch-*"):
        old.unlink()
    lines = manifest.read_text().splitlines()
    n_batches = max(1, (len(lines) + batch_size - 1) // batch_size)
    for i in range(n_batches):
        chunk = lines[i * batch_size : (i + 1) * batch_size]
        (WORK / f"full-batch-{i+1:02d}").write_text("\n".join(chunk) + "\n")
    print(f"prepped {len(lines)} files into {n_batches} batches in {WORK}")
    print(f"\nDispatch {n_batches} Haiku subagents (one per batch).")
    print(f"After all desc-full-NN.tsv files exist, run: pipeline.py plan --src '{src}'")
# ---------- plan ----------
def _find_alt_extension(orig: str, existing: set[str]) -> str | None:
    """Find a file in *existing* with the same stem as *orig* but another extension.

    Haiku sometimes returns the resized .jpg extension instead of the real
    .png/.gif/.mp4. Extensions are tried in IMAGE_EXTS, VIDEO_EXTS, PDF_EXTS
    order and the first hit wins. Returns None when *orig* has no extension
    or no alternative is present.
    """
    stem, dotext = os.path.splitext(orig)
    if not dotext:
        return None
    candidates = (stem + alt for alt in IMAGE_EXTS + VIDEO_EXTS + PDF_EXTS)
    return next(
        (name for name in candidates if name != orig and name in existing),
        None,
    )
def plan(src: Path, max_words: int) -> None:
    """Aggregate desc-full-*.tsv files, validate, and write WORK/plan-full.tsv.

    Each description row is ``<original filename>\\t<description>``. For every
    row the real file in *src* is located, its new name is built with
    build_new_name(), and the pair is kept only if it passes every check
    (source found, parseable, name actually changes, no existing target, no
    collision inside the plan). Everything else is reported as an error; up
    to 30 errors are printed.

    Args:
        src: directory holding the files to rename.
        max_words: maximum description words, forwarded to build_new_name().

    Exits via sys.exit if *src* is not a directory or no desc files exist.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    descs_paths = sorted(WORK.glob("desc-full-*.tsv"))
    if not descs_paths:
        sys.exit("no desc-full-*.tsv files found in /tmp/screenshot-rename")

    descs = {}
    bad_split = []
    for p in descs_paths:
        for lineno, line in enumerate(p.read_text().splitlines(), 1):
            line = line.rstrip()
            if not line:
                continue
            cols = line.split("\t", 1)
            if len(cols) != 2:
                bad_split.append(f"{p.name}:L{lineno}: {line!r}")
                continue
            descs[norm_ws(cols[0])] = cols[1].strip()
    print(f"aggregated {len(descs)} description rows from {len(descs_paths)} batches")

    existing = set(os.listdir(src))
    # The desc keys are whitespace-normalized, but names on disk may still
    # contain U+202F (Apple "Screenshot ... AM/PM" files). Map each normalized
    # name back to the real directory entry so those files are found. If two
    # entries normalize to the same key, the one that is already normalized
    # wins, which keeps exact matches behaving as before.
    by_norm = {}
    for name in sorted(existing):
        key = norm_ws(name)
        if key == name or key not in by_norm:
            by_norm[key] = name
    normalized_names = set(by_norm)

    plan_rows = []
    errors = list(bad_split)
    seen = {}
    for orig in sorted(descs.keys()):
        # Locate the actual file in src (may have an alt extension if Haiku
        # echoed the resized .jpg).
        actual = by_norm.get(orig)
        if actual is None:
            alt = _find_alt_extension(orig, normalized_names)
            if alt is None:
                errors.append(f"src not found: {orig!r}")
                continue
            actual = by_norm[alt]
        parts = parse_filename(actual)
        if parts is None:
            parts = synthesize_untagged_parts(src / actual)
        if parts is None:
            errors.append(f"can't parse: {actual!r}")
            continue
        desc = descs[orig]
        try:
            new = build_new_name(parts, desc, max_words)
        except ValueError as e:
            errors.append(f"{actual!r}: {e}")
            continue
        # A name that differs from the source only by U+202F vs space is
        # treated as unchanged.
        if new == actual or new == norm_ws(actual):
            errors.append(f"same: {actual!r}")
            continue
        # Check the raw listing and the normalized view so a target that
        # differs from an existing file only by U+202F is also refused.
        if new in existing or new in by_norm:
            errors.append(f"target exists in DEST: {new!r}")
            continue
        if new in seen:
            errors.append(
                f"plan collision: {new!r} from {actual!r} and {seen[new]!r}"
            )
            continue
        seen[new] = actual
        plan_rows.append((actual, new))

    print(f"plan: {len(plan_rows)} renames, {len(errors)} errors")
    if errors:
        print("\nERRORS:")
        for e in errors[:30]:
            print(f" {e}")
        if len(errors) > 30:
            print(f" ... and {len(errors) - 30} more")

    plan_path = WORK / "plan-full.tsv"
    with plan_path.open("w") as f:
        for orig, new in plan_rows:
            f.write(f"{orig}\t{new}\n")
    print(f"\nplan saved: {plan_path}")
    if plan_rows:
        # Show roughly six evenly spaced rows as a sanity sample.
        step = max(1, len(plan_rows) // 6)
        print(f"sample (every {step}th row):")
        for i in range(0, len(plan_rows), step):
            orig, new = plan_rows[i]
            print(f" {orig}\n{new}\n")
    print(f"if plan looks good: pipeline.py execute --src '{src}'")
# ---------- execute ----------
def execute(src: Path) -> None:
    """Apply WORK/plan-full.tsv to *src*, one rename per plan line.

    A rename is skipped and recorded as a failure when the plan line is
    malformed, the source is missing, or the target already exists. After the
    run the directory entry count is compared with the count before; a
    mismatch prints a warning and exits with status 2. Failures are written
    to WORK/rename-fails.txt and the first five are printed.

    Args:
        src: directory whose files are renamed in place.

    Exits via sys.exit if *src* is not a directory or the plan file is absent.
    """
    if not src.is_dir():
        sys.exit(f"source not a directory: {src}")
    plan_path = WORK / "plan-full.tsv"
    if not plan_path.exists():
        sys.exit(f"no plan: {plan_path} (run `pipeline.py plan` first)")

    before = len(os.listdir(src))
    ok = 0
    fails = []
    with plan_path.open() as f:
        for line in f:
            line = line.rstrip()
            if not line:
                continue
            # A line without a tab, or with an empty side, must not abort the
            # run half-way through: record it and carry on. An empty `orig`
            # would otherwise make `src / orig` point at the directory itself.
            orig, sep, new = line.partition("\t")
            if not sep or not orig or not new:
                fails.append(f"malformed plan line: {line!r}")
                continue
            srcp = src / orig
            dstp = src / new
            if not srcp.exists():
                fails.append(f"src missing: {orig}")
                continue
            if dstp.exists():
                fails.append(f"target exists: {new}")
                continue
            try:
                os.rename(srcp, dstp)
                if dstp.exists() and not srcp.exists():
                    ok += 1
                else:
                    fails.append(f"post-check failed: {orig}")
            except OSError as e:
                fails.append(f"rename error {orig}: {e}")

    after = len(os.listdir(src))
    fail = len(fails)
    print(f"ok={ok} fail={fail} before={before} after={after}")
    count_changed = before != after
    if count_changed:
        print("⚠ FILE COUNT CHANGED — investigate immediately")
    else:
        print("file count unchanged ✓")
    # Persist failures before any exit so the evidence survives the
    # count-mismatch path as well.
    if fails:
        fails_path = WORK / "rename-fails.txt"
        fails_path.write_text("\n".join(fails))
        print(f"failures logged: {fails_path}")
        for x in fails[:5]:
            print(f" {x}")
    if count_changed:
        sys.exit(2)
# ---------- main ----------
def main() -> None:
    """Parse the command line and dispatch to prep, plan, or execute."""
    parser = argparse.ArgumentParser(description=__doc__)
    commands = parser.add_subparsers(dest="cmd", required=True)

    # prep
    prep_cmd = commands.add_parser("prep", help="extract frames, resize, build batches")
    prep_cmd.add_argument("--src", type=Path, required=True)
    prep_cmd.add_argument("--batch-size", type=int, default=19)
    prep_cmd.add_argument(
        "--year",
        type=str,
        default=None,
        help="restrict to YYYY (matches embedded ts or btime)",
    )
    prep_cmd.add_argument(
        "--include-untagged",
        action="store_true",
        help="include image files that lack a CleanShot/Screenshot prefix; "
        "requires the folder to have ≥10 tagged files (configurable)",
    )
    prep_cmd.add_argument(
        "--untagged-threshold",
        type=int,
        default=10,
        help="minimum tagged-file count for a folder to be treated as a "
        "screenshot dump (default 10)",
    )
    prep_cmd.add_argument(
        "--allow-app-libraries",
        action="store_true",
        help="bypass the .photoslibrary / .lrlibrary etc. guard (DANGEROUS)",
    )

    # plan
    plan_cmd = commands.add_parser("plan", help="build & validate rename plan")
    plan_cmd.add_argument("--src", type=Path, required=True)
    plan_cmd.add_argument("--max-words", type=int, default=8)

    # execute
    exec_cmd = commands.add_parser("execute", help="apply rename plan with safety checks")
    exec_cmd.add_argument("--src", type=Path, required=True)

    args = parser.parse_args()
    if args.cmd == "prep":
        prep(
            args.src,
            args.batch_size,
            year=args.year,
            include_untagged=args.include_untagged,
            allow_app_libraries=args.allow_app_libraries,
            untagged_threshold=args.untagged_threshold,
        )
        return
    if args.cmd == "plan":
        plan(args.src, args.max_words)
        return
    if args.cmd == "execute":
        execute(args.src)


if __name__ == "__main__":
    main()