diff options
| author | Craig Jennings <c@cjennings.net> | 2026-05-06 21:59:52 -0500 |
|---|---|---|
| committer | Craig Jennings <c@cjennings.net> | 2026-05-06 21:59:52 -0500 |
| commit | d81b23ad6b6e437dfe3c338a00a4be39bc555146 (patch) | |
| tree | 2d4b0d7890fd1fc70d81282b81fed2808c28a106 /.ai/scripts/cross-agent-comms/cross-agent-recv | |
| parent | 201377f57430ef28d02e703a2191434bbee55c75 (diff) | |
| download | rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.tar.gz rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.zip | |
chore(ai): initialize project notes and Claude tooling surfaces
Replace the seed notes.org with project-specific context (layout, install modes, task tracker location, recent inflection point). Bring in the synced template surfaces (protocols, workflows, scripts, references, retrospectives, someday-maybe) as tracked content for this content/documentation project.
Diffstat (limited to '.ai/scripts/cross-agent-comms/cross-agent-recv')
| -rwxr-xr-x | .ai/scripts/cross-agent-comms/cross-agent-recv | 250 |
1 files changed, 250 insertions, 0 deletions
diff --git a/.ai/scripts/cross-agent-comms/cross-agent-recv b/.ai/scripts/cross-agent-comms/cross-agent-recv new file mode 100755 index 0000000..b67533a --- /dev/null +++ b/.ai/scripts/cross-agent-comms/cross-agent-recv @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +"""Cross-agent message receiver. + +See cross-agent-recv.md for the full contract. Reads one message file and +emits a structured decision the agent acts on: + + process | dedup | query | reject + +Decision exit codes: + 0 = process 1 = dedup 2 = query 3 = reject + +When HALT is set, the script refuses to verify or dedup and leaves the +inbound file in place — resume picks it up via cold-start. +""" + +from __future__ import annotations + +import argparse +import hashlib +import json +import re +import shutil +import subprocess +import sys +from pathlib import Path + +CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms" +HALT_FILE = CONFIG_DIR / "HALT" +EXPECTED_PROTOCOL_VERSION = "5" + +REQUIRED_FRONTMATTER = ["TITLE", "CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"] +VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"} + +DEC_PROCESS = "process" +DEC_DEDUP = "dedup" +DEC_QUERY = "query" +DEC_REJECT = "reject" + +EXIT_FOR_DECISION = { + DEC_PROCESS: 0, + DEC_DEDUP: 1, + DEC_QUERY: 2, + DEC_REJECT: 3, +} + +EXIT_HALT = 5 + + +def err(msg: str) -> None: + print(msg, file=sys.stderr) + + +def check_halt() -> None: + if HALT_FILE.exists(): + try: + reason = HALT_FILE.read_text().strip() + except OSError: + err("halt active (HALT file present but unreadable; treated as halted)") + sys.exit(EXIT_HALT) + msg = "halt active; leaving inbound message in place (resume will pick up)" + if reason: + msg = f"{msg}: {reason}" + err(msg) + sys.exit(EXIT_HALT) + + +def parse_frontmatter(path: Path) -> dict[str, str]: + try: + text = path.read_text() + except OSError as e: + return {"_parse_error": f"cannot read: {e}"} + fm: dict[str, str] = {} + for line in text.splitlines(): + line = line.rstrip() + if not line: + if fm: + break + continue + m = re.match(r"#\+([A-Z_]+):\s*(.*)", line) + if m: + fm[m.group(1)] = m.group(2).strip() + elif fm: + break + return fm + + +def emit_decision( + decision: str, + reason: str | None, + fm: dict[str, str], + sha256: str | None, + args: argparse.Namespace, +) -> int: + payload = { + "decision": decision, + "reason": reason, + "message_type": fm.get("MESSAGE_TYPE"), + "conversation_id": fm.get("CONVERSATION_ID"), + "sequence": fm.get("SEQUENCE"), + "timestamp": fm.get("TIMESTAMP"), + "sha256": sha256, + } + if args.json: + print(json.dumps(payload, indent=None if args.compact_json else 2)) + else: + print(f"decision: {decision}") + if reason: + print(f"reason: {reason}") + for k in ("message_type", "conversation_id", "sequence", "timestamp"): + v = payload[k] + if v is not None: + print(f"{k}: {v}") + if sha256: + print(f"sha256: {sha256}") + return EXIT_FOR_DECISION[decision] + + +def gpg_verify(message_path: Path, sig_path: Path) -> tuple[bool, str]: + try: + result = subprocess.run( + ["gpg", "--verify", str(sig_path), str(message_path)], + capture_output=True, + text=True, + ) + except FileNotFoundError: + return False, "gpg not installed" + if result.returncode == 0: + return True, "" + return False, result.stderr.strip().splitlines()[-1] if result.stderr.strip() else f"exit {result.returncode}" + + +def sha256_of(path: Path) -> str: + h = hashlib.sha256() + with path.open("rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + h.update(chunk) + return h.hexdigest() + + +def find_dedup_match(message_path: Path, fm: dict[str, str], my_hash: str) -> tuple[str, str | None]: + """Scan the message's directory for same-CONVERSATION_ID/SEQUENCE files. + + Returns (decision, reason) — decision is DEC_DEDUP for an exact-hash match, + or DEC_PROCESS when no match or hash differs (sequence collision is OK). + """ + parent = message_path.parent + conv_id = fm["CONVERSATION_ID"] + sequence = fm["SEQUENCE"] + for sibling in parent.iterdir(): + if sibling == message_path or not sibling.is_file() or sibling.suffix != ".org": + continue + sib_fm = parse_frontmatter(sibling) + if sib_fm.get("CONVERSATION_ID") != conv_id or sib_fm.get("SEQUENCE") != sequence: + continue + # Same conv-id + same sequence — check hash. + if sha256_of(sibling) == my_hash: + return DEC_DEDUP, f"identical retry of {sibling.name}" + return DEC_PROCESS, None + + +def check_requires_tools(fm: dict[str, str]) -> tuple[bool, list[str]]: + """REQUIRES_TOOLS is a comma-separated list of tool names. + + For v5, "tool available" is a heuristic: an executable on PATH whose name + matches the tool slug. MCP availability is currently out of scope (no + portable way to query it from a CLI). + """ + tools_field = fm.get("REQUIRES_TOOLS") + if not tools_field: + return True, [] + tools = [t.strip() for t in tools_field.split(",") if t.strip()] + missing = [t for t in tools if shutil.which(t) is None] + return len(missing) == 0, missing + + +def main() -> int: + parser = argparse.ArgumentParser(description="Receive and decide on a cross-agent message.") + parser.add_argument("message_file", type=Path) + parser.add_argument("--no-verify", action="store_true", help="Skip GPG verification (testing only)") + parser.add_argument("--no-dedup", action="store_true", help="Skip SHA-256 dedup against existing files") + parser.add_argument("--protocol-version", default=EXPECTED_PROTOCOL_VERSION, + help="Override expected protocol version (default: 5)") + parser.add_argument("--json", action="store_true", help="Emit JSON output") + parser.add_argument("--compact-json", action="store_true", help="Compact JSON (no indent)") + args = parser.parse_args() + + check_halt() + + if not args.message_file.is_file(): + err(f"message file not found: {args.message_file}") + return EXIT_FOR_DECISION[DEC_REJECT] + + fm = parse_frontmatter(args.message_file) + if "_parse_error" in fm: + return emit_decision(DEC_REJECT, fm["_parse_error"], {}, None, args) + + # Step 1: frontmatter sanity-check. + missing = [k for k in REQUIRED_FRONTMATTER if k not in fm] + if missing: + return emit_decision( + DEC_REJECT, f"frontmatter missing required fields: {', '.join(missing)}", fm, None, args + ) + if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES: + return emit_decision( + DEC_REJECT, f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}", fm, None, args + ) + + # Step 2: PROTOCOL_VERSION check. + if fm["PROTOCOL_VERSION"] != args.protocol_version: + return emit_decision( + DEC_QUERY, + f"PROTOCOL_VERSION mismatch: expected {args.protocol_version}, got {fm['PROTOCOL_VERSION']}", + fm, + None, + args, + ) + + # Step 3: GPG verify. + if not args.no_verify: + sig_path = args.message_file.with_suffix(args.message_file.suffix + ".asc") + if not sig_path.is_file(): + return emit_decision(DEC_REJECT, f"signature file missing: {sig_path.name}", fm, None, args) + ok, gpg_err = gpg_verify(args.message_file, sig_path) + if not ok: + return emit_decision(DEC_REJECT, f"gpg verify failed: {gpg_err}", fm, None, args) + + # Step 4: SHA-256 dedup. + my_hash = sha256_of(args.message_file) + if not args.no_dedup: + decision, reason = find_dedup_match(args.message_file, fm, my_hash) + if decision == DEC_DEDUP: + return emit_decision(DEC_DEDUP, reason, fm, my_hash, args) + + # Step 5: REQUIRES_TOOLS check. + ok, missing_tools = check_requires_tools(fm) + if not ok: + return emit_decision( + DEC_QUERY, + f"required tools unavailable: {', '.join(missing_tools)}", + fm, + my_hash, + args, + ) + + # Step 6: process. + return emit_decision(DEC_PROCESS, None, fm, my_hash, args) + + +if __name__ == "__main__": + sys.exit(main()) |
