diff options
Diffstat (limited to '.ai/scripts/cross-agent-comms/cross-agent-send')
| -rwxr-xr-x | .ai/scripts/cross-agent-comms/cross-agent-send | 356 |
1 files changed, 0 insertions, 356 deletions
diff --git a/.ai/scripts/cross-agent-comms/cross-agent-send b/.ai/scripts/cross-agent-comms/cross-agent-send deleted file mode 100755 index 68c010a..0000000 --- a/.ai/scripts/cross-agent-comms/cross-agent-send +++ /dev/null @@ -1,356 +0,0 @@ -#!/usr/bin/env python3 -"""Cross-agent message sender. - -See cross-agent-send.md for the full contract. Briefly: - -- Destination as <machine>.<project>; resolved via peers.toml. -- Same-machine: cp to receiver's inbox/from-agents/ with atomic rename. -- Cross-machine: rsync over SSH (typically Tailscale) with retry+backoff. -- GPG-signs by default; .asc renames before .org so receivers never see - a .org without its sibling signature. -- Generates the canonical filename; user's input filename is ignored. -- Honors the HALT file: refuses to send and exits with code 5 when set. -""" - -from __future__ import annotations - -import argparse -import datetime as _dt -import json -import os -import re -import shutil -import socket -import subprocess -import sys -import tempfile -import time -import tomllib -from pathlib import Path - -CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms" -PEERS_TOML = CONFIG_DIR / "peers.toml" -HALT_FILE = CONFIG_DIR / "HALT" -STATE_DIR = Path.home() / ".local" / "state" / "cross-agent-comms" -FAILED_SENDS_DIR = STATE_DIR / "failed-sends" - -EXIT_OK = 0 -EXIT_GENERAL = 1 -EXIT_DEST_NOT_FOUND = 2 -EXIT_CROSS_MACHINE_FAILED = 3 -EXIT_FRONTMATTER = 4 -EXIT_HALT = 5 - -REQUIRED_FRONTMATTER = ["CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"] -VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"} - - -def err(msg: str) -> None: - print(msg, file=sys.stderr) - - -def check_halt() -> None: - """Exit with code 5 if HALT file exists.""" - if HALT_FILE.exists(): - try: - reason = HALT_FILE.read_text().strip() - except OSError: - # Fail-closed on unreadable HALT. - err("halt active (HALT file present but unreadable; treated as halted)") - err(f"remove {HALT_FILE} to resume") - sys.exit(EXIT_HALT) - msg = "halt active" - if reason: - msg += f": {reason}" - err(msg) - err(f"remove {HALT_FILE} to resume") - sys.exit(EXIT_HALT) - - -def parse_frontmatter(path: Path) -> dict[str, str]: - """Extract org-mode #+KEY: value frontmatter from the top of the file.""" - try: - text = path.read_text() - except OSError as e: - err(f"cannot read message file: {e}") - sys.exit(EXIT_GENERAL) - - frontmatter: dict[str, str] = {} - for line in text.splitlines(): - line = line.rstrip() - if not line: - # Blank line ends the frontmatter block. - if frontmatter: - break - continue - m = re.match(r"#\+([A-Z_]+):\s*(.*)", line) - if m: - frontmatter[m.group(1)] = m.group(2).strip() - else: - # First non-frontmatter line ends parsing. - if frontmatter: - break - return frontmatter - - -def validate_frontmatter(fm: dict[str, str]) -> None: - missing = [k for k in REQUIRED_FRONTMATTER if k not in fm] - if missing: - err(f"frontmatter missing required fields: {', '.join(missing)}") - sys.exit(EXIT_FRONTMATTER) - if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES: - err(f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}; expected one of {sorted(VALID_MESSAGE_TYPES)}") - sys.exit(EXIT_FRONTMATTER) - try: - int(fm["SEQUENCE"]) - except ValueError: - err(f"SEQUENCE must be an integer; got {fm['SEQUENCE']!r}") - sys.exit(EXIT_FRONTMATTER) - - -def load_peers() -> dict: - if not PEERS_TOML.exists(): - return {} - try: - return tomllib.loads(PEERS_TOML.read_text()) - except (tomllib.TOMLDecodeError, OSError) as e: - err(f"cannot read {PEERS_TOML}: {e}") - sys.exit(EXIT_GENERAL) - - -def resolve_destination(dest: str, peers: dict) -> tuple[str, str, str | None, str | None]: - """Resolve <machine>.<project> to (machine, project, host, ssh_user). - - host is None for same-machine destinations. - """ - if "." not in dest: - err(f"destination must be <machine>.<project>; got {dest!r}") - sys.exit(EXIT_DEST_NOT_FOUND) - machine, project = dest.split(".", 1) - - local_hostname = socket.gethostname().split(".")[0] - is_local = machine == local_hostname or machine == "local" - - host = None - ssh_user = None - if not is_local: - peer_cfg = peers.get("peers", {}).get(machine) - if peer_cfg is None: - available = list(peers.get("peers", {}).keys()) - err(f"destination not found in peers.toml; available peers: {available or '(none)'}") - sys.exit(EXIT_DEST_NOT_FOUND) - host = peer_cfg.get("host", machine) - ssh_user = peer_cfg.get("ssh_user", os.environ.get("USER")) - - return machine, project, host, ssh_user - - -def resolve_inbox_path(project: str, peers: dict) -> str: - """Inbox path on the receiver. Defaults to ~/projects/<project>/inbox/from-agents.""" - proj_cfg = peers.get("projects", {}).get(project) - if proj_cfg and "inbox_path" in proj_cfg: - return os.path.expanduser(proj_cfg["inbox_path"]) - return f"~/projects/{project}/inbox/from-agents" - - -def derive_sender_project() -> str: - """Walk up from CWD looking for ~/projects/<name>/. - - Returns the project name if found; falls back to the basename of CWD. - """ - cwd = Path.cwd().resolve() - projects_root = (Path.home() / "projects").resolve() - try: - rel = cwd.relative_to(projects_root) - return rel.parts[0] - except ValueError: - return cwd.name - - -def generate_canonical_filename(sender: str, conv_id: str) -> str: - """YYYYMMDDTHHMMSSZ-from-<sender>-<conv-id>.org""" - now = _dt.datetime.now(_dt.timezone.utc) - timestamp = now.strftime("%Y%m%dT%H%M%SZ") - return f"{timestamp}-from-{sender}-{conv_id}.org" - - -def sign(message_path: Path, sig_path: Path, key: str | None) -> None: - """gpg --detach-sign --armor --output <sig> [--local-user <key>] <message>""" - cmd = ["gpg", "--detach-sign", "--armor", "--yes", "--output", str(sig_path)] - if key: - cmd.extend(["--local-user", key]) - cmd.append(str(message_path)) - try: - result = subprocess.run(cmd, capture_output=True, text=True) - except FileNotFoundError: - err("gpg not found; install gnupg or use --no-sign for testing") - sys.exit(EXIT_GENERAL) - if result.returncode != 0: - err(f"signing failed: {result.stderr.strip()}") - sys.exit(EXIT_GENERAL) - - -def same_machine_deliver(message_path: Path, sig_path: Path | None, target_dir: Path, canonical_name: str) -> None: - """Atomic-write delivery: stage .asc, mv to final, then stage .org, mv to final.""" - target_dir.mkdir(parents=True, exist_ok=True) - final_msg = target_dir / canonical_name - final_sig = target_dir / f"{canonical_name}.asc" - - if sig_path is not None: - # Stage .asc first, mv to final, THEN stage .org and mv to final. - with tempfile.NamedTemporaryFile( - mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.asc.", delete=False - ) as tmp: - tmp.write(sig_path.read_bytes()) - tmp_sig_path = Path(tmp.name) - os.replace(tmp_sig_path, final_sig) - - # Re-check HALT between .asc and .org per the layered-checks rule. - check_halt() - - with tempfile.NamedTemporaryFile( - mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.", delete=False - ) as tmp: - tmp.write(message_path.read_bytes()) - tmp_msg_path = Path(tmp.name) - os.replace(tmp_msg_path, final_msg) - - -def cross_machine_deliver( - message_path: Path, - sig_path: Path | None, - canonical_name: str, - host: str, - ssh_user: str, - inbox_path: str, - retries: int, -) -> bool: - """rsync push the .asc first (if signed), re-check HALT, then push the .org. - - Returns True on success, False on persistent failure (after retries). - """ - # Stage local copies with the canonical name so rsync sets the right - # destination filename. - with tempfile.TemporaryDirectory(prefix="cross-agent-send-") as staging: - staging_dir = Path(staging) - local_msg = staging_dir / canonical_name - local_msg.write_bytes(message_path.read_bytes()) - local_sig = None - if sig_path is not None: - local_sig = staging_dir / f"{canonical_name}.asc" - local_sig.write_bytes(sig_path.read_bytes()) - - backoffs = [5, 30, 120] - # Step 1: push .asc first if signed. - if local_sig is not None: - if not _rsync_with_retries(local_sig, host, ssh_user, inbox_path, retries, backoffs): - return False - - # Re-check HALT between .asc and .org per the layered-checks rule. - check_halt() - - # Step 2: push .org. - if not _rsync_with_retries(local_msg, host, ssh_user, inbox_path, retries, backoffs): - return False - - return True - - -def _rsync_with_retries( - src: Path, host: str, ssh_user: str, inbox_path: str, retries: int, backoffs: list[int] -) -> bool: - target = f"{ssh_user}@{host}:{inbox_path}/" - last_err = "" - for attempt in range(retries + 1): - if attempt > 0: - check_halt() - wait = backoffs[min(attempt - 1, len(backoffs) - 1)] - err(f"rsync attempt {attempt} failed: {last_err}; retrying in {wait}s") - time.sleep(wait) - try: - result = subprocess.run( - ["rsync", "-a", str(src), target], - capture_output=True, - text=True, - ) - except FileNotFoundError: - err("rsync not found; install rsync") - return False - if result.returncode == 0: - return True - last_err = result.stderr.strip() or f"exit {result.returncode}" - err(f"rsync failed after {retries + 1} attempts: {last_err}") - return False - - -def write_failed_send_marker(dest: str, message_path: Path, error: str, retry_log: list[str]) -> None: - FAILED_SENDS_DIR.mkdir(parents=True, exist_ok=True) - timestamp = _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ") - safe_basename = re.sub(r"[^A-Za-z0-9._-]", "_", message_path.name) - marker = FAILED_SENDS_DIR / f"{timestamp}-{dest.replace('.', '-')}-{safe_basename}.json" - marker.write_text(json.dumps( - { - "timestamp": timestamp, - "destination": dest, - "message_path": str(message_path), - "error": error, - "retry_log": retry_log, - }, - indent=2, - )) - err(f"marker written: {marker}") - - -def main() -> int: - parser = argparse.ArgumentParser(description="Send a cross-agent message.") - parser.add_argument("destination", help="Destination as <machine>.<project>") - parser.add_argument("message_file", type=Path, help="Path to the message body file") - parser.add_argument("--no-sign", action="store_true", help="Skip GPG signing (testing only)") - parser.add_argument("--retries", type=int, default=3, help="Retry count for cross-machine sends") - parser.add_argument("--key", help="GPG key id to sign with (default: user's primary)") - args = parser.parse_args() - - check_halt() - - if not args.message_file.is_file(): - err(f"message file not found: {args.message_file}") - return EXIT_GENERAL - - fm = parse_frontmatter(args.message_file) - validate_frontmatter(fm) - - peers = load_peers() - machine, project, host, ssh_user = resolve_destination(args.destination, peers) - inbox_path = resolve_inbox_path(project, peers) - - sender = derive_sender_project() - canonical_name = generate_canonical_filename(sender, fm["CONVERSATION_ID"]) - - sig_tmp = None - if not args.no_sign: - sig_tmp = args.message_file.with_suffix(args.message_file.suffix + ".asc.tmp") - sign(args.message_file, sig_tmp, args.key) - - try: - if host is None: - # Same-machine delivery. - target_dir = Path(os.path.expanduser(inbox_path)) - same_machine_deliver(args.message_file, sig_tmp, target_dir, canonical_name) - print(f"sent: {target_dir}/{canonical_name}") - return EXIT_OK - else: - ok = cross_machine_deliver( - args.message_file, sig_tmp, canonical_name, host, ssh_user, inbox_path, args.retries - ) - if ok: - print(f"sent: {ssh_user}@{host}:{inbox_path}/{canonical_name}") - return EXIT_OK - write_failed_send_marker(args.destination, args.message_file, "rsync failed after retries", []) - return EXIT_CROSS_MACHINE_FAILED - finally: - if sig_tmp is not None and sig_tmp.exists(): - sig_tmp.unlink() - - -if __name__ == "__main__": - sys.exit(main()) |
