#!/usr/bin/env python3
"""Point-in-time snapshot of pending cross-agent messages across local projects.

See cross-agent-status.md.

Pending = messages in inbox/from-agents/ whose CONVERSATION_ID has no
MESSAGE_TYPE: release at the same or a later #+TIMESTAMP.

HALT: prints a prominent banner before normal output, but continues to
enumerate. In --json mode the banner is written to stderr so that stdout
remains a single valid JSON document (the payload carries "halt_active").
"""
from __future__ import annotations

import argparse
import datetime
import glob
import json
import os
import re
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import TextIO

CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms"
HALT_FILE = CONFIG_DIR / "HALT"
DEFAULT_GLOB = str(Path.home() / "projects" / "*" / "inbox" / "from-agents") + "/"

# One "#+KEY: value" header line; keys are upper-case letters and underscores.
_FRONTMATTER_RE = re.compile(r"#\+([A-Z_]+):\s*(.*)")


def parse_frontmatter(path: Path) -> dict[str, str]:
    """Return the leading ``#+KEY: value`` header block of *path* as a dict.

    Lines before the first header line are skipped.  Once at least one header
    has been read, the first blank or non-header line ends the block.  An
    unreadable file yields ``{}``.  The file is decoded as UTF-8 with invalid
    bytes replaced, so a badly-encoded file cannot abort the whole scan.
    """
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return {}

    headers: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.rstrip()
        match = _FRONTMATTER_RE.match(line) if line else None
        if match:
            headers[match.group(1)] = match.group(2).strip()
        elif headers:
            # Blank or non-header line after the block started: block is over.
            break
    return headers


def project_name_from_path(path: str) -> str:
    """Return the project name for an inbox path.

    If *path* lies under ``~/projects/NAME/...`` the result is ``NAME``.
    Otherwise the layout ``NAME/inbox/from-agents[/file.org]`` is assumed and
    the name is taken relative to the inbox directory.  *path* may be either
    the ``from-agents`` directory (what ``main`` passes) or an ``.org`` file
    inside it.
    """
    candidate = Path(path)
    projects_root = Path(os.path.join(str(Path.home()), "projects"))
    try:
        below_root = candidate.relative_to(projects_root).parts
    except ValueError:
        below_root = ()
    if below_root:
        return below_root[0]

    # Fallback: NAME/inbox/from-agents -> project is two levels above the
    # inbox directory.  Step up once more first when given a message file.
    inbox_dir = candidate.parent if candidate.suffix == ".org" else candidate
    return inbox_dir.parent.parent.name


def scan_project(inbox_dir: Path) -> tuple[int, str | None, int | None]:
    """Return (pending_count, most_recent_filename_or_None, most_recent_age_seconds_or_None).

    A message is pending when its conversation has no ``release`` message
    whose timestamp is equal to or later than the message's own timestamp
    (timestamps are compared as strings).  Files missing CONVERSATION_ID,
    TIMESTAMP or MESSAGE_TYPE are always counted as pending.  "Most recent"
    is decided by file mtime, used as a proxy for arrival order; files that
    vanish before they can be stat'ed still count as pending but are ignored
    for the most-recent calculation.
    """
    if not inbox_dir.is_dir():
        return 0, None, None

    # conv_id -> [(timestamp, msg_type, path)]
    by_conv: dict[str, list[tuple[str, str, Path]]] = defaultdict(list)
    pending_files: list[Path] = []
    for org_file in sorted(inbox_dir.glob("*.org")):
        headers = parse_frontmatter(org_file)
        conv = headers.get("CONVERSATION_ID")
        stamp = headers.get("TIMESTAMP")
        msg_type = headers.get("MESSAGE_TYPE")
        if conv and stamp and msg_type:
            by_conv[conv].append((stamp, msg_type, org_file))
        else:
            # Malformed file: cannot be matched to a release, so it is pending.
            pending_files.append(org_file)

    for entries in by_conv.values():
        entries.sort(key=lambda entry: entry[0])
        release_ts = max(
            (stamp for stamp, msg_type, _ in entries if msg_type == "release"),
            default=None,
        )
        pending_files.extend(
            org_file
            for stamp, msg_type, org_file in entries
            if msg_type != "release" and (release_ts is None or stamp > release_ts)
        )

    if not pending_files:
        return 0, None, None

    # Stat each file exactly once; a file deleted mid-scan must not crash us.
    newest: Path | None = None
    newest_mtime = 0.0
    for org_file in pending_files:
        try:
            mtime = org_file.stat().st_mtime
        except OSError:
            continue
        if newest is None or mtime > newest_mtime:
            newest, newest_mtime = org_file, mtime

    if newest is None:
        return len(pending_files), None, None

    # Clamp: an mtime in the future (clock skew) should not give a negative age.
    age = max(0, int(time.time() - newest_mtime))
    return len(pending_files), newest.name, age


def fmt_age(seconds: int | None) -> str:
    """Format an age in seconds as a short human-readable string."""
    if seconds is None:
        return "—"
    if seconds < 60:
        return f"{seconds}s ago"
    if seconds < 3600:
        return f"{seconds // 60} min ago"
    if seconds < 86400:
        return f"{seconds // 3600} hr ago"
    return f"{seconds // 86400} day(s) ago"


def render_banner_if_halt(stream: TextIO | None = None) -> None:
    """Print the HALT banner if the HALT file exists; otherwise do nothing.

    *stream* is where the banner is written; ``None`` means the current
    ``sys.stdout``.  The HALT file's content, if any, is shown as the reason.
    """
    if not HALT_FILE.exists():
        return
    try:
        reason = HALT_FILE.read_text(encoding="utf-8", errors="replace").strip()
    except OSError:
        reason = "(HALT file unreadable; treated as halted)"
    print("⚠ HALT ACTIVE — cross-agent comms paused", file=stream)
    if reason:
        print(f" reason: {reason}", file=stream)
    print(f" clear: rm {HALT_FILE} (or: cross-agent-resume)", file=stream)
    print(file=stream)


def main(argv: list[str] | None = None) -> int:
    """Run the CLI and return the process exit code.

    *argv* is the argument list without the program name; ``None`` means
    ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Snapshot of pending cross-agent messages across local projects."
    )
    parser.add_argument("--json", action="store_true", help="Emit JSON output")
    parser.add_argument(
        "--projects-glob",
        default=DEFAULT_GLOB,
        help=f"Glob for project from-agents dirs (default: {DEFAULT_GLOB})",
    )
    args = parser.parse_args(argv)

    # In JSON mode stdout must contain nothing but the JSON document.
    render_banner_if_halt(sys.stderr if args.json else None)

    rows = []
    for path in sorted(glob.glob(args.projects_glob)):
        inbox = Path(path)
        if not inbox.is_dir():
            continue
        count, most_recent, age = scan_project(inbox)
        rows.append(
            {
                "name": project_name_from_path(path),
                "pending_count": count,
                "most_recent": (
                    {"filename": most_recent, "age_seconds": age}
                    if most_recent
                    else None
                ),
            }
        )

    # Sort: pending-first, then alphabetical by name.
    rows.sort(key=lambda r: (-r["pending_count"], r["name"]))

    if args.json:
        payload = {
            "scanned_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "halt_active": HALT_FILE.exists(),
            "projects": rows,
        }
        print(json.dumps(payload, indent=2))
        return 0

    if not rows:
        print("No projects with inbox/from-agents/ found — 0 pending.")
        return 0

    # Human-readable table.
    name_w = max(len("project"), *(len(r["name"]) for r in rows))
    print(f"{'project':<{name_w}} pending most-recent")
    for r in rows:
        most_recent_str = "—"
        if r["most_recent"]:
            most_recent_str = (
                f"{r['most_recent']['filename']} "
                f"({fmt_age(r['most_recent']['age_seconds'])})"
            )
        print(f"{r['name']:<{name_w}} {r['pending_count']:<7} {most_recent_str}")
    return 0


if __name__ == "__main__":
    sys.exit(main())