aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/cross-agent-comms/cross-agent-send
diff options
context:
space:
mode:
authorCraig Jennings <c@cjennings.net>2026-05-06 21:59:52 -0500
committerCraig Jennings <c@cjennings.net>2026-05-06 21:59:52 -0500
commitd81b23ad6b6e437dfe3c338a00a4be39bc555146 (patch)
tree2d4b0d7890fd1fc70d81282b81fed2808c28a106 /.ai/scripts/cross-agent-comms/cross-agent-send
parent201377f57430ef28d02e703a2191434bbee55c75 (diff)
downloadrulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.tar.gz
rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.zip
chore(ai): initialize project notes and Claude tooling surfaces
Replace the seed notes.org with project-specific context (layout, install modes, task tracker location, recent inflection point). Bring in the synced template surfaces (protocols, workflows, scripts, references, retrospectives, someday-maybe) as tracked content for this content/documentation project.
Diffstat (limited to '.ai/scripts/cross-agent-comms/cross-agent-send')
-rwxr-xr-x.ai/scripts/cross-agent-comms/cross-agent-send356
1 files changed, 356 insertions, 0 deletions
diff --git a/.ai/scripts/cross-agent-comms/cross-agent-send b/.ai/scripts/cross-agent-comms/cross-agent-send
new file mode 100755
index 0000000..68c010a
--- /dev/null
+++ b/.ai/scripts/cross-agent-comms/cross-agent-send
@@ -0,0 +1,356 @@
+#!/usr/bin/env python3
+"""Cross-agent message sender.
+
+See cross-agent-send.md for the full contract. Briefly:
+
+- Destination as <machine>.<project>; resolved via peers.toml.
+- Same-machine: cp to receiver's inbox/from-agents/ with atomic rename.
+- Cross-machine: rsync over SSH (typically Tailscale) with retry+backoff.
+- GPG-signs by default; .asc renames before .org so receivers never see
+ a .org without its sibling signature.
+- Generates the canonical filename; user's input filename is ignored.
+- Honors the HALT file: refuses to send and exits with code 5 when set.
+"""
+
+from __future__ import annotations
+
+import argparse
+import datetime as _dt
+import json
+import os
+import re
+import shutil
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+import tomllib
+from pathlib import Path
+
+CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms"
+PEERS_TOML = CONFIG_DIR / "peers.toml"
+HALT_FILE = CONFIG_DIR / "HALT"
+STATE_DIR = Path.home() / ".local" / "state" / "cross-agent-comms"
+FAILED_SENDS_DIR = STATE_DIR / "failed-sends"
+
+EXIT_OK = 0
+EXIT_GENERAL = 1
+EXIT_DEST_NOT_FOUND = 2
+EXIT_CROSS_MACHINE_FAILED = 3
+EXIT_FRONTMATTER = 4
+EXIT_HALT = 5
+
+REQUIRED_FRONTMATTER = ["CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"]
+VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"}
+
+
+def err(msg: str) -> None:
+ print(msg, file=sys.stderr)
+
+
+def check_halt() -> None:
+ """Exit with code 5 if HALT file exists."""
+ if HALT_FILE.exists():
+ try:
+ reason = HALT_FILE.read_text().strip()
+ except OSError:
+ # Fail-closed on unreadable HALT.
+ err("halt active (HALT file present but unreadable; treated as halted)")
+ err(f"remove {HALT_FILE} to resume")
+ sys.exit(EXIT_HALT)
+ msg = "halt active"
+ if reason:
+ msg += f": {reason}"
+ err(msg)
+ err(f"remove {HALT_FILE} to resume")
+ sys.exit(EXIT_HALT)
+
+
+def parse_frontmatter(path: Path) -> dict[str, str]:
+ """Extract org-mode #+KEY: value frontmatter from the top of the file."""
+ try:
+ text = path.read_text()
+ except OSError as e:
+ err(f"cannot read message file: {e}")
+ sys.exit(EXIT_GENERAL)
+
+ frontmatter: dict[str, str] = {}
+ for line in text.splitlines():
+ line = line.rstrip()
+ if not line:
+ # Blank line ends the frontmatter block.
+ if frontmatter:
+ break
+ continue
+ m = re.match(r"#\+([A-Z_]+):\s*(.*)", line)
+ if m:
+ frontmatter[m.group(1)] = m.group(2).strip()
+ else:
+ # First non-frontmatter line ends parsing.
+ if frontmatter:
+ break
+ return frontmatter
+
+
+def validate_frontmatter(fm: dict[str, str]) -> None:
+ missing = [k for k in REQUIRED_FRONTMATTER if k not in fm]
+ if missing:
+ err(f"frontmatter missing required fields: {', '.join(missing)}")
+ sys.exit(EXIT_FRONTMATTER)
+ if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES:
+ err(f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}; expected one of {sorted(VALID_MESSAGE_TYPES)}")
+ sys.exit(EXIT_FRONTMATTER)
+ try:
+ int(fm["SEQUENCE"])
+ except ValueError:
+ err(f"SEQUENCE must be an integer; got {fm['SEQUENCE']!r}")
+ sys.exit(EXIT_FRONTMATTER)
+
+
+def load_peers() -> dict:
+ if not PEERS_TOML.exists():
+ return {}
+ try:
+ return tomllib.loads(PEERS_TOML.read_text())
+ except (tomllib.TOMLDecodeError, OSError) as e:
+ err(f"cannot read {PEERS_TOML}: {e}")
+ sys.exit(EXIT_GENERAL)
+
+
+def resolve_destination(dest: str, peers: dict) -> tuple[str, str, str | None, str | None]:
+ """Resolve <machine>.<project> to (machine, project, host, ssh_user).
+
+ host is None for same-machine destinations.
+ """
+ if "." not in dest:
+ err(f"destination must be <machine>.<project>; got {dest!r}")
+ sys.exit(EXIT_DEST_NOT_FOUND)
+ machine, project = dest.split(".", 1)
+
+ local_hostname = socket.gethostname().split(".")[0]
+ is_local = machine == local_hostname or machine == "local"
+
+ host = None
+ ssh_user = None
+ if not is_local:
+ peer_cfg = peers.get("peers", {}).get(machine)
+ if peer_cfg is None:
+ available = list(peers.get("peers", {}).keys())
+ err(f"destination not found in peers.toml; available peers: {available or '(none)'}")
+ sys.exit(EXIT_DEST_NOT_FOUND)
+ host = peer_cfg.get("host", machine)
+ ssh_user = peer_cfg.get("ssh_user", os.environ.get("USER"))
+
+ return machine, project, host, ssh_user
+
+
+def resolve_inbox_path(project: str, peers: dict) -> str:
+ """Inbox path on the receiver. Defaults to ~/projects/<project>/inbox/from-agents."""
+ proj_cfg = peers.get("projects", {}).get(project)
+ if proj_cfg and "inbox_path" in proj_cfg:
+ return os.path.expanduser(proj_cfg["inbox_path"])
+ return f"~/projects/{project}/inbox/from-agents"
+
+
+def derive_sender_project() -> str:
+ """Walk up from CWD looking for ~/projects/<name>/.
+
+ Returns the project name if found; falls back to the basename of CWD.
+ """
+ cwd = Path.cwd().resolve()
+ projects_root = (Path.home() / "projects").resolve()
+ try:
+ rel = cwd.relative_to(projects_root)
+ return rel.parts[0]
+ except ValueError:
+ return cwd.name
+
+
+def generate_canonical_filename(sender: str, conv_id: str) -> str:
+ """YYYYMMDDTHHMMSSZ-from-<sender>-<conv-id>.org"""
+ now = _dt.datetime.now(_dt.timezone.utc)
+ timestamp = now.strftime("%Y%m%dT%H%M%SZ")
+ return f"{timestamp}-from-{sender}-{conv_id}.org"
+
+
+def sign(message_path: Path, sig_path: Path, key: str | None) -> None:
+ """gpg --detach-sign --armor --output <sig> [--local-user <key>] <message>"""
+ cmd = ["gpg", "--detach-sign", "--armor", "--yes", "--output", str(sig_path)]
+ if key:
+ cmd.extend(["--local-user", key])
+ cmd.append(str(message_path))
+ try:
+ result = subprocess.run(cmd, capture_output=True, text=True)
+ except FileNotFoundError:
+ err("gpg not found; install gnupg or use --no-sign for testing")
+ sys.exit(EXIT_GENERAL)
+ if result.returncode != 0:
+ err(f"signing failed: {result.stderr.strip()}")
+ sys.exit(EXIT_GENERAL)
+
+
+def same_machine_deliver(message_path: Path, sig_path: Path | None, target_dir: Path, canonical_name: str) -> None:
+ """Atomic-write delivery: stage .asc, mv to final, then stage .org, mv to final."""
+ target_dir.mkdir(parents=True, exist_ok=True)
+ final_msg = target_dir / canonical_name
+ final_sig = target_dir / f"{canonical_name}.asc"
+
+ if sig_path is not None:
+ # Stage .asc first, mv to final, THEN stage .org and mv to final.
+ with tempfile.NamedTemporaryFile(
+ mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.asc.", delete=False
+ ) as tmp:
+ tmp.write(sig_path.read_bytes())
+ tmp_sig_path = Path(tmp.name)
+ os.replace(tmp_sig_path, final_sig)
+
+ # Re-check HALT between .asc and .org per the layered-checks rule.
+ check_halt()
+
+ with tempfile.NamedTemporaryFile(
+ mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.", delete=False
+ ) as tmp:
+ tmp.write(message_path.read_bytes())
+ tmp_msg_path = Path(tmp.name)
+ os.replace(tmp_msg_path, final_msg)
+
+
+def cross_machine_deliver(
+ message_path: Path,
+ sig_path: Path | None,
+ canonical_name: str,
+ host: str,
+ ssh_user: str,
+ inbox_path: str,
+ retries: int,
+) -> bool:
+ """rsync push the .asc first (if signed), re-check HALT, then push the .org.
+
+ Returns True on success, False on persistent failure (after retries).
+ """
+ # Stage local copies with the canonical name so rsync sets the right
+ # destination filename.
+ with tempfile.TemporaryDirectory(prefix="cross-agent-send-") as staging:
+ staging_dir = Path(staging)
+ local_msg = staging_dir / canonical_name
+ local_msg.write_bytes(message_path.read_bytes())
+ local_sig = None
+ if sig_path is not None:
+ local_sig = staging_dir / f"{canonical_name}.asc"
+ local_sig.write_bytes(sig_path.read_bytes())
+
+ backoffs = [5, 30, 120]
+ # Step 1: push .asc first if signed.
+ if local_sig is not None:
+ if not _rsync_with_retries(local_sig, host, ssh_user, inbox_path, retries, backoffs):
+ return False
+
+ # Re-check HALT between .asc and .org per the layered-checks rule.
+ check_halt()
+
+ # Step 2: push .org.
+ if not _rsync_with_retries(local_msg, host, ssh_user, inbox_path, retries, backoffs):
+ return False
+
+ return True
+
+
+def _rsync_with_retries(
+ src: Path, host: str, ssh_user: str, inbox_path: str, retries: int, backoffs: list[int]
+) -> bool:
+ target = f"{ssh_user}@{host}:{inbox_path}/"
+ last_err = ""
+ for attempt in range(retries + 1):
+ if attempt > 0:
+ check_halt()
+ wait = backoffs[min(attempt - 1, len(backoffs) - 1)]
+ err(f"rsync attempt {attempt} failed: {last_err}; retrying in {wait}s")
+ time.sleep(wait)
+ try:
+ result = subprocess.run(
+ ["rsync", "-a", str(src), target],
+ capture_output=True,
+ text=True,
+ )
+ except FileNotFoundError:
+ err("rsync not found; install rsync")
+ return False
+ if result.returncode == 0:
+ return True
+ last_err = result.stderr.strip() or f"exit {result.returncode}"
+ err(f"rsync failed after {retries + 1} attempts: {last_err}")
+ return False
+
+
+def write_failed_send_marker(dest: str, message_path: Path, error: str, retry_log: list[str]) -> None:
+ FAILED_SENDS_DIR.mkdir(parents=True, exist_ok=True)
+ timestamp = _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
+ safe_basename = re.sub(r"[^A-Za-z0-9._-]", "_", message_path.name)
+ marker = FAILED_SENDS_DIR / f"{timestamp}-{dest.replace('.', '-')}-{safe_basename}.json"
+ marker.write_text(json.dumps(
+ {
+ "timestamp": timestamp,
+ "destination": dest,
+ "message_path": str(message_path),
+ "error": error,
+ "retry_log": retry_log,
+ },
+ indent=2,
+ ))
+ err(f"marker written: {marker}")
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description="Send a cross-agent message.")
+ parser.add_argument("destination", help="Destination as <machine>.<project>")
+ parser.add_argument("message_file", type=Path, help="Path to the message body file")
+ parser.add_argument("--no-sign", action="store_true", help="Skip GPG signing (testing only)")
+ parser.add_argument("--retries", type=int, default=3, help="Retry count for cross-machine sends")
+ parser.add_argument("--key", help="GPG key id to sign with (default: user's primary)")
+ args = parser.parse_args()
+
+ check_halt()
+
+ if not args.message_file.is_file():
+ err(f"message file not found: {args.message_file}")
+ return EXIT_GENERAL
+
+ fm = parse_frontmatter(args.message_file)
+ validate_frontmatter(fm)
+
+ peers = load_peers()
+ machine, project, host, ssh_user = resolve_destination(args.destination, peers)
+ inbox_path = resolve_inbox_path(project, peers)
+
+ sender = derive_sender_project()
+ canonical_name = generate_canonical_filename(sender, fm["CONVERSATION_ID"])
+
+ sig_tmp = None
+ if not args.no_sign:
+ sig_tmp = args.message_file.with_suffix(args.message_file.suffix + ".asc.tmp")
+ sign(args.message_file, sig_tmp, args.key)
+
+ try:
+ if host is None:
+ # Same-machine delivery.
+ target_dir = Path(os.path.expanduser(inbox_path))
+ same_machine_deliver(args.message_file, sig_tmp, target_dir, canonical_name)
+ print(f"sent: {target_dir}/{canonical_name}")
+ return EXIT_OK
+ else:
+ ok = cross_machine_deliver(
+ args.message_file, sig_tmp, canonical_name, host, ssh_user, inbox_path, args.retries
+ )
+ if ok:
+ print(f"sent: {ssh_user}@{host}:{inbox_path}/{canonical_name}")
+ return EXIT_OK
+ write_failed_send_marker(args.destination, args.message_file, "rsync failed after retries", [])
+ return EXIT_CROSS_MACHINE_FAILED
+ finally:
+ if sig_tmp is not None and sig_tmp.exists():
+ sig_tmp.unlink()
+
+
+if __name__ == "__main__":
+ sys.exit(main())