#!/usr/bin/env python3 """Cross-agent message sender. See cross-agent-send.md for the full contract. Briefly: - Destination as .; resolved via peers.toml. - Same-machine: cp to receiver's inbox/from-agents/ with atomic rename. - Cross-machine: rsync over SSH (typically Tailscale) with retry+backoff. - GPG-signs by default; .asc renames before .org so receivers never see a .org without its sibling signature. - Generates the canonical filename; user's input filename is ignored. - Honors the HALT file: refuses to send and exits with code 5 when set. """ from __future__ import annotations import argparse import datetime as _dt import json import os import re import shutil import socket import subprocess import sys import tempfile import time import tomllib from pathlib import Path CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms" PEERS_TOML = CONFIG_DIR / "peers.toml" HALT_FILE = CONFIG_DIR / "HALT" STATE_DIR = Path.home() / ".local" / "state" / "cross-agent-comms" FAILED_SENDS_DIR = STATE_DIR / "failed-sends" EXIT_OK = 0 EXIT_GENERAL = 1 EXIT_DEST_NOT_FOUND = 2 EXIT_CROSS_MACHINE_FAILED = 3 EXIT_FRONTMATTER = 4 EXIT_HALT = 5 REQUIRED_FRONTMATTER = ["CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"] VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"} def err(msg: str) -> None: print(msg, file=sys.stderr) def check_halt() -> None: """Exit with code 5 if HALT file exists.""" if HALT_FILE.exists(): try: reason = HALT_FILE.read_text().strip() except OSError: # Fail-closed on unreadable HALT. err("halt active (HALT file present but unreadable; treated as halted)") err(f"remove {HALT_FILE} to resume") sys.exit(EXIT_HALT) msg = "halt active" if reason: msg += f": {reason}" err(msg) err(f"remove {HALT_FILE} to resume") sys.exit(EXIT_HALT) def parse_frontmatter(path: Path) -> dict[str, str]: """Extract org-mode #+KEY: value frontmatter from the top of the file.""" try: text = path.read_text() except OSError as e: err(f"cannot read message file: {e}") sys.exit(EXIT_GENERAL) frontmatter: dict[str, str] = {} for line in text.splitlines(): line = line.rstrip() if not line: # Blank line ends the frontmatter block. if frontmatter: break continue m = re.match(r"#\+([A-Z_]+):\s*(.*)", line) if m: frontmatter[m.group(1)] = m.group(2).strip() else: # First non-frontmatter line ends parsing. if frontmatter: break return frontmatter def validate_frontmatter(fm: dict[str, str]) -> None: missing = [k for k in REQUIRED_FRONTMATTER if k not in fm] if missing: err(f"frontmatter missing required fields: {', '.join(missing)}") sys.exit(EXIT_FRONTMATTER) if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES: err(f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}; expected one of {sorted(VALID_MESSAGE_TYPES)}") sys.exit(EXIT_FRONTMATTER) try: int(fm["SEQUENCE"]) except ValueError: err(f"SEQUENCE must be an integer; got {fm['SEQUENCE']!r}") sys.exit(EXIT_FRONTMATTER) def load_peers() -> dict: if not PEERS_TOML.exists(): return {} try: return tomllib.loads(PEERS_TOML.read_text()) except (tomllib.TOMLDecodeError, OSError) as e: err(f"cannot read {PEERS_TOML}: {e}") sys.exit(EXIT_GENERAL) def resolve_destination(dest: str, peers: dict) -> tuple[str, str, str | None, str | None]: """Resolve . to (machine, project, host, ssh_user). host is None for same-machine destinations. """ if "." not in dest: err(f"destination must be .; got {dest!r}") sys.exit(EXIT_DEST_NOT_FOUND) machine, project = dest.split(".", 1) local_hostname = socket.gethostname().split(".")[0] is_local = machine == local_hostname or machine == "local" host = None ssh_user = None if not is_local: peer_cfg = peers.get("peers", {}).get(machine) if peer_cfg is None: available = list(peers.get("peers", {}).keys()) err(f"destination not found in peers.toml; available peers: {available or '(none)'}") sys.exit(EXIT_DEST_NOT_FOUND) host = peer_cfg.get("host", machine) ssh_user = peer_cfg.get("ssh_user", os.environ.get("USER")) return machine, project, host, ssh_user def resolve_inbox_path(project: str, peers: dict) -> str: """Inbox path on the receiver. Defaults to ~/projects//inbox/from-agents.""" proj_cfg = peers.get("projects", {}).get(project) if proj_cfg and "inbox_path" in proj_cfg: return os.path.expanduser(proj_cfg["inbox_path"]) return f"~/projects/{project}/inbox/from-agents" def derive_sender_project() -> str: """Walk up from CWD looking for ~/projects//. Returns the project name if found; falls back to the basename of CWD. """ cwd = Path.cwd().resolve() projects_root = (Path.home() / "projects").resolve() try: rel = cwd.relative_to(projects_root) return rel.parts[0] except ValueError: return cwd.name def generate_canonical_filename(sender: str, conv_id: str) -> str: """YYYYMMDDTHHMMSSZ-from--.org""" now = _dt.datetime.now(_dt.timezone.utc) timestamp = now.strftime("%Y%m%dT%H%M%SZ") return f"{timestamp}-from-{sender}-{conv_id}.org" def sign(message_path: Path, sig_path: Path, key: str | None) -> None: """gpg --detach-sign --armor --output [--local-user ] """ cmd = ["gpg", "--detach-sign", "--armor", "--yes", "--output", str(sig_path)] if key: cmd.extend(["--local-user", key]) cmd.append(str(message_path)) try: result = subprocess.run(cmd, capture_output=True, text=True) except FileNotFoundError: err("gpg not found; install gnupg or use --no-sign for testing") sys.exit(EXIT_GENERAL) if result.returncode != 0: err(f"signing failed: {result.stderr.strip()}") sys.exit(EXIT_GENERAL) def same_machine_deliver(message_path: Path, sig_path: Path | None, target_dir: Path, canonical_name: str) -> None: """Atomic-write delivery: stage .asc, mv to final, then stage .org, mv to final.""" target_dir.mkdir(parents=True, exist_ok=True) final_msg = target_dir / canonical_name final_sig = target_dir / f"{canonical_name}.asc" if sig_path is not None: # Stage .asc first, mv to final, THEN stage .org and mv to final. with tempfile.NamedTemporaryFile( mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.asc.", delete=False ) as tmp: tmp.write(sig_path.read_bytes()) tmp_sig_path = Path(tmp.name) os.replace(tmp_sig_path, final_sig) # Re-check HALT between .asc and .org per the layered-checks rule. check_halt() with tempfile.NamedTemporaryFile( mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.", delete=False ) as tmp: tmp.write(message_path.read_bytes()) tmp_msg_path = Path(tmp.name) os.replace(tmp_msg_path, final_msg) def cross_machine_deliver( message_path: Path, sig_path: Path | None, canonical_name: str, host: str, ssh_user: str, inbox_path: str, retries: int, ) -> bool: """rsync push the .asc first (if signed), re-check HALT, then push the .org. Returns True on success, False on persistent failure (after retries). """ # Stage local copies with the canonical name so rsync sets the right # destination filename. with tempfile.TemporaryDirectory(prefix="cross-agent-send-") as staging: staging_dir = Path(staging) local_msg = staging_dir / canonical_name local_msg.write_bytes(message_path.read_bytes()) local_sig = None if sig_path is not None: local_sig = staging_dir / f"{canonical_name}.asc" local_sig.write_bytes(sig_path.read_bytes()) backoffs = [5, 30, 120] # Step 1: push .asc first if signed. if local_sig is not None: if not _rsync_with_retries(local_sig, host, ssh_user, inbox_path, retries, backoffs): return False # Re-check HALT between .asc and .org per the layered-checks rule. check_halt() # Step 2: push .org. if not _rsync_with_retries(local_msg, host, ssh_user, inbox_path, retries, backoffs): return False return True def _rsync_with_retries( src: Path, host: str, ssh_user: str, inbox_path: str, retries: int, backoffs: list[int] ) -> bool: target = f"{ssh_user}@{host}:{inbox_path}/" last_err = "" for attempt in range(retries + 1): if attempt > 0: check_halt() wait = backoffs[min(attempt - 1, len(backoffs) - 1)] err(f"rsync attempt {attempt} failed: {last_err}; retrying in {wait}s") time.sleep(wait) try: result = subprocess.run( ["rsync", "-a", str(src), target], capture_output=True, text=True, ) except FileNotFoundError: err("rsync not found; install rsync") return False if result.returncode == 0: return True last_err = result.stderr.strip() or f"exit {result.returncode}" err(f"rsync failed after {retries + 1} attempts: {last_err}") return False def write_failed_send_marker(dest: str, message_path: Path, error: str, retry_log: list[str]) -> None: FAILED_SENDS_DIR.mkdir(parents=True, exist_ok=True) timestamp = _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ") safe_basename = re.sub(r"[^A-Za-z0-9._-]", "_", message_path.name) marker = FAILED_SENDS_DIR / f"{timestamp}-{dest.replace('.', '-')}-{safe_basename}.json" marker.write_text(json.dumps( { "timestamp": timestamp, "destination": dest, "message_path": str(message_path), "error": error, "retry_log": retry_log, }, indent=2, )) err(f"marker written: {marker}") def main() -> int: parser = argparse.ArgumentParser(description="Send a cross-agent message.") parser.add_argument("destination", help="Destination as .") parser.add_argument("message_file", type=Path, help="Path to the message body file") parser.add_argument("--no-sign", action="store_true", help="Skip GPG signing (testing only)") parser.add_argument("--retries", type=int, default=3, help="Retry count for cross-machine sends") parser.add_argument("--key", help="GPG key id to sign with (default: user's primary)") args = parser.parse_args() check_halt() if not args.message_file.is_file(): err(f"message file not found: {args.message_file}") return EXIT_GENERAL fm = parse_frontmatter(args.message_file) validate_frontmatter(fm) peers = load_peers() machine, project, host, ssh_user = resolve_destination(args.destination, peers) inbox_path = resolve_inbox_path(project, peers) sender = derive_sender_project() canonical_name = generate_canonical_filename(sender, fm["CONVERSATION_ID"]) sig_tmp = None if not args.no_sign: sig_tmp = args.message_file.with_suffix(args.message_file.suffix + ".asc.tmp") sign(args.message_file, sig_tmp, args.key) try: if host is None: # Same-machine delivery. target_dir = Path(os.path.expanduser(inbox_path)) same_machine_deliver(args.message_file, sig_tmp, target_dir, canonical_name) print(f"sent: {target_dir}/{canonical_name}") return EXIT_OK else: ok = cross_machine_deliver( args.message_file, sig_tmp, canonical_name, host, ssh_user, inbox_path, args.retries ) if ok: print(f"sent: {ssh_user}@{host}:{inbox_path}/{canonical_name}") return EXIT_OK write_failed_send_marker(args.destination, args.message_file, "rsync failed after retries", []) return EXIT_CROSS_MACHINE_FAILED finally: if sig_tmp is not None and sig_tmp.exists(): sig_tmp.unlink() if __name__ == "__main__": sys.exit(main())