aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/cross-agent-comms/cross-agent-recv
diff options
context:
space:
mode:
authorCraig Jennings <c@cjennings.net>2026-05-06 21:59:52 -0500
committerCraig Jennings <c@cjennings.net>2026-05-06 21:59:52 -0500
commitd81b23ad6b6e437dfe3c338a00a4be39bc555146 (patch)
tree2d4b0d7890fd1fc70d81282b81fed2808c28a106 /.ai/scripts/cross-agent-comms/cross-agent-recv
parent201377f57430ef28d02e703a2191434bbee55c75 (diff)
downloadrulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.tar.gz
rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.zip
chore(ai): initialize project notes and Claude tooling surfaces
Replace the seed notes.org with project-specific context (layout, install modes, task tracker location, recent inflection point). Bring in the synced template surfaces (protocols, workflows, scripts, references, retrospectives, someday-maybe) as tracked content for this content/documentation project.
Diffstat (limited to '.ai/scripts/cross-agent-comms/cross-agent-recv')
-rwxr-xr-x.ai/scripts/cross-agent-comms/cross-agent-recv250
1 files changed, 250 insertions, 0 deletions
diff --git a/.ai/scripts/cross-agent-comms/cross-agent-recv b/.ai/scripts/cross-agent-comms/cross-agent-recv
new file mode 100755
index 0000000..b67533a
--- /dev/null
+++ b/.ai/scripts/cross-agent-comms/cross-agent-recv
@@ -0,0 +1,250 @@
+#!/usr/bin/env python3
+"""Cross-agent message receiver.
+
+See cross-agent-recv.md for the full contract. Reads one message file and
+emits a structured decision the agent acts on:
+
+ process | dedup | query | reject
+
+Decision exit codes:
+ 0 = process 1 = dedup 2 = query 3 = reject
+
+When HALT is set, the script refuses to verify or dedup and leaves the
+inbound file in place — resume picks it up via cold-start.
+"""
+
+from __future__ import annotations
+
+import argparse
+import hashlib
+import json
+import re
+import shutil
+import subprocess
+import sys
+from pathlib import Path
+
+CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms"
+HALT_FILE = CONFIG_DIR / "HALT"
+EXPECTED_PROTOCOL_VERSION = "5"
+
+REQUIRED_FRONTMATTER = ["TITLE", "CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"]
+VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"}
+
+DEC_PROCESS = "process"
+DEC_DEDUP = "dedup"
+DEC_QUERY = "query"
+DEC_REJECT = "reject"
+
+EXIT_FOR_DECISION = {
+ DEC_PROCESS: 0,
+ DEC_DEDUP: 1,
+ DEC_QUERY: 2,
+ DEC_REJECT: 3,
+}
+
+EXIT_HALT = 5
+
+
+def err(msg: str) -> None:
+ print(msg, file=sys.stderr)
+
+
+def check_halt() -> None:
+ if HALT_FILE.exists():
+ try:
+ reason = HALT_FILE.read_text().strip()
+ except OSError:
+ err("halt active (HALT file present but unreadable; treated as halted)")
+ sys.exit(EXIT_HALT)
+ msg = "halt active; leaving inbound message in place (resume will pick up)"
+ if reason:
+ msg = f"{msg}: {reason}"
+ err(msg)
+ sys.exit(EXIT_HALT)
+
+
+def parse_frontmatter(path: Path) -> dict[str, str]:
+ try:
+ text = path.read_text()
+ except OSError as e:
+ return {"_parse_error": f"cannot read: {e}"}
+ fm: dict[str, str] = {}
+ for line in text.splitlines():
+ line = line.rstrip()
+ if not line:
+ if fm:
+ break
+ continue
+ m = re.match(r"#\+([A-Z_]+):\s*(.*)", line)
+ if m:
+ fm[m.group(1)] = m.group(2).strip()
+ elif fm:
+ break
+ return fm
+
+
+def emit_decision(
+ decision: str,
+ reason: str | None,
+ fm: dict[str, str],
+ sha256: str | None,
+ args: argparse.Namespace,
+) -> int:
+ payload = {
+ "decision": decision,
+ "reason": reason,
+ "message_type": fm.get("MESSAGE_TYPE"),
+ "conversation_id": fm.get("CONVERSATION_ID"),
+ "sequence": fm.get("SEQUENCE"),
+ "timestamp": fm.get("TIMESTAMP"),
+ "sha256": sha256,
+ }
+ if args.json:
+ print(json.dumps(payload, indent=None if args.compact_json else 2))
+ else:
+ print(f"decision: {decision}")
+ if reason:
+ print(f"reason: {reason}")
+ for k in ("message_type", "conversation_id", "sequence", "timestamp"):
+ v = payload[k]
+ if v is not None:
+ print(f"{k}: {v}")
+ if sha256:
+ print(f"sha256: {sha256}")
+ return EXIT_FOR_DECISION[decision]
+
+
+def gpg_verify(message_path: Path, sig_path: Path) -> tuple[bool, str]:
+ try:
+ result = subprocess.run(
+ ["gpg", "--verify", str(sig_path), str(message_path)],
+ capture_output=True,
+ text=True,
+ )
+ except FileNotFoundError:
+ return False, "gpg not installed"
+ if result.returncode == 0:
+ return True, ""
+ return False, result.stderr.strip().splitlines()[-1] if result.stderr.strip() else f"exit {result.returncode}"
+
+
+def sha256_of(path: Path) -> str:
+ h = hashlib.sha256()
+ with path.open("rb") as f:
+ for chunk in iter(lambda: f.read(65536), b""):
+ h.update(chunk)
+ return h.hexdigest()
+
+
+def find_dedup_match(message_path: Path, fm: dict[str, str], my_hash: str) -> tuple[str, str | None]:
+ """Scan the message's directory for same-CONVERSATION_ID/SEQUENCE files.
+
+ Returns (decision, reason) — decision is DEC_DEDUP for an exact-hash match,
+ or DEC_PROCESS when no match or hash differs (sequence collision is OK).
+ """
+ parent = message_path.parent
+ conv_id = fm["CONVERSATION_ID"]
+ sequence = fm["SEQUENCE"]
+ for sibling in parent.iterdir():
+ if sibling == message_path or not sibling.is_file() or sibling.suffix != ".org":
+ continue
+ sib_fm = parse_frontmatter(sibling)
+ if sib_fm.get("CONVERSATION_ID") != conv_id or sib_fm.get("SEQUENCE") != sequence:
+ continue
+ # Same conv-id + same sequence — check hash.
+ if sha256_of(sibling) == my_hash:
+ return DEC_DEDUP, f"identical retry of {sibling.name}"
+ return DEC_PROCESS, None
+
+
+def check_requires_tools(fm: dict[str, str]) -> tuple[bool, list[str]]:
+ """REQUIRES_TOOLS is a comma-separated list of tool names.
+
+ For v5, "tool available" is a heuristic: an executable on PATH whose name
+ matches the tool slug. MCP availability is currently out of scope (no
+ portable way to query it from a CLI).
+ """
+ tools_field = fm.get("REQUIRES_TOOLS")
+ if not tools_field:
+ return True, []
+ tools = [t.strip() for t in tools_field.split(",") if t.strip()]
+ missing = [t for t in tools if shutil.which(t) is None]
+ return len(missing) == 0, missing
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description="Receive and decide on a cross-agent message.")
+ parser.add_argument("message_file", type=Path)
+ parser.add_argument("--no-verify", action="store_true", help="Skip GPG verification (testing only)")
+ parser.add_argument("--no-dedup", action="store_true", help="Skip SHA-256 dedup against existing files")
+ parser.add_argument("--protocol-version", default=EXPECTED_PROTOCOL_VERSION,
+ help="Override expected protocol version (default: 5)")
+ parser.add_argument("--json", action="store_true", help="Emit JSON output")
+ parser.add_argument("--compact-json", action="store_true", help="Compact JSON (no indent)")
+ args = parser.parse_args()
+
+ check_halt()
+
+ if not args.message_file.is_file():
+ err(f"message file not found: {args.message_file}")
+ return EXIT_FOR_DECISION[DEC_REJECT]
+
+ fm = parse_frontmatter(args.message_file)
+ if "_parse_error" in fm:
+ return emit_decision(DEC_REJECT, fm["_parse_error"], {}, None, args)
+
+ # Step 1: frontmatter sanity-check.
+ missing = [k for k in REQUIRED_FRONTMATTER if k not in fm]
+ if missing:
+ return emit_decision(
+ DEC_REJECT, f"frontmatter missing required fields: {', '.join(missing)}", fm, None, args
+ )
+ if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES:
+ return emit_decision(
+ DEC_REJECT, f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}", fm, None, args
+ )
+
+ # Step 2: PROTOCOL_VERSION check.
+ if fm["PROTOCOL_VERSION"] != args.protocol_version:
+ return emit_decision(
+ DEC_QUERY,
+ f"PROTOCOL_VERSION mismatch: expected {args.protocol_version}, got {fm['PROTOCOL_VERSION']}",
+ fm,
+ None,
+ args,
+ )
+
+ # Step 3: GPG verify.
+ if not args.no_verify:
+ sig_path = args.message_file.with_suffix(args.message_file.suffix + ".asc")
+ if not sig_path.is_file():
+ return emit_decision(DEC_REJECT, f"signature file missing: {sig_path.name}", fm, None, args)
+ ok, gpg_err = gpg_verify(args.message_file, sig_path)
+ if not ok:
+ return emit_decision(DEC_REJECT, f"gpg verify failed: {gpg_err}", fm, None, args)
+
+ # Step 4: SHA-256 dedup.
+ my_hash = sha256_of(args.message_file)
+ if not args.no_dedup:
+ decision, reason = find_dedup_match(args.message_file, fm, my_hash)
+ if decision == DEC_DEDUP:
+ return emit_decision(DEC_DEDUP, reason, fm, my_hash, args)
+
+ # Step 5: REQUIRES_TOOLS check.
+ ok, missing_tools = check_requires_tools(fm)
+ if not ok:
+ return emit_decision(
+ DEC_QUERY,
+ f"required tools unavailable: {', '.join(missing_tools)}",
+ fm,
+ my_hash,
+ args,
+ )
+
+ # Step 6: process.
+ return emit_decision(DEC_PROCESS, None, fm, my_hash, args)
+
+
+if __name__ == "__main__":
+ sys.exit(main())