aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/cross-agent-comms/cross-agent-send
diff options
context:
space:
mode:
Diffstat (limited to '.ai/scripts/cross-agent-comms/cross-agent-send')
-rwxr-xr-x.ai/scripts/cross-agent-comms/cross-agent-send356
1 files changed, 0 insertions, 356 deletions
diff --git a/.ai/scripts/cross-agent-comms/cross-agent-send b/.ai/scripts/cross-agent-comms/cross-agent-send
deleted file mode 100755
index 68c010a..0000000
--- a/.ai/scripts/cross-agent-comms/cross-agent-send
+++ /dev/null
@@ -1,356 +0,0 @@
-#!/usr/bin/env python3
-"""Cross-agent message sender.
-
-See cross-agent-send.md for the full contract. Briefly:
-
-- Destination as <machine>.<project>; resolved via peers.toml.
-- Same-machine: cp to receiver's inbox/from-agents/ with atomic rename.
-- Cross-machine: rsync over SSH (typically Tailscale) with retry+backoff.
-- GPG-signs by default; .asc renames before .org so receivers never see
- a .org without its sibling signature.
-- Generates the canonical filename; user's input filename is ignored.
-- Honors the HALT file: refuses to send and exits with code 5 when set.
-"""
-
-from __future__ import annotations
-
-import argparse
-import datetime as _dt
-import json
-import os
-import re
-import shutil
-import socket
-import subprocess
-import sys
-import tempfile
-import time
-import tomllib
-from pathlib import Path
-
-CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms"
-PEERS_TOML = CONFIG_DIR / "peers.toml"
-HALT_FILE = CONFIG_DIR / "HALT"
-STATE_DIR = Path.home() / ".local" / "state" / "cross-agent-comms"
-FAILED_SENDS_DIR = STATE_DIR / "failed-sends"
-
-EXIT_OK = 0
-EXIT_GENERAL = 1
-EXIT_DEST_NOT_FOUND = 2
-EXIT_CROSS_MACHINE_FAILED = 3
-EXIT_FRONTMATTER = 4
-EXIT_HALT = 5
-
-REQUIRED_FRONTMATTER = ["CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"]
-VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"}
-
-
-def err(msg: str) -> None:
- print(msg, file=sys.stderr)
-
-
-def check_halt() -> None:
- """Exit with code 5 if HALT file exists."""
- if HALT_FILE.exists():
- try:
- reason = HALT_FILE.read_text().strip()
- except OSError:
- # Fail-closed on unreadable HALT.
- err("halt active (HALT file present but unreadable; treated as halted)")
- err(f"remove {HALT_FILE} to resume")
- sys.exit(EXIT_HALT)
- msg = "halt active"
- if reason:
- msg += f": {reason}"
- err(msg)
- err(f"remove {HALT_FILE} to resume")
- sys.exit(EXIT_HALT)
-
-
-def parse_frontmatter(path: Path) -> dict[str, str]:
- """Extract org-mode #+KEY: value frontmatter from the top of the file."""
- try:
- text = path.read_text()
- except OSError as e:
- err(f"cannot read message file: {e}")
- sys.exit(EXIT_GENERAL)
-
- frontmatter: dict[str, str] = {}
- for line in text.splitlines():
- line = line.rstrip()
- if not line:
- # Blank line ends the frontmatter block.
- if frontmatter:
- break
- continue
- m = re.match(r"#\+([A-Z_]+):\s*(.*)", line)
- if m:
- frontmatter[m.group(1)] = m.group(2).strip()
- else:
- # First non-frontmatter line ends parsing.
- if frontmatter:
- break
- return frontmatter
-
-
-def validate_frontmatter(fm: dict[str, str]) -> None:
- missing = [k for k in REQUIRED_FRONTMATTER if k not in fm]
- if missing:
- err(f"frontmatter missing required fields: {', '.join(missing)}")
- sys.exit(EXIT_FRONTMATTER)
- if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES:
- err(f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}; expected one of {sorted(VALID_MESSAGE_TYPES)}")
- sys.exit(EXIT_FRONTMATTER)
- try:
- int(fm["SEQUENCE"])
- except ValueError:
- err(f"SEQUENCE must be an integer; got {fm['SEQUENCE']!r}")
- sys.exit(EXIT_FRONTMATTER)
-
-
-def load_peers() -> dict:
- if not PEERS_TOML.exists():
- return {}
- try:
- return tomllib.loads(PEERS_TOML.read_text())
- except (tomllib.TOMLDecodeError, OSError) as e:
- err(f"cannot read {PEERS_TOML}: {e}")
- sys.exit(EXIT_GENERAL)
-
-
-def resolve_destination(dest: str, peers: dict) -> tuple[str, str, str | None, str | None]:
- """Resolve <machine>.<project> to (machine, project, host, ssh_user).
-
- host is None for same-machine destinations.
- """
- if "." not in dest:
- err(f"destination must be <machine>.<project>; got {dest!r}")
- sys.exit(EXIT_DEST_NOT_FOUND)
- machine, project = dest.split(".", 1)
-
- local_hostname = socket.gethostname().split(".")[0]
- is_local = machine == local_hostname or machine == "local"
-
- host = None
- ssh_user = None
- if not is_local:
- peer_cfg = peers.get("peers", {}).get(machine)
- if peer_cfg is None:
- available = list(peers.get("peers", {}).keys())
- err(f"destination not found in peers.toml; available peers: {available or '(none)'}")
- sys.exit(EXIT_DEST_NOT_FOUND)
- host = peer_cfg.get("host", machine)
- ssh_user = peer_cfg.get("ssh_user", os.environ.get("USER"))
-
- return machine, project, host, ssh_user
-
-
-def resolve_inbox_path(project: str, peers: dict) -> str:
- """Inbox path on the receiver. Defaults to ~/projects/<project>/inbox/from-agents."""
- proj_cfg = peers.get("projects", {}).get(project)
- if proj_cfg and "inbox_path" in proj_cfg:
- return os.path.expanduser(proj_cfg["inbox_path"])
- return f"~/projects/{project}/inbox/from-agents"
-
-
-def derive_sender_project() -> str:
- """Walk up from CWD looking for ~/projects/<name>/.
-
- Returns the project name if found; falls back to the basename of CWD.
- """
- cwd = Path.cwd().resolve()
- projects_root = (Path.home() / "projects").resolve()
- try:
- rel = cwd.relative_to(projects_root)
- return rel.parts[0]
- except ValueError:
- return cwd.name
-
-
-def generate_canonical_filename(sender: str, conv_id: str) -> str:
- """YYYYMMDDTHHMMSSZ-from-<sender>-<conv-id>.org"""
- now = _dt.datetime.now(_dt.timezone.utc)
- timestamp = now.strftime("%Y%m%dT%H%M%SZ")
- return f"{timestamp}-from-{sender}-{conv_id}.org"
-
-
-def sign(message_path: Path, sig_path: Path, key: str | None) -> None:
- """gpg --detach-sign --armor --output <sig> [--local-user <key>] <message>"""
- cmd = ["gpg", "--detach-sign", "--armor", "--yes", "--output", str(sig_path)]
- if key:
- cmd.extend(["--local-user", key])
- cmd.append(str(message_path))
- try:
- result = subprocess.run(cmd, capture_output=True, text=True)
- except FileNotFoundError:
- err("gpg not found; install gnupg or use --no-sign for testing")
- sys.exit(EXIT_GENERAL)
- if result.returncode != 0:
- err(f"signing failed: {result.stderr.strip()}")
- sys.exit(EXIT_GENERAL)
-
-
-def same_machine_deliver(message_path: Path, sig_path: Path | None, target_dir: Path, canonical_name: str) -> None:
- """Atomic-write delivery: stage .asc, mv to final, then stage .org, mv to final."""
- target_dir.mkdir(parents=True, exist_ok=True)
- final_msg = target_dir / canonical_name
- final_sig = target_dir / f"{canonical_name}.asc"
-
- if sig_path is not None:
- # Stage .asc first, mv to final, THEN stage .org and mv to final.
- with tempfile.NamedTemporaryFile(
- mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.asc.", delete=False
- ) as tmp:
- tmp.write(sig_path.read_bytes())
- tmp_sig_path = Path(tmp.name)
- os.replace(tmp_sig_path, final_sig)
-
- # Re-check HALT between .asc and .org per the layered-checks rule.
- check_halt()
-
- with tempfile.NamedTemporaryFile(
- mode="wb", dir=target_dir, prefix=f".tmp.{canonical_name}.", delete=False
- ) as tmp:
- tmp.write(message_path.read_bytes())
- tmp_msg_path = Path(tmp.name)
- os.replace(tmp_msg_path, final_msg)
-
-
-def cross_machine_deliver(
- message_path: Path,
- sig_path: Path | None,
- canonical_name: str,
- host: str,
- ssh_user: str,
- inbox_path: str,
- retries: int,
-) -> bool:
- """rsync push the .asc first (if signed), re-check HALT, then push the .org.
-
- Returns True on success, False on persistent failure (after retries).
- """
- # Stage local copies with the canonical name so rsync sets the right
- # destination filename.
- with tempfile.TemporaryDirectory(prefix="cross-agent-send-") as staging:
- staging_dir = Path(staging)
- local_msg = staging_dir / canonical_name
- local_msg.write_bytes(message_path.read_bytes())
- local_sig = None
- if sig_path is not None:
- local_sig = staging_dir / f"{canonical_name}.asc"
- local_sig.write_bytes(sig_path.read_bytes())
-
- backoffs = [5, 30, 120]
- # Step 1: push .asc first if signed.
- if local_sig is not None:
- if not _rsync_with_retries(local_sig, host, ssh_user, inbox_path, retries, backoffs):
- return False
-
- # Re-check HALT between .asc and .org per the layered-checks rule.
- check_halt()
-
- # Step 2: push .org.
- if not _rsync_with_retries(local_msg, host, ssh_user, inbox_path, retries, backoffs):
- return False
-
- return True
-
-
-def _rsync_with_retries(
- src: Path, host: str, ssh_user: str, inbox_path: str, retries: int, backoffs: list[int]
-) -> bool:
- target = f"{ssh_user}@{host}:{inbox_path}/"
- last_err = ""
- for attempt in range(retries + 1):
- if attempt > 0:
- check_halt()
- wait = backoffs[min(attempt - 1, len(backoffs) - 1)]
- err(f"rsync attempt {attempt} failed: {last_err}; retrying in {wait}s")
- time.sleep(wait)
- try:
- result = subprocess.run(
- ["rsync", "-a", str(src), target],
- capture_output=True,
- text=True,
- )
- except FileNotFoundError:
- err("rsync not found; install rsync")
- return False
- if result.returncode == 0:
- return True
- last_err = result.stderr.strip() or f"exit {result.returncode}"
- err(f"rsync failed after {retries + 1} attempts: {last_err}")
- return False
-
-
-def write_failed_send_marker(dest: str, message_path: Path, error: str, retry_log: list[str]) -> None:
- FAILED_SENDS_DIR.mkdir(parents=True, exist_ok=True)
- timestamp = _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
- safe_basename = re.sub(r"[^A-Za-z0-9._-]", "_", message_path.name)
- marker = FAILED_SENDS_DIR / f"{timestamp}-{dest.replace('.', '-')}-{safe_basename}.json"
- marker.write_text(json.dumps(
- {
- "timestamp": timestamp,
- "destination": dest,
- "message_path": str(message_path),
- "error": error,
- "retry_log": retry_log,
- },
- indent=2,
- ))
- err(f"marker written: {marker}")
-
-
-def main() -> int:
- parser = argparse.ArgumentParser(description="Send a cross-agent message.")
- parser.add_argument("destination", help="Destination as <machine>.<project>")
- parser.add_argument("message_file", type=Path, help="Path to the message body file")
- parser.add_argument("--no-sign", action="store_true", help="Skip GPG signing (testing only)")
- parser.add_argument("--retries", type=int, default=3, help="Retry count for cross-machine sends")
- parser.add_argument("--key", help="GPG key id to sign with (default: user's primary)")
- args = parser.parse_args()
-
- check_halt()
-
- if not args.message_file.is_file():
- err(f"message file not found: {args.message_file}")
- return EXIT_GENERAL
-
- fm = parse_frontmatter(args.message_file)
- validate_frontmatter(fm)
-
- peers = load_peers()
- machine, project, host, ssh_user = resolve_destination(args.destination, peers)
- inbox_path = resolve_inbox_path(project, peers)
-
- sender = derive_sender_project()
- canonical_name = generate_canonical_filename(sender, fm["CONVERSATION_ID"])
-
- sig_tmp = None
- if not args.no_sign:
- sig_tmp = args.message_file.with_suffix(args.message_file.suffix + ".asc.tmp")
- sign(args.message_file, sig_tmp, args.key)
-
- try:
- if host is None:
- # Same-machine delivery.
- target_dir = Path(os.path.expanduser(inbox_path))
- same_machine_deliver(args.message_file, sig_tmp, target_dir, canonical_name)
- print(f"sent: {target_dir}/{canonical_name}")
- return EXIT_OK
- else:
- ok = cross_machine_deliver(
- args.message_file, sig_tmp, canonical_name, host, ssh_user, inbox_path, args.retries
- )
- if ok:
- print(f"sent: {ssh_user}@{host}:{inbox_path}/{canonical_name}")
- return EXIT_OK
- write_failed_send_marker(args.destination, args.message_file, "rsync failed after retries", [])
- return EXIT_CROSS_MACHINE_FAILED
- finally:
- if sig_tmp is not None and sig_tmp.exists():
- sig_tmp.unlink()
-
-
-if __name__ == "__main__":
- sys.exit(main())