#!/usr/bin/env python3 """Enumerate cross-agent destinations: local projects + tailnet peers. See cross-agent-discover.md. Local: scan ~/projects/*/.ai/. Peers: read peers.toml, SSH-probe each for reachability. --enumerate-remote optionally runs `ls -d ~/projects/*/.ai/` over SSH to list remote projects. Cache results for 5 min at ~/.cache/cross-agent-comms/discovery.json so repeated invocations don't re-probe. HALT: prints a banner; otherwise continues. """ from __future__ import annotations import argparse import datetime as _dt import json import os import subprocess import sys import time import tomllib from pathlib import Path CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms" PEERS_TOML = CONFIG_DIR / "peers.toml" HALT_FILE = CONFIG_DIR / "HALT" CACHE_DIR = Path.home() / ".cache" / "cross-agent-comms" CACHE_FILE = CACHE_DIR / "discovery.json" CACHE_TTL_SECONDS = 300 EXIT_OK = 0 EXIT_GENERAL = 1 EXIT_PEERS_TOML = 1 def err(msg: str) -> None: print(msg, file=sys.stderr) def render_banner_if_halt() -> None: if not HALT_FILE.exists(): return try: reason = HALT_FILE.read_text().strip() except OSError: reason = "(HALT file unreadable; treated as halted)" print("⚠ HALT ACTIVE — cross-agent comms paused") if reason: print(f" reason: {reason}") print() def enumerate_local_projects() -> list[str]: projects_dir = Path.home() / "projects" if not projects_dir.is_dir(): return [] found = [] for child in sorted(projects_dir.iterdir()): if child.is_dir() and (child / ".ai").is_dir(): found.append(child.name) return found def load_peers() -> dict: if not PEERS_TOML.exists(): return {"peers": {}} try: return tomllib.loads(PEERS_TOML.read_text()) except (tomllib.TOMLDecodeError, OSError) as e: err(f"cannot parse peers.toml: {e}") sys.exit(EXIT_PEERS_TOML) def probe_peer_reachability(host: str, ssh_user: str | None) -> tuple[bool, str | None]: """Run a short SSH probe with BatchMode=yes (no interactive prompt).""" target = f"{ssh_user}@{host}" if ssh_user else host try: result = subprocess.run( ["ssh", "-o", "ConnectTimeout=2", "-o", "BatchMode=yes", target, "true"], capture_output=True, text=True, timeout=5, ) except (FileNotFoundError, subprocess.TimeoutExpired): return False, "ssh probe failed" if result.returncode == 0: return True, None return False, (result.stderr.strip().splitlines() or [f"exit {result.returncode}"])[-1] def enumerate_remote_projects(host: str, ssh_user: str | None) -> list[str] | None: target = f"{ssh_user}@{host}" if ssh_user else host try: result = subprocess.run( [ "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", target, "ls -d ~/projects/*/.ai/ 2>/dev/null", ], capture_output=True, text=True, timeout=10, ) except (FileNotFoundError, subprocess.TimeoutExpired): return None if result.returncode != 0: return None projects = [] for line in result.stdout.splitlines(): # Each line looks like /home//projects//.ai/ parts = line.rstrip("/").split("/") if len(parts) >= 2 and parts[-1] == ".ai": projects.append(parts[-2]) return projects def read_cache() -> dict | None: if not CACHE_FILE.exists(): return None try: age = time.time() - CACHE_FILE.stat().st_mtime if age > CACHE_TTL_SECONDS: return None return json.loads(CACHE_FILE.read_text()) except (OSError, json.JSONDecodeError): return None def write_cache(payload: dict) -> None: CACHE_DIR.mkdir(parents=True, exist_ok=True) CACHE_FILE.write_text(json.dumps(payload, indent=2)) def discover(peer_filter: str | None, enumerate_remote: bool) -> dict: local = enumerate_local_projects() peers_cfg = load_peers().get("peers", {}) peers_out = [] for name, cfg in sorted(peers_cfg.items()): if peer_filter and name != peer_filter: continue host = cfg.get("host", name) ssh_user = cfg.get("ssh_user") reachable, error = probe_peer_reachability(host, ssh_user) entry = { "name": name, "host": host, "reachable": reachable, } if not reachable: entry["error"] = error if enumerate_remote and reachable: entry["projects"] = enumerate_remote_projects(host, ssh_user) or [] peers_out.append(entry) return { "scanned_at": _dt.datetime.now(_dt.timezone.utc).isoformat(), "halt_active": HALT_FILE.exists(), "local": local, "peers": peers_out, } def render_table(payload: dict, enumerate_remote: bool) -> None: local = payload.get("local", []) print(f"Local ({_local_hostname()}):") if local: wrapped = ", ".join(local) print(f" {wrapped} [{len(local)} project{'s' if len(local) != 1 else ''}]") else: print(" (no projects with .ai/ found)") print() peers = payload.get("peers", []) if not peers: print("Peers (from peers.toml):") print(" (no peers configured)") return print("Peers (from ~/.config/cross-agent-comms/peers.toml):") for p in peers: marker = "✓ reachable" if p.get("reachable") else f"✗ UNREACHABLE ({p.get('error', 'unknown')})" print(f" {p['name']:<16} {p['host']:<24} {marker}") if enumerate_remote and p.get("projects"): wrapped = ", ".join(p["projects"]) print(f" projects: {wrapped}") def _local_hostname() -> str: import socket return socket.gethostname().split(".")[0] def main() -> int: parser = argparse.ArgumentParser(description="Discover cross-agent destinations.") parser.add_argument("--enumerate-remote", action="store_true", help="SSH into each peer and list ~/projects/*/.ai/") parser.add_argument("--no-cache", action="store_true", help="Skip cache; force fresh probe") parser.add_argument("--peer", help="Limit to a single peer name from peers.toml") parser.add_argument("--json", action="store_true", help="Machine-readable output") args = parser.parse_args() render_banner_if_halt() payload = None if not args.no_cache: cached = read_cache() if cached is not None: # Honor --peer filter on cached payload. if args.peer: cached["peers"] = [p for p in cached.get("peers", []) if p["name"] == args.peer] payload = cached if payload is None: payload = discover(args.peer, args.enumerate_remote) if not args.no_cache and not args.peer: # Only cache full (unfiltered) discoveries. write_cache(payload) if args.json: print(json.dumps(payload, indent=2)) return EXIT_OK render_table(payload, args.enumerate_remote) return EXIT_OK if __name__ == "__main__": sys.exit(main())