#!/usr/bin/env python3
"""cj-scan — Parse an org file for cj annotations and VERIFY-placement audit.

Output: JSON to stdout with three top-level keys:
  - cj_blocks: every cj annotation found (source-block or legacy-inline form)
  - verify_tasks: every VERIFY heading with placement validity + suggested
    promotion target
  - unclosed_blocks: any cj source-block fence that opened but never closed

Usage:
  cj-scan FILE.org

Companion to the /respond-to-cj-comments skill — the skill calls this script
to get a single structured view of every cj annotation and every VERIFY
placement violation in a single tool call, instead of stitching the picture
together from multiple grep + Read round-trips.
"""

from __future__ import annotations

import json
import re
import sys
from dataclasses import asdict, dataclass
from pathlib import Path

# VERIFY placement: top-level under a `*` section (depth 2), or first-level
# child of a `**` parent task (depth 3). Anything else gets a
# promotion_target suggestion.
VALID_VERIFY_DEPTHS = {2, 3}

HEADING_RE = re.compile(r"^(\*+)\s+(.*)$")
SRC_OPEN_RE = re.compile(r"^\s*#\+begin_src\s+cj:\s*(\S*)\s*$", re.IGNORECASE)
SRC_CLOSE_RE = re.compile(r"^\s*#\+end_src\s*$", re.IGNORECASE)
BLOCK_OPEN_RE = re.compile(r"^\s*#\+begin_(\w+)(?:\s.*)?$", re.IGNORECASE)
LEGACY_CJ_RE = re.compile(r"^\s*cj:\s*(.*)$")
VERIFY_KEYWORD_RE = re.compile(r"^VERIFY(\s|\[|$)")


@dataclass
class HeadingFrame:
    """One ancestor heading on the stack maintained while scanning."""

    # Number of leading `*` characters on the heading line.
    depth: int
    # Heading text after the stars, stripped of surrounding whitespace.
    heading: str


def promotion_target(depth: int) -> int | None:
    """Return the suggested target depth for a misplaced VERIFY, or None if valid.

    Depths below 2 are pushed down to 2; depths above 3 are pulled up to 3.
    """
    if depth in VALID_VERIFY_DEPTHS:
        return None
    if depth < 2:
        return 2
    return 3


def is_verify_heading(heading_text: str) -> bool:
    """True when heading text begins with the VERIFY keyword (optional priority cookie)."""
    return bool(VERIFY_KEYWORD_RE.match(heading_text))


def _read_lines(path: Path) -> list[str]:
    """Return the file's lines, decoded as UTF-8 and split on newlines only.

    Iterating the file object (rather than calling ``str.splitlines``) keeps
    form feeds, vertical tabs, U+2028 and similar characters inside their
    line, so reported line numbers agree with newline-counting tools.
    """
    with path.open(encoding="utf-8") as handle:
        return [raw.rstrip("\n") for raw in handle]


def _wrapper_close_re(wrapper_type: str) -> re.Pattern[str]:
    """Build the case-insensitive `#+end_<type>` matcher for an open wrapper."""
    return re.compile(
        rf"^\s*#\+end_{re.escape(wrapper_type.lower())}\s*$",
        re.IGNORECASE,
    )


def _heading_context(
    stack: list[HeadingFrame],
) -> tuple[list[dict[str, object]], int]:
    """Snapshot the heading stack as (ancestor chain, innermost depth or 0)."""
    chain = [asdict(frame) for frame in stack]
    depth = stack[-1].depth if stack else 0
    return chain, depth


def scan_file(path: Path) -> dict[str, object]:
    """Scan an org file and return cj_blocks + verify_tasks + unclosed_blocks.

    Args:
        path: Org file to read; decoded as UTF-8.

    Returns:
        A dict with three list-valued keys:
          - ``cj_blocks``: one record per cj annotation, either a
            ``#+begin_src cj:`` block (``form == "source-block"``, with a
            ``label`` key) or a ``cj:`` line (``form == "legacy-inline"``).
          - ``verify_tasks``: one record per heading starting with VERIFY,
            with ``valid_depth`` and ``promotion_target``.
          - ``unclosed_blocks``: at most one record, for a cj source block
            still open at end of file.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    cj_blocks: list[dict[str, object]] = []
    verify_tasks: list[dict[str, object]] = []
    unclosed_blocks: list[dict[str, object]] = []

    heading_stack: list[HeadingFrame] = []

    in_cj_block = False
    block_start_line: int | None = None
    block_label: str | None = None
    block_body: list[str] = []

    # Tracks a non-cj `#+begin_` wrapper currently in scope. Inside a
    # wrapper, cj fence patterns are *content* (documentation examples,
    # quoted prose, snippet definitions) -- not annotations -- so we
    # suppress matching until the wrapper closes. The closer is type-keyed:
    # `#+end_example` for example, `#+end_src` for src, etc. The matcher is
    # compiled once when the wrapper opens; None means no wrapper is open.
    # NOTE: a wrapper that never closes suppresses matching to end of file
    # and is not reported in unclosed_blocks.
    wrapper_close_re: re.Pattern[str] | None = None

    file_str = str(path)

    for lineno, line in enumerate(_read_lines(path), start=1):
        if in_cj_block:
            if SRC_CLOSE_RE.match(line):
                chain, parent_depth = _heading_context(heading_stack)
                cj_blocks.append({
                    "file": file_str,
                    "form": "source-block",
                    "start_line": block_start_line,
                    "end_line": lineno,
                    "body": "\n".join(block_body),
                    "label": block_label,
                    "parent_heading_chain": chain,
                    "parent_depth": parent_depth,
                })
                in_cj_block = False
                block_start_line = None
                block_label = None
                block_body = []
            else:
                block_body.append(line)
            continue

        if wrapper_close_re is not None:
            if wrapper_close_re.match(line):
                wrapper_close_re = None
            continue

        m_heading = HEADING_RE.match(line)
        if m_heading:
            depth = len(m_heading.group(1))
            heading_text = m_heading.group(2).strip()
            # Pop frames at this depth or deeper before pushing the new one.
            while heading_stack and heading_stack[-1].depth >= depth:
                heading_stack.pop()
            heading_stack.append(HeadingFrame(depth=depth, heading=heading_text))

            if is_verify_heading(heading_text):
                target = promotion_target(depth)
                verify_tasks.append({
                    "file": file_str,
                    "line": lineno,
                    "depth": depth,
                    "heading": heading_text,
                    "valid_depth": target is None,
                    "promotion_target": target,
                })
            continue

        # cj-open must be checked before the generic begin-block match: a
        # `#+begin_src cj: ...` line matches both patterns, and cj-open is
        # the more specific intent.
        m_src_open = SRC_OPEN_RE.match(line)
        if m_src_open:
            in_cj_block = True
            block_start_line = lineno
            block_label = m_src_open.group(1) or None
            block_body = []
            continue

        m_block_open = BLOCK_OPEN_RE.match(line)
        if m_block_open:
            wrapper_close_re = _wrapper_close_re(m_block_open.group(1))
            continue

        m_legacy = LEGACY_CJ_RE.match(line)
        if m_legacy:
            chain, parent_depth = _heading_context(heading_stack)
            cj_blocks.append({
                "file": file_str,
                "form": "legacy-inline",
                "start_line": lineno,
                "end_line": lineno,
                "body": m_legacy.group(1).strip(),
                "parent_heading_chain": chain,
                "parent_depth": parent_depth,
            })

    if in_cj_block:
        unclosed_blocks.append({
            "file": file_str,
            "start_line": block_start_line,
            "label": block_label,
        })

    return {
        "cj_blocks": cj_blocks,
        "verify_tasks": verify_tasks,
        "unclosed_blocks": unclosed_blocks,
    }


def main() -> int:
    """Run the CLI: scan ``sys.argv[1]`` and print the result as JSON.

    Returns:
        0 on success, 2 on usage errors or a non-file argument, 1 when the
        file cannot be read or decoded.
    """
    if len(sys.argv) != 2:
        print("Usage: cj-scan FILE.org", file=sys.stderr)
        return 2
    path = Path(sys.argv[1])
    if not path.is_file():
        print(f"Not a file: {path}", file=sys.stderr)
        return 2
    try:
        result = scan_file(path)
    except (OSError, UnicodeDecodeError) as exc:
        # Report unreadable input as a one-line error instead of a traceback.
        print(f"Cannot read {path}: {exc}", file=sys.stderr)
        return 1
    json.dump(result, sys.stdout, indent=2)
    sys.stdout.write("\n")
    return 0


if __name__ == "__main__":
    sys.exit(main())