1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
"""Tests for cross-agent-recv."""
from __future__ import annotations
import json
import os
import subprocess
from pathlib import Path
import pytest
SCRIPT = Path(__file__).resolve().parent.parent / "cross-agent-comms" / "cross-agent-recv"
def _make_message(path: Path, *, conv_id: str = "test-conv", seq: int = 1, msg_type: str = "request",
proto_version: str = "5", title: str = "Test", requires_tools: str | None = None,
body: str = "Body.\n") -> Path:
fm_lines = [
f"#+TITLE: {title}",
f"#+CONVERSATION_ID: {conv_id}",
f"#+MESSAGE_TYPE: {msg_type}",
f"#+SEQUENCE: {seq}",
"#+TIMESTAMP: 2026-04-27T05:00:00-05:00",
f"#+PROTOCOL_VERSION: {proto_version}",
]
if requires_tools:
fm_lines.append(f"#+REQUIRES_TOOLS: {requires_tools}")
path.write_text("\n".join(fm_lines) + "\n\n" + body)
return path
def _run(args: list[str], env: dict | None = None) -> subprocess.CompletedProcess:
return subprocess.run([str(SCRIPT), *args], capture_output=True, text=True, env=env)
@pytest.fixture
def isolated_env(tmp_path, monkeypatch):
fake_home = tmp_path / "home"
fake_home.mkdir()
monkeypatch.setenv("HOME", str(fake_home))
return fake_home
def test_recv_help(isolated_env):
result = _run(["--help"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 0
assert "Receive and decide" in result.stdout
def test_recv_missing_file_rejects(isolated_env, tmp_path):
result = _run([str(tmp_path / "nope.org")], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 3 # reject
def test_recv_malformed_frontmatter_rejects(isolated_env, tmp_path):
bad = tmp_path / "bad.org"
bad.write_text("not org-mode at all\n")
result = _run([str(bad), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 3
assert "decision: reject" in result.stdout
def test_recv_missing_required_field_rejects(isolated_env, tmp_path):
msg = tmp_path / "msg.org"
# Missing PROTOCOL_VERSION among others.
msg.write_text("#+TITLE: x\n#+CONVERSATION_ID: c\n\nBody.\n")
result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 3
assert "missing required" in result.stdout
def test_recv_protocol_version_mismatch_query(isolated_env, tmp_path):
msg = _make_message(tmp_path / "msg.org", proto_version="4")
result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 2 # query
assert "PROTOCOL_VERSION mismatch" in result.stdout
def test_recv_invalid_message_type_rejects(isolated_env, tmp_path):
msg = _make_message(tmp_path / "msg.org", msg_type="banana")
result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 3
assert "invalid MESSAGE_TYPE" in result.stdout
def test_recv_missing_signature_rejects(isolated_env, tmp_path):
"""When verify is on, a missing .asc sibling rejects."""
msg = _make_message(tmp_path / "msg.org")
# No .asc sidecar.
result = _run([str(msg)], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 3
assert "signature file missing" in result.stdout
def test_recv_valid_processes(isolated_env, tmp_path):
"""A valid message with --no-verify and no dedup match → process."""
msg = _make_message(tmp_path / "msg.org")
result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 0 # process
assert "decision: process" in result.stdout
assert "sha256:" in result.stdout
def test_recv_dedup_against_identical_existing(isolated_env, tmp_path):
"""Same content + same SEQUENCE in same dir → dedup."""
inbox = tmp_path / "inbox"
inbox.mkdir()
first = _make_message(inbox / "20260427T100000Z-from-x-c.org", conv_id="c", seq=5)
# Second message with same content — name differs (canonical-style would have different timestamp).
second = _make_message(inbox / "20260427T100100Z-from-x-c.org", conv_id="c", seq=5)
# Bodies must be byte-identical for hash equality.
second.write_bytes(first.read_bytes())
result = _run([str(second), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 1 # dedup
assert "decision: dedup" in result.stdout
def test_recv_collision_with_different_content_processes(isolated_env, tmp_path):
"""Same SEQUENCE + same CONVERSATION_ID but different content → process both."""
inbox = tmp_path / "inbox"
inbox.mkdir()
_make_message(inbox / "20260427T100000Z-from-x-c.org", conv_id="c", seq=5, body="First body.\n")
second = _make_message(inbox / "20260427T100100Z-from-x-c.org", conv_id="c", seq=5, body="Different body.\n")
result = _run([str(second), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 0 # process
assert "decision: process" in result.stdout
def test_recv_requires_tools_missing_query(isolated_env, tmp_path):
"""REQUIRES_TOOLS naming a definitely-missing binary → query."""
msg = _make_message(tmp_path / "msg.org", requires_tools="definitely-not-installed-xyzzy-9000")
result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 2 # query
assert "required tools unavailable" in result.stdout
def test_recv_requires_tools_present_processes(isolated_env, tmp_path):
"""REQUIRES_TOOLS naming a real binary → process."""
msg = _make_message(tmp_path / "msg.org", requires_tools="ls,cat")
result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 0
assert "decision: process" in result.stdout
def test_recv_json_output(isolated_env, tmp_path):
msg = _make_message(tmp_path / "msg.org")
result = _run([str(msg), "--no-verify", "--json"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 0
payload = json.loads(result.stdout)
assert payload["decision"] == "process"
assert payload["message_type"] == "request"
assert payload["conversation_id"] == "test-conv"
def test_recv_halt_blocks(isolated_env, tmp_path):
halt = isolated_env / ".config" / "cross-agent-comms" / "HALT"
halt.parent.mkdir(parents=True)
halt.write_text("halted\n")
msg = _make_message(tmp_path / "msg.org")
result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 5
assert "halt active" in result.stderr.lower()
def test_recv_halt_leaves_message_in_place(isolated_env, tmp_path):
"""Per spec: under HALT, recv must NOT move/dedup/reject — leave file in place."""
halt = isolated_env / ".config" / "cross-agent-comms" / "HALT"
halt.parent.mkdir(parents=True)
halt.write_text("halted\n")
msg = _make_message(tmp_path / "msg.org")
pre_content = msg.read_text()
result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)})
assert result.returncode == 5
# File still exists with same content.
assert msg.exists()
assert msg.read_text() == pre_content
|