aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/cross-agent-comms/cross-agent-recv
diff options
context:
space:
mode:
authorCraig Jennings <c@cjennings.net>2026-06-16 23:40:42 -0500
committerCraig Jennings <c@cjennings.net>2026-06-16 23:40:42 -0500
commite1933fe685a3e15d001552537df90e33ba00b83a (patch)
tree4b6435152b6605a96c0cc7a3f3b1dcadd4fc6a02 /.ai/scripts/cross-agent-comms/cross-agent-recv
parent4e2db8f20a259d43d2198b120ac32660298d0d63 (diff)
downloadrulesets-e1933fe685a3e15d001552537df90e33ba00b83a.tar.gz
rulesets-e1933fe685a3e15d001552537df90e33ba00b83a.zip
refactor: remove unused cross-agent-comms subsystem
Nothing used the cross-agent message system (send/recv/watch/status/discover/halt/resume over the inbox/from-agents/ file-IPC protocol). Every cross-project handoff goes through inbox-send instead. I removed the scripts, READMEs, workflow, tests, INDEX entry, the three startup.org wirings, and the legacy bin symlinks, then repointed helper-mode's escalation to inbox-send and noted the removal in the generic-agent-runtime spec.
Diffstat (limited to '.ai/scripts/cross-agent-comms/cross-agent-recv')
-rwxr-xr-x.ai/scripts/cross-agent-comms/cross-agent-recv250
1 files changed, 0 insertions, 250 deletions
diff --git a/.ai/scripts/cross-agent-comms/cross-agent-recv b/.ai/scripts/cross-agent-comms/cross-agent-recv
deleted file mode 100755
index b67533a..0000000
--- a/.ai/scripts/cross-agent-comms/cross-agent-recv
+++ /dev/null
@@ -1,250 +0,0 @@
-#!/usr/bin/env python3
-"""Cross-agent message receiver.
-
-See cross-agent-recv.md for the full contract. Reads one message file and
-emits a structured decision the agent acts on:
-
- process | dedup | query | reject
-
-Decision exit codes:
- 0 = process 1 = dedup 2 = query 3 = reject
-
-When HALT is set, the script refuses to verify or dedup and leaves the
-inbound file in place — resume picks it up via cold-start.
-"""
-
-from __future__ import annotations
-
-import argparse
-import hashlib
-import json
-import re
-import shutil
-import subprocess
-import sys
-from pathlib import Path
-
-CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms"
-HALT_FILE = CONFIG_DIR / "HALT"
-EXPECTED_PROTOCOL_VERSION = "5"
-
-REQUIRED_FRONTMATTER = ["TITLE", "CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"]
-VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"}
-
-DEC_PROCESS = "process"
-DEC_DEDUP = "dedup"
-DEC_QUERY = "query"
-DEC_REJECT = "reject"
-
-EXIT_FOR_DECISION = {
- DEC_PROCESS: 0,
- DEC_DEDUP: 1,
- DEC_QUERY: 2,
- DEC_REJECT: 3,
-}
-
-EXIT_HALT = 5
-
-
-def err(msg: str) -> None:
- print(msg, file=sys.stderr)
-
-
-def check_halt() -> None:
- if HALT_FILE.exists():
- try:
- reason = HALT_FILE.read_text().strip()
- except OSError:
- err("halt active (HALT file present but unreadable; treated as halted)")
- sys.exit(EXIT_HALT)
- msg = "halt active; leaving inbound message in place (resume will pick up)"
- if reason:
- msg = f"{msg}: {reason}"
- err(msg)
- sys.exit(EXIT_HALT)
-
-
-def parse_frontmatter(path: Path) -> dict[str, str]:
- try:
- text = path.read_text()
- except OSError as e:
- return {"_parse_error": f"cannot read: {e}"}
- fm: dict[str, str] = {}
- for line in text.splitlines():
- line = line.rstrip()
- if not line:
- if fm:
- break
- continue
- m = re.match(r"#\+([A-Z_]+):\s*(.*)", line)
- if m:
- fm[m.group(1)] = m.group(2).strip()
- elif fm:
- break
- return fm
-
-
-def emit_decision(
- decision: str,
- reason: str | None,
- fm: dict[str, str],
- sha256: str | None,
- args: argparse.Namespace,
-) -> int:
- payload = {
- "decision": decision,
- "reason": reason,
- "message_type": fm.get("MESSAGE_TYPE"),
- "conversation_id": fm.get("CONVERSATION_ID"),
- "sequence": fm.get("SEQUENCE"),
- "timestamp": fm.get("TIMESTAMP"),
- "sha256": sha256,
- }
- if args.json:
- print(json.dumps(payload, indent=None if args.compact_json else 2))
- else:
- print(f"decision: {decision}")
- if reason:
- print(f"reason: {reason}")
- for k in ("message_type", "conversation_id", "sequence", "timestamp"):
- v = payload[k]
- if v is not None:
- print(f"{k}: {v}")
- if sha256:
- print(f"sha256: {sha256}")
- return EXIT_FOR_DECISION[decision]
-
-
-def gpg_verify(message_path: Path, sig_path: Path) -> tuple[bool, str]:
- try:
- result = subprocess.run(
- ["gpg", "--verify", str(sig_path), str(message_path)],
- capture_output=True,
- text=True,
- )
- except FileNotFoundError:
- return False, "gpg not installed"
- if result.returncode == 0:
- return True, ""
- return False, result.stderr.strip().splitlines()[-1] if result.stderr.strip() else f"exit {result.returncode}"
-
-
-def sha256_of(path: Path) -> str:
- h = hashlib.sha256()
- with path.open("rb") as f:
- for chunk in iter(lambda: f.read(65536), b""):
- h.update(chunk)
- return h.hexdigest()
-
-
-def find_dedup_match(message_path: Path, fm: dict[str, str], my_hash: str) -> tuple[str, str | None]:
- """Scan the message's directory for same-CONVERSATION_ID/SEQUENCE files.
-
- Returns (decision, reason) — decision is DEC_DEDUP for an exact-hash match,
- or DEC_PROCESS when no match or hash differs (sequence collision is OK).
- """
- parent = message_path.parent
- conv_id = fm["CONVERSATION_ID"]
- sequence = fm["SEQUENCE"]
- for sibling in parent.iterdir():
- if sibling == message_path or not sibling.is_file() or sibling.suffix != ".org":
- continue
- sib_fm = parse_frontmatter(sibling)
- if sib_fm.get("CONVERSATION_ID") != conv_id or sib_fm.get("SEQUENCE") != sequence:
- continue
- # Same conv-id + same sequence — check hash.
- if sha256_of(sibling) == my_hash:
- return DEC_DEDUP, f"identical retry of {sibling.name}"
- return DEC_PROCESS, None
-
-
-def check_requires_tools(fm: dict[str, str]) -> tuple[bool, list[str]]:
- """REQUIRES_TOOLS is a comma-separated list of tool names.
-
- For v5, "tool available" is a heuristic: an executable on PATH whose name
- matches the tool slug. MCP availability is currently out of scope (no
- portable way to query it from a CLI).
- """
- tools_field = fm.get("REQUIRES_TOOLS")
- if not tools_field:
- return True, []
- tools = [t.strip() for t in tools_field.split(",") if t.strip()]
- missing = [t for t in tools if shutil.which(t) is None]
- return len(missing) == 0, missing
-
-
-def main() -> int:
- parser = argparse.ArgumentParser(description="Receive and decide on a cross-agent message.")
- parser.add_argument("message_file", type=Path)
- parser.add_argument("--no-verify", action="store_true", help="Skip GPG verification (testing only)")
- parser.add_argument("--no-dedup", action="store_true", help="Skip SHA-256 dedup against existing files")
- parser.add_argument("--protocol-version", default=EXPECTED_PROTOCOL_VERSION,
- help="Override expected protocol version (default: 5)")
- parser.add_argument("--json", action="store_true", help="Emit JSON output")
- parser.add_argument("--compact-json", action="store_true", help="Compact JSON (no indent)")
- args = parser.parse_args()
-
- check_halt()
-
- if not args.message_file.is_file():
- err(f"message file not found: {args.message_file}")
- return EXIT_FOR_DECISION[DEC_REJECT]
-
- fm = parse_frontmatter(args.message_file)
- if "_parse_error" in fm:
- return emit_decision(DEC_REJECT, fm["_parse_error"], {}, None, args)
-
- # Step 1: frontmatter sanity-check.
- missing = [k for k in REQUIRED_FRONTMATTER if k not in fm]
- if missing:
- return emit_decision(
- DEC_REJECT, f"frontmatter missing required fields: {', '.join(missing)}", fm, None, args
- )
- if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES:
- return emit_decision(
- DEC_REJECT, f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}", fm, None, args
- )
-
- # Step 2: PROTOCOL_VERSION check.
- if fm["PROTOCOL_VERSION"] != args.protocol_version:
- return emit_decision(
- DEC_QUERY,
- f"PROTOCOL_VERSION mismatch: expected {args.protocol_version}, got {fm['PROTOCOL_VERSION']}",
- fm,
- None,
- args,
- )
-
- # Step 3: GPG verify.
- if not args.no_verify:
- sig_path = args.message_file.with_suffix(args.message_file.suffix + ".asc")
- if not sig_path.is_file():
- return emit_decision(DEC_REJECT, f"signature file missing: {sig_path.name}", fm, None, args)
- ok, gpg_err = gpg_verify(args.message_file, sig_path)
- if not ok:
- return emit_decision(DEC_REJECT, f"gpg verify failed: {gpg_err}", fm, None, args)
-
- # Step 4: SHA-256 dedup.
- my_hash = sha256_of(args.message_file)
- if not args.no_dedup:
- decision, reason = find_dedup_match(args.message_file, fm, my_hash)
- if decision == DEC_DEDUP:
- return emit_decision(DEC_DEDUP, reason, fm, my_hash, args)
-
- # Step 5: REQUIRES_TOOLS check.
- ok, missing_tools = check_requires_tools(fm)
- if not ok:
- return emit_decision(
- DEC_QUERY,
- f"required tools unavailable: {', '.join(missing_tools)}",
- fm,
- my_hash,
- args,
- )
-
- # Step 6: process.
- return emit_decision(DEC_PROCESS, None, fm, my_hash, args)
-
-
-if __name__ == "__main__":
- sys.exit(main())