aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/cross-agent-comms/cross-agent-recv
blob: b67533a2d9a6e3e672a084e951dfc8131e59954b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
#!/usr/bin/env python3
"""Cross-agent message receiver.

See cross-agent-recv.md for the full contract. Reads one message file and
emits a structured decision the agent acts on:

  process | dedup | query | reject

Decision exit codes:
  0 = process    1 = dedup    2 = query    3 = reject

When HALT is set, the script refuses to verify or dedup, exits with code 5
without emitting a decision, and leaves the inbound file in place — resume
picks it up via cold-start.
"""

from __future__ import annotations

import argparse
import hashlib
import json
import re
import shutil
import subprocess
import sys
from pathlib import Path

# Per-user configuration directory for the cross-agent-comms scripts.
CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms"
# While this file exists, check_halt() exits before any message is examined;
# its text (if any) is reported as the halt reason.
HALT_FILE = CONFIG_DIR / "HALT"
# PROTOCOL_VERSION a message must declare unless --protocol-version overrides it.
EXPECTED_PROTOCOL_VERSION = "5"

# Frontmatter keys every message must carry; a missing key yields a reject.
REQUIRED_FRONTMATTER = ["TITLE", "CONVERSATION_ID", "MESSAGE_TYPE", "SEQUENCE", "TIMESTAMP", "PROTOCOL_VERSION"]
# Accepted MESSAGE_TYPE values; any other value yields a reject.
VALID_MESSAGE_TYPES = {"request", "progress", "query", "pushback", "complete", "release", "escalate"}

# Decision names reported to the agent.
DEC_PROCESS = "process"
DEC_DEDUP = "dedup"
DEC_QUERY = "query"
DEC_REJECT = "reject"

# Process exit status associated with each decision.
EXIT_FOR_DECISION = {
    DEC_PROCESS: 0,
    DEC_DEDUP: 1,
    DEC_QUERY: 2,
    DEC_REJECT: 3,
}

# Exit status used by check_halt() when the HALT file is present.
EXIT_HALT = 5


def err(msg: str) -> None:
    """Write *msg* plus a trailing newline to standard error."""
    stream = sys.stderr
    print(msg, file=stream)


def check_halt() -> None:
    """Exit with ``EXIT_HALT`` when the HALT file exists; otherwise return.

    The HALT file's stripped text, when non-empty, is appended to the message
    written to stderr as the halt reason. A HALT file that exists but cannot
    be read or decoded still counts as a halt.

    Raises:
        SystemExit: with code ``EXIT_HALT`` whenever the HALT file is present.
    """
    if not HALT_FILE.exists():
        return
    try:
        reason = HALT_FILE.read_text().strip()
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is not an OSError. Left uncaught it would end the
        # process with a traceback and exit status 1, which callers read as
        # the `dedup` decision instead of a halt.
        err("halt active (HALT file present but unreadable; treated as halted)")
        sys.exit(EXIT_HALT)
    msg = "halt active; leaving inbound message in place (resume will pick up)"
    if reason:
        msg = f"{msg}: {reason}"
    err(msg)
    sys.exit(EXIT_HALT)


def parse_frontmatter(path: Path) -> dict[str, str]:
    """Read the leading ``#+KEY: value`` block of the file at *path*.

    Blank lines and non-matching lines before the first key are skipped.
    Once at least one key has been collected, the first blank or non-matching
    line ends the block. Keys consist of uppercase letters and underscores;
    values are stripped of surrounding whitespace. When a key repeats, the
    last occurrence wins.

    The file is decoded as UTF-8.

    Returns:
        The collected key/value pairs, or ``{"_parse_error": "cannot read: ..."}``
        when the file cannot be read or is not valid UTF-8.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        # Undecodable bytes raise UnicodeDecodeError (a ValueError, not an
        # OSError). Report them through the same error record so neither the
        # main message nor a stray sibling file can crash the caller.
        return {"_parse_error": f"cannot read: {e}"}
    fm: dict[str, str] = {}
    for line in text.splitlines():
        line = line.rstrip()
        if not line:
            if fm:
                # A blank line after the first key terminates the frontmatter.
                break
            continue
        m = re.match(r"#\+([A-Z_]+):\s*(.*)", line)
        if m:
            fm[m.group(1)] = m.group(2).strip()
        elif fm:
            # First non-frontmatter line after the block: stop scanning.
            break
    return fm


def emit_decision(
    decision: str,
    reason: str | None,
    fm: dict[str, str],
    sha256: str | None,
    args: argparse.Namespace,
) -> int:
    """Print one decision record to stdout and return its exit status.

    With ``args.json`` set the record is printed as a JSON object (indented by
    two spaces unless ``args.compact_json`` is set). Otherwise it is printed
    as ``key: value`` lines, omitting the reason and hash when empty and any
    frontmatter field that is absent from *fm*.

    Returns:
        ``EXIT_FOR_DECISION[decision]``.
    """
    header_keys = ("message_type", "conversation_id", "sequence", "timestamp")
    # Each output key is the lowercase form of the frontmatter key it mirrors.
    headers = {name: fm.get(name.upper()) for name in header_keys}
    record = {"decision": decision, "reason": reason, **headers, "sha256": sha256}

    if args.json:
        indent = None if args.compact_json else 2
        print(json.dumps(record, indent=indent))
    else:
        lines = [f"decision: {decision}"]
        if reason:
            lines.append(f"reason: {reason}")
        lines.extend(f"{name}: {value}" for name, value in headers.items() if value is not None)
        if sha256:
            lines.append(f"sha256: {sha256}")
        print("\n".join(lines))
    return EXIT_FOR_DECISION[decision]


def gpg_verify(message_path: Path, sig_path: Path) -> tuple[bool, str]:
    """Check the detached signature *sig_path* against *message_path* with gpg.

    Returns:
        ``(True, "")`` when ``gpg --verify`` exits with status 0. Otherwise
        ``(False, reason)``, where *reason* is "gpg not installed", a
        "cannot run gpg: ..." message, the last line of gpg's stderr, or
        ``"exit N"`` when gpg printed nothing to stderr.
    """
    try:
        result = subprocess.run(
            # "--" ends option parsing so a path that starts with "-" cannot
            # be interpreted by gpg as a command-line option.
            ["gpg", "--verify", "--", str(sig_path), str(message_path)],
            capture_output=True,
            text=True,
            # gpg output may contain bytes that are invalid in the locale
            # encoding; never let decoding the diagnostics raise.
            errors="replace",
        )
    except FileNotFoundError:
        return False, "gpg not installed"
    except OSError as e:
        # e.g. the gpg binary exists but is not executable by this user.
        return False, f"cannot run gpg: {e}"
    if result.returncode == 0:
        return True, ""
    stderr = result.stderr.strip()
    if stderr:
        return False, stderr.splitlines()[-1]
    return False, f"exit {result.returncode}"


def sha256_of(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in binary mode in 64 KiB chunks, so the whole content
    is never held in memory at once.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(65536)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()


def find_dedup_match(message_path: Path, fm: dict[str, str], my_hash: str) -> tuple[str, str | None]:
    """Look for an identical copy of the message among its ``.org`` siblings.

    Every other regular ``.org`` file in the message's directory is examined.
    A sibling counts as a duplicate only when its CONVERSATION_ID and SEQUENCE
    equal those in *fm* and its SHA-256 equals *my_hash*.

    Returns:
        ``(DEC_DEDUP, reason)`` for the first duplicate found, else
        ``(DEC_PROCESS, None)``. A sibling sharing the conversation and
        sequence but with different content is not a duplicate.
    """
    wanted_conv = fm["CONVERSATION_ID"]
    wanted_seq = fm["SEQUENCE"]
    for candidate in message_path.parent.iterdir():
        if candidate == message_path:
            continue
        if not candidate.is_file():
            continue
        if candidate.suffix != ".org":
            continue
        headers = parse_frontmatter(candidate)
        same_slot = (
            headers.get("CONVERSATION_ID") == wanted_conv
            and headers.get("SEQUENCE") == wanted_seq
        )
        # Hash the sibling only when it occupies the same conversation slot.
        if same_slot and sha256_of(candidate) == my_hash:
            return DEC_DEDUP, f"identical retry of {candidate.name}"
    return DEC_PROCESS, None


def check_requires_tools(fm: dict[str, str]) -> tuple[bool, list[str]]:
    """Report which tools named in REQUIRES_TOOLS are not on PATH.

    REQUIRES_TOOLS is a comma-separated list of tool names; empty entries are
    ignored. For v5, "available" is a heuristic: an executable on PATH whose
    name matches the tool slug. MCP availability is out of scope (no portable
    way to query it from a CLI).

    Returns:
        ``(all_available, missing)``, where *missing* lists the unavailable
        tools in the order given. An absent or empty field yields
        ``(True, [])``.
    """
    raw = fm.get("REQUIRES_TOOLS")
    if not raw:
        return True, []
    absent: list[str] = []
    for piece in raw.split(","):
        name = piece.strip()
        if name and shutil.which(name) is None:
            absent.append(name)
    return not absent, absent


def main() -> int:
    """Parse the command line, evaluate one message file, and return the exit status.

    Pipeline, stopping at the first step that settles the decision:

    1. Frontmatter sanity: required keys present, MESSAGE_TYPE valid (reject).
    2. PROTOCOL_VERSION equals the expected version (query on mismatch).
    3. Detached GPG signature ``<file>.asc`` verifies, unless ``--no-verify``
       (reject).
    4. SHA-256 dedup against sibling files, unless ``--no-dedup`` (dedup).
    5. Every tool in REQUIRES_TOOLS is on PATH (query).
    6. Otherwise: process.

    Every path that returns emits a decision record through ``emit_decision``,
    so a ``--json`` consumer always receives a JSON object on stdout.

    Returns:
        The exit status for the emitted decision (see ``EXIT_FOR_DECISION``).

    Raises:
        SystemExit: from ``check_halt()`` when the HALT file is present, or
            from argparse on a usage error.
    """
    parser = argparse.ArgumentParser(description="Receive and decide on a cross-agent message.")
    parser.add_argument("message_file", type=Path)
    parser.add_argument("--no-verify", action="store_true", help="Skip GPG verification (testing only)")
    parser.add_argument("--no-dedup", action="store_true", help="Skip SHA-256 dedup against existing files")
    parser.add_argument("--protocol-version", default=EXPECTED_PROTOCOL_VERSION,
                        help=f"Override expected protocol version (default: {EXPECTED_PROTOCOL_VERSION})")
    parser.add_argument("--json", action="store_true", help="Emit JSON output")
    parser.add_argument("--compact-json", action="store_true", help="Compact JSON (no indent)")
    # NOTE(review): argparse exits with status 2 on a usage error, which is
    # also the `query` exit status. Callers that act on the exit status alone
    # cannot tell the two apart; confirm whether the contract should change.
    args = parser.parse_args()

    check_halt()

    if not args.message_file.is_file():
        not_found = f"message file not found: {args.message_file}"
        err(not_found)
        # Also emit a structured reject so stdout is never empty on this path,
        # matching how an unreadable file is reported below.
        return emit_decision(DEC_REJECT, not_found, {}, None, args)

    fm = parse_frontmatter(args.message_file)
    if "_parse_error" in fm:
        return emit_decision(DEC_REJECT, fm["_parse_error"], {}, None, args)

    # Step 1: frontmatter sanity-check.
    missing = [k for k in REQUIRED_FRONTMATTER if k not in fm]
    if missing:
        return emit_decision(
            DEC_REJECT, f"frontmatter missing required fields: {', '.join(missing)}", fm, None, args
        )
    if fm["MESSAGE_TYPE"] not in VALID_MESSAGE_TYPES:
        return emit_decision(
            DEC_REJECT, f"invalid MESSAGE_TYPE: {fm['MESSAGE_TYPE']!r}", fm, None, args
        )

    # Step 2: PROTOCOL_VERSION check.
    if fm["PROTOCOL_VERSION"] != args.protocol_version:
        return emit_decision(
            DEC_QUERY,
            f"PROTOCOL_VERSION mismatch: expected {args.protocol_version}, got {fm['PROTOCOL_VERSION']}",
            fm,
            None,
            args,
        )

    # Step 3: GPG verify. The signature is expected next to the message, named
    # by appending ".asc" to the full file name.
    if not args.no_verify:
        sig_path = args.message_file.with_suffix(args.message_file.suffix + ".asc")
        if not sig_path.is_file():
            return emit_decision(DEC_REJECT, f"signature file missing: {sig_path.name}", fm, None, args)
        ok, gpg_err = gpg_verify(args.message_file, sig_path)
        if not ok:
            return emit_decision(DEC_REJECT, f"gpg verify failed: {gpg_err}", fm, None, args)

    # Step 4: SHA-256 dedup.
    try:
        my_hash = sha256_of(args.message_file)
    except OSError as e:
        # The file vanished or became unreadable after the earlier checks. An
        # uncaught traceback would exit with status 1, i.e. `dedup`.
        return emit_decision(DEC_REJECT, f"cannot read: {e}", fm, None, args)
    if not args.no_dedup:
        decision, reason = find_dedup_match(args.message_file, fm, my_hash)
        if decision == DEC_DEDUP:
            return emit_decision(DEC_DEDUP, reason, fm, my_hash, args)

    # Step 5: REQUIRES_TOOLS check.
    ok, missing_tools = check_requires_tools(fm)
    if not ok:
        return emit_decision(
            DEC_QUERY,
            f"required tools unavailable: {', '.join(missing_tools)}",
            fm,
            my_hash,
            args,
        )

    # Step 6: process.
    return emit_decision(DEC_PROCESS, None, fm, my_hash, args)


# Script entry point: the process exit status is the value main() returns.
if __name__ == "__main__":
    sys.exit(main())