#!/usr/bin/env python3 """Cross-agent message receiver. See cross-agent-recv.md for the full contract. Reads one message file and emits a structured decision the agent acts on: process | dedup | query | reject Decision exit codes: 0 = process 1 = dedup 2 = query 3 = reject When HALT is set, the script refuses to verify or dedup and leaves the inbound file in place — resume picks it up via cold-start. """ from __future__ import annotations import argparse import hashlib import json import re import shutil import subprocess import sys from pathlib import Path CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms" HALT_FILE = CONFIG_DIR / "HALT" EXPECTED_PROTOCOL_VERSION = "5" REQUIRED_FRONTMATTER = ["TITLE", "CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"] VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"} DEC_PROCESS = "process" DEC_DEDUP = "dedup" DEC_QUERY = "query" DEC_REJECT = "reject" EXIT_FOR_DECISION = { DEC_PROCESS: 0, DEC_DEDUP: 1, DEC_QUERY: 2, DEC_REJECT: 3, } EXIT_HALT = 5 def err(msg: str) -> None: print(msg, file=sys.stderr) def check_halt() -> None: if HALT_FILE.exists(): try: reason = HALT_FILE.read_text().strip() except OSError: err("halt active (HALT file present but unreadable; treated as halted)") sys.exit(EXIT_HALT) msg = "halt active; leaving inbound message in place (resume will pick up)" if reason: msg = f"{msg}: {reason}" err(msg) sys.exit(EXIT_HALT) def parse_frontmatter(path: Path) -> dict[str, str]: try: text = path.read_text() except OSError as e: return {"_parse_error": f"cannot read: {e}"} fm: dict[str, str] = {} for line in text.splitlines(): line = line.rstrip() if not line: if fm: break continue m = re.match(r"#\+([A-Z_]+):\s*(.*)", line) if m: fm[m.group(1)] = m.group(2).strip() elif fm: break return fm def emit_decision( decision: str, reason: str | None, fm: dict[str, str], sha256: str | None, args: argparse.Namespace, ) -> int: payload = { "decision": decision, "reason": reason, "message_type": fm.get("MESSAGE_TYPE"), "conversation_id": fm.get("CONVERSATION_ID"), "sequence": fm.get("SEQUENCE"), "timestamp": fm.get("TIMESTAMP"), "sha256": sha256, } if args.json: print(json.dumps(payload, indent=None if args.compact_json else 2)) else: print(f"decision: {decision}") if reason: print(f"reason: {reason}") for k in ("message_type", "conversation_id", "sequence", "timestamp"): v = payload[k] if v is not None: print(f"{k}: {v}") if sha256: print(f"sha256: {sha256}") return EXIT_FOR_DECISION[decision] def gpg_verify(message_path: Path, sig_path: Path) -> tuple[bool, str]: try: result = subprocess.run( ["gpg", "--verify", str(sig_path), str(message_path)], capture_output=True, text=True, ) except FileNotFoundError: return False, "gpg not installed" if result.returncode == 0: return True, "" return False, result.stderr.strip().splitlines()[-1] if result.stderr.strip() else f"exit {result.returncode}" def sha256_of(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(65536), b""): h.update(chunk) return h.hexdigest() def find_dedup_match(message_path: Path, fm: dict[str, str], my_hash: str) -> tuple[str, str | None]: """Scan the message's directory for same-CONVERSATION_ID/SEQUENCE files. Returns (decision, reason) — decision is DEC_DEDUP for an exact-hash match, or DEC_PROCESS when no match or hash differs (sequence collision is OK). """ parent = message_path.parent conv_id = fm["CONVERSATION_ID"] sequence = fm["SEQUENCE"] for sibling in parent.iterdir(): if sibling == message_path or not sibling.is_file() or sibling.suffix != ".org": continue sib_fm = parse_frontmatter(sibling) if sib_fm.get("CONVERSATION_ID") != conv_id or sib_fm.get("SEQUENCE") != sequence: continue # Same conv-id + same sequence — check hash. if sha256_of(sibling) == my_hash: return DEC_DEDUP, f"identical retry of {sibling.name}" return DEC_PROCESS, None def check_requires_tools(fm: dict[str, str]) -> tuple[bool, list[str]]: """REQUIRES_TOOLS is a comma-separated list of tool names. For v5, "tool available" is a heuristic: an executable on PATH whose name matches the tool slug. MCP availability is currently out of scope (no portable way to query it from a CLI). """ tools_field = fm.get("REQUIRES_TOOLS") if not tools_field: return True, [] tools = [t.strip() for t in tools_field.split(",") if t.strip()] missing = [t for t in tools if shutil.which(t) is None] return len(missing) == 0, missing def main() -> int: parser = argparse.ArgumentParser(description="Receive and decide on a cross-agent message.") parser.add_argument("message_file", type=Path) parser.add_argument("--no-verify", action="store_true", help="Skip GPG verification (testing only)") parser.add_argument("--no-dedup", action="store_true", help="Skip SHA-256 dedup against existing files") parser.add_argument("--protocol-version", default=EXPECTED_PROTOCOL_VERSION, help="Override expected protocol version (default: 5)") parser.add_argument("--json", action="store_true", help="Emit JSON output") parser.add_argument("--compact-json", action="store_true", help="Compact JSON (no indent)") args = parser.parse_args() check_halt() if not args.message_file.is_file(): err(f"message file not found: {args.message_file}") return EXIT_FOR_DECISION[DEC_REJECT] fm = parse_frontmatter(args.message_file) if "_parse_error" in fm: return emit_decision(DEC_REJECT, fm["_parse_error"], {}, None, args) # Step 1: frontmatter sanity-check. missing = [k for k in REQUIRED_FRONTMATTER if k not in fm] if missing: return emit_decision( DEC_REJECT, f"frontmatter missing required fields: {', '.join(missing)}", fm, None, args ) if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES: return emit_decision( DEC_REJECT, f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}", fm, None, args ) # Step 2: PROTOCOL_VERSION check. if fm["PROTOCOL_VERSION"] != args.protocol_version: return emit_decision( DEC_QUERY, f"PROTOCOL_VERSION mismatch: expected {args.protocol_version}, got {fm['PROTOCOL_VERSION']}", fm, None, args, ) # Step 3: GPG verify. if not args.no_verify: sig_path = args.message_file.with_suffix(args.message_file.suffix + ".asc") if not sig_path.is_file(): return emit_decision(DEC_REJECT, f"signature file missing: {sig_path.name}", fm, None, args) ok, gpg_err = gpg_verify(args.message_file, sig_path) if not ok: return emit_decision(DEC_REJECT, f"gpg verify failed: {gpg_err}", fm, None, args) # Step 4: SHA-256 dedup. my_hash = sha256_of(args.message_file) if not args.no_dedup: decision, reason = find_dedup_match(args.message_file, fm, my_hash) if decision == DEC_DEDUP: return emit_decision(DEC_DEDUP, reason, fm, my_hash, args) # Step 5: REQUIRES_TOOLS check. ok, missing_tools = check_requires_tools(fm) if not ok: return emit_decision( DEC_QUERY, f"required tools unavailable: {', '.join(missing_tools)}", fm, my_hash, args, ) # Step 6: process. return emit_decision(DEC_PROCESS, None, fm, my_hash, args) if __name__ == "__main__": sys.exit(main())