diff options
| author | Craig Jennings <c@cjennings.net> | 2026-05-06 21:59:52 -0500 |
|---|---|---|
| committer | Craig Jennings <c@cjennings.net> | 2026-05-06 21:59:52 -0500 |
| commit | d81b23ad6b6e437dfe3c338a00a4be39bc555146 (patch) | |
| tree | 2d4b0d7890fd1fc70d81282b81fed2808c28a106 /.ai/scripts/cross-agent-comms/cross-agent-send | |
| parent | 201377f57430ef28d02e703a2191434bbee55c75 (diff) | |
| download | rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.tar.gz rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.zip | |
chore(ai): initialize project notes and Claude tooling surfaces
Replace the seed notes.org with project-specific context (layout, install modes, task tracker location, recent inflection point). Bring in the synced template surfaces (protocols, workflows, scripts, references, retrospectives, someday-maybe) as tracked content for this content/documentation project.
Diffstat (limited to '.ai/scripts/cross-agent-comms/cross-agent-send')
| -rwxr-xr-x | .ai/scripts/cross-agent-comms/cross-agent-send | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/.ai/scripts/cross-agent-comms/cross-agent-send b/.ai/scripts/cross-agent-comms/cross-agent-send new file mode 100755 index 0000000..68c010a --- /dev/null +++ b/.ai/scripts/cross-agent-comms/cross-agent-send @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 +"""Cross-agent message sender. + +See cross-agent-send.md for the full contract. Briefly: + +- Destination as <machine>.<project>; resolved via peers.toml. +- Same-machine: cp to receiver's inbox/from-agents/ with atomic rename. +- Cross-machine: rsync over SSH (typically Tailscale) with retry+backoff. +- GPG-signs by default; .asc renames before .org so receivers never see + a .org without its sibling signature. +- Generates the canonical filename; user's input filename is ignored. +- Honors the HALT file: refuses to send and exits with code 5 when set. +""" + +from __future__ import annotations + +import argparse +import datetime as _dt +import json +import os +import re +import shutil +import socket +import subprocess +import sys +import tempfile +import time +import tomllib +from pathlib import Path + +CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms" +PEERS_TOML = CONFIG_DIR / "peers.toml" +HALT_FILE = CONFIG_DIR / "HALT" +STATE_DIR = Path.home() / ".local" / "state" / "cross-agent-comms" +FAILED_SENDS_DIR = STATE_DIR / "failed-sends" + +EXIT_OK = 0 +EXIT_GENERAL = 1 +EXIT_DEST_NOT_FOUND = 2 +EXIT_CROSS_MACHINE_FAILED = 3 +EXIT_FRONTMATTER = 4 +EXIT_HALT = 5 + +REQUIRED_FRONTMATTER = ["CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"] +VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"} + + +def err(msg: str) -> None: + print(msg, file=sys.stderr) + + +def check_halt() -> None: + """Exit with code 5 if HALT file exists.""" + if HALT_FILE.exists(): + try: + reason = HALT_FILE.read_text().strip() + except OSError: + # Fail-closed on unreadable HALT. + err("halt active (HALT file present but unreadable; treated as halted)") + err(f"remove {HALT_FILE} to resume") + sys.exit(EXIT_HALT) + msg = "halt active" + if reason: + msg += f": {reason}" + err(msg) + err(f"remove {HALT_FILE} to resume") + sys.exit(EXIT_HALT) + + +def parse_frontmatter(path: Path) -> dict[str, str]: + """Extract org-mode #+KEY: value frontmatter from the top of the file.""" + try: + text = path.read_text() + except OSError as e: + err(f"cannot read message file: {e}") + sys.exit(EXIT_GENERAL) + + frontmatter: dict[str, str] = {} + for line in text.splitlines(): + line = line.rstrip() + if not line: + # Blank line ends the frontmatter block. + if frontmatter: + break + continue + m = re.match(r"#\+([A-Z_]+):\s*(.*)", line) + if m: + frontmatter[m.group(1)] = m.group(2).strip() + else: + # First non-frontmatter line ends parsing. + if frontmatter: + break + return frontmatter + + +def validate_frontmatter(fm: dict[str, str]) -> None: + missing = [k for k in REQUIRED_FRONTMATTER if k not in fm] + if missing: + err(f"frontmatter missing required fields: {', '.join(missing)}") + sys.exit(EXIT_FRONTMATTER) + if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES: + err(f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}; expected one of {sorted(VALID_MESSAGE_TYPES)}") + sys.exit(EXIT_FRONTMATTER) + try: + int(fm["SEQUENCE"]) + except ValueError: + err(f"SEQUENCE must be an integer; got {fm['SEQUENCE']!r}") + sys.exit(EXIT_FRONTMATTER) + + +def load_peers() -> dict: + if not PEERS_TOML.exists(): + return {} + try: + return tomllib.loads(PEERS_TOML.read_text()) + except (tomllib.TOMLDecodeError, OSError) as e: + err(f"cannot read {PEERS_TOML}: {e}") + sys.exit(EXIT_GENERAL) + + +def resolve_destination(dest: str, peers: dict) -> tuple[str, str, str | None, str | None]: + """Resolve <machine>.<project> to (machine, project, host, ssh_user). + + host is None for same-machine destinations. + """ + if "." not in dest: + err(f"destination must be <machine>.<project>; got {dest!r}") + sys.exit(EXIT_DEST_NOT_FOUND) + machine, project = dest.split(".", 1) + + local_hostname = socket.gethostname().split(".")[0] + is_local = machine == local_hostname or machine == "local" + + host = None + ssh_user = None + if not is_local: + peer_cfg = peers.get("peers", {}).get(machine) + if peer_cfg is None: + available = list(peers.get("peers", {}).keys()) + err(f"destination not found in peers.toml; available peers: {available or '(none)'}") + sys.exit(EXIT_DEST_NOT_FOUND) + host = peer_cfg.get("host", machine) + ssh_user = peer_cfg.get("ssh_user", os.environ.get("USER")) + + return machine, project, host, ssh_user + + +def resolve_inbox_path(project: str, peers: dict) -> str: + """Inbox path on the receiver. Defaults to ~/projects/<project>/inbox/from-agents.""" + proj_cfg = peers.get("projects", {}).get(project) + if proj_cfg and "inbox_path" in proj_cfg: + return os.path.expanduser(proj_cfg["inbox_path"]) + return f"~/projects/{project}/inbox/from-agents" + + +def derive_sender_project() -> str: + """Walk up from CWD looking for ~/projects/<name>/. + + Returns the project name if found; falls back to the basename of CWD. + """ + cwd = Path.cwd().resolve() + projects_root = (Path.home() / "projects").resolve() + try: + rel = cwd.relative_to(projects_root) + return rel.parts[0] + except ValueError: + return cwd.name + + +def generate_canonical_filename(sender: str, conv_id: str) -> str: + """YYYYMMDDTHHMMSSZ-from-<sender>-<conv-id>.org""" + now = _dt.datetime.now(_dt.timezone.utc) + timestamp = now.strftime("%Y%m%dT%H%M%SZ") + return f"{timestamp}-from-{sender}-{conv_id}.org" + + +def sign(message_path: Path, sig_path: Path, key: str | None) -> None: + """gpg --detach-sign --armor --output <sig> [--local-user <key>] <message>""" + cmd = ["gpg", "--detach-sign", "--armor", "--yes", "--output", str(sig_path)] + if key: + cmd.extend(["--local-user", key]) + cmd.append(str(message_path)) + try: + result = subprocess.run(cmd, capture_output=True, text=True) + except FileNotFoundError: + err("gpg not found; install gnupg or use --no-sign for testing") + sys.exit(EXIT_GENERAL) + if result.returncode != 0: + err(f"signing failed: {result.stderr.strip()}") + sys.exit(EXIT_GENERAL) + + +def same_machine_deliver(message_path: Path, sig_path: Path | None, target_dir: Path, canonical_name: str) -> None: + """Atomic-write delivery: stage .asc, mv to final, then stage .org, mv to final.""" + target_dir.mkdir(parents=True, exist_ok=True) + final_msg = target_dir / canonical_name + final_sig = target_dir / f"{canonical_name}.asc" + + if sig_path is not None: + # Stage .asc first, mv to final, THEN stage .org and mv to final. + with tempfile.NamedTemporaryFile( + mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.asc.", delete=False + ) as tmp: + tmp.write(sig_path.read_bytes()) + tmp_sig_path = Path(tmp.name) + os.replace(tmp_sig_path, final_sig) + + # Re-check HALT between .asc and .org per the layered-checks rule. + check_halt() + + with tempfile.NamedTemporaryFile( + mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.", delete=False + ) as tmp: + tmp.write(message_path.read_bytes()) + tmp_msg_path = Path(tmp.name) + os.replace(tmp_msg_path, final_msg) + + +def cross_machine_deliver( + message_path: Path, + sig_path: Path | None, + canonical_name: str, + host: str, + ssh_user: str, + inbox_path: str, + retries: int, +) -> bool: + """rsync push the .asc first (if signed), re-check HALT, then push the .org. + + Returns True on success, False on persistent failure (after retries). + """ + # Stage local copies with the canonical name so rsync sets the right + # destination filename. + with tempfile.TemporaryDirectory(prefix="cross-agent-send-") as staging: + staging_dir = Path(staging) + local_msg = staging_dir / canonical_name + local_msg.write_bytes(message_path.read_bytes()) + local_sig = None + if sig_path is not None: + local_sig = staging_dir / f"{canonical_name}.asc" + local_sig.write_bytes(sig_path.read_bytes()) + + backoffs = [5, 30, 120] + # Step 1: push .asc first if signed. + if local_sig is not None: + if not _rsync_with_retries(local_sig, host, ssh_user, inbox_path, retries, backoffs): + return False + + # Re-check HALT between .asc and .org per the layered-checks rule. + check_halt() + + # Step 2: push .org. + if not _rsync_with_retries(local_msg, host, ssh_user, inbox_path, retries, backoffs): + return False + + return True + + +def _rsync_with_retries( + src: Path, host: str, ssh_user: str, inbox_path: str, retries: int, backoffs: list[int] +) -> bool: + target = f"{ssh_user}@{host}:{inbox_path}/" + last_err = "" + for attempt in range(retries + 1): + if attempt > 0: + check_halt() + wait = backoffs[min(attempt - 1, len(backoffs) - 1)] + err(f"rsync attempt {attempt} failed: {last_err}; retrying in {wait}s") + time.sleep(wait) + try: + result = subprocess.run( + ["rsync", "-a", str(src), target], + capture_output=True, + text=True, + ) + except FileNotFoundError: + err("rsync not found; install rsync") + return False + if result.returncode == 0: + return True + last_err = result.stderr.strip() or f"exit {result.returncode}" + err(f"rsync failed after {retries + 1} attempts: {last_err}") + return False + + +def write_failed_send_marker(dest: str, message_path: Path, error: str, retry_log: list[str]) -> None: + FAILED_SENDS_DIR.mkdir(parents=True, exist_ok=True) + timestamp = _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ") + safe_basename = re.sub(r"[^A-Za-z0-9._-]", "_", message_path.name) + marker = FAILED_SENDS_DIR / f"{timestamp}-{dest.replace('.', '-')}-{safe_basename}.json" + marker.write_text(json.dumps( + { + "timestamp": timestamp, + "destination": dest, + "message_path": str(message_path), + "error": error, + "retry_log": retry_log, + }, + indent=2, + )) + err(f"marker written: {marker}") + + +def main() -> int: + parser = argparse.ArgumentParser(description="Send a cross-agent message.") + parser.add_argument("destination", help="Destination as <machine>.<project>") + parser.add_argument("message_file", type=Path, help="Path to the message body file") + parser.add_argument("--no-sign", action="store_true", help="Skip GPG signing (testing only)") + parser.add_argument("--retries", type=int, default=3, help="Retry count for cross-machine sends") + parser.add_argument("--key", help="GPG key id to sign with (default: user's primary)") + args = parser.parse_args() + + check_halt() + + if not args.message_file.is_file(): + err(f"message file not found: {args.message_file}") + return EXIT_GENERAL + + fm = parse_frontmatter(args.message_file) + validate_frontmatter(fm) + + peers = load_peers() + machine, project, host, ssh_user = resolve_destination(args.destination, peers) + inbox_path = resolve_inbox_path(project, peers) + + sender = derive_sender_project() + canonical_name = generate_canonical_filename(sender, fm["CONVERSATION_ID"]) + + sig_tmp = None + if not args.no_sign: + sig_tmp = args.message_file.with_suffix(args.message_file.suffix + ".asc.tmp") + sign(args.message_file, sig_tmp, args.key) + + try: + if host is None: + # Same-machine delivery. + target_dir = Path(os.path.expanduser(inbox_path)) + same_machine_deliver(args.message_file, sig_tmp, target_dir, canonical_name) + print(f"sent: {target_dir}/{canonical_name}") + return EXIT_OK + else: + ok = cross_machine_deliver( + args.message_file, sig_tmp, canonical_name, host, ssh_user, inbox_path, args.retries + ) + if ok: + print(f"sent: {ssh_user}@{host}:{inbox_path}/{canonical_name}") + return EXIT_OK + write_failed_send_marker(args.destination, args.message_file, "rsync failed after retries", []) + return EXIT_CROSS_MACHINE_FAILED + finally: + if sig_tmp is not None and sig_tmp.exists(): + sig_tmp.unlink() + + +if __name__ == "__main__": + sys.exit(main()) |
