aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/tests/test_cross_agent_status.py
diff options
context:
space:
mode:
Diffstat (limited to '.ai/scripts/tests/test_cross_agent_status.py')
-rw-r--r--.ai/scripts/tests/test_cross_agent_status.py165
1 files changed, 0 insertions, 165 deletions
diff --git a/.ai/scripts/tests/test_cross_agent_status.py b/.ai/scripts/tests/test_cross_agent_status.py
deleted file mode 100644
index bb5b8ba..0000000
--- a/.ai/scripts/tests/test_cross_agent_status.py
+++ /dev/null
@@ -1,165 +0,0 @@
-"""Tests for cross-agent-status (TDD: tests written before implementation)."""
-
-from __future__ import annotations
-
-import json
-import os
-import subprocess
-import textwrap
-from pathlib import Path
-
-import pytest
-
-SCRIPT = Path(__file__).resolve().parent.parent / "cross-agent-comms" / "cross-agent-status"
-
-
-def _make_msg(path: Path, *, conv_id: str, seq: int, msg_type: str = "request",
- proto_version: str = "5", timestamp: str = "2026-04-27T05:00:00-05:00") -> Path:
- path.parent.mkdir(parents=True, exist_ok=True)
- path.write_text(textwrap.dedent(f"""\
- #+TITLE: T
- #+CONVERSATION_ID: {conv_id}
- #+MESSAGE_TYPE: {msg_type}
- #+SEQUENCE: {seq}
- #+TIMESTAMP: {timestamp}
- #+PROTOCOL_VERSION: {proto_version}
-
- Body.
- """))
- return path
-
-
-def _run(args: list[str], env: dict | None = None) -> subprocess.CompletedProcess:
- return subprocess.run([str(SCRIPT), *args], capture_output=True, text=True, env=env)
-
-
-@pytest.fixture
-def fake_projects(tmp_path, monkeypatch):
- """Create a fake ~/projects/<name>/inbox/from-agents/ tree under tmp_path."""
- home = tmp_path / "home"
- home.mkdir()
- monkeypatch.setenv("HOME", str(home))
- return home
-
-
-def test_status_help(fake_projects):
- result = _run(["--help"], env={**os.environ, "HOME": str(fake_projects)})
- assert result.returncode == 0
- assert "snapshot" in result.stdout.lower() or "pending" in result.stdout.lower()
-
-
-def test_status_no_projects_clean_output(fake_projects):
- result = _run([], env={**os.environ, "HOME": str(fake_projects)})
- assert result.returncode == 0
- # Empty machine prints either header-only table or "no projects" — accept either.
- # No crash, no pending claims.
- assert "pending" in result.stdout.lower() or result.stdout.strip() == ""
-
-
-def test_status_one_pending_shows_up(fake_projects):
- inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents"
- _make_msg(inbox / "20260427T100000Z-from-career-fixup.org", conv_id="fixup", seq=1)
- result = _run([], env={**os.environ, "HOME": str(fake_projects)})
- assert result.returncode == 0
- assert "homelab" in result.stdout
- assert "1" in result.stdout # pending count
- assert "20260427T100000Z-from-career-fixup.org" in result.stdout
-
-
-def test_status_released_conversation_zero_pending(fake_projects):
- """A conversation with a release message in it counts as 0 pending."""
- inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents"
- _make_msg(inbox / "20260427T100000Z-from-career-done.org", conv_id="done", seq=1)
- _make_msg(inbox / "20260427T100100Z-from-homelab-done.org", conv_id="done", seq=2, msg_type="release")
- result = _run([], env={**os.environ, "HOME": str(fake_projects)})
- assert result.returncode == 0
- # Check the homelab row shows 0 pending.
- lines = [ln for ln in result.stdout.splitlines() if "homelab" in ln]
- # At least one homelab line should show 0 pending or "—".
- assert any("0" in ln or "—" in ln for ln in lines)
-
-
-def test_status_partial_release(fake_projects):
- """Conversation with release + a later message → that later message counts as pending."""
- inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents"
- _make_msg(inbox / "20260427T100000Z-from-career-x.org", conv_id="x", seq=1,
- timestamp="2026-04-27T05:00:00-05:00")
- _make_msg(inbox / "20260427T100100Z-from-homelab-x.org", conv_id="x", seq=2, msg_type="release",
- timestamp="2026-04-27T05:01:00-05:00")
- # New message AFTER release: starts a fresh thread that's pending.
- _make_msg(inbox / "20260427T200000Z-from-career-x.org", conv_id="x", seq=3,
- timestamp="2026-04-27T15:00:00-05:00")
- result = _run([], env={**os.environ, "HOME": str(fake_projects)})
- assert result.returncode == 0
- homelab_line = next(ln for ln in result.stdout.splitlines() if "homelab" in ln)
- assert "1" in homelab_line # the post-release message is pending
-
-
-def test_status_multiple_projects(fake_projects):
- inbox_a = fake_projects / "projects" / "homelab" / "inbox" / "from-agents"
- inbox_b = fake_projects / "projects" / "career" / "inbox" / "from-agents"
- _make_msg(inbox_a / "20260427T100000Z-from-x-a.org", conv_id="a", seq=1)
- _make_msg(inbox_b / "20260427T100100Z-from-x-b.org", conv_id="b", seq=1)
- _make_msg(inbox_b / "20260427T100200Z-from-x-c.org", conv_id="c", seq=1)
- result = _run([], env={**os.environ, "HOME": str(fake_projects)})
- assert result.returncode == 0
- # career has 2 pending, homelab has 1.
- career_line = next(ln for ln in result.stdout.splitlines() if "career" in ln)
- homelab_line = next(ln for ln in result.stdout.splitlines() if "homelab" in ln)
- assert "2" in career_line
- assert "1" in homelab_line
-
-
-def test_status_json_output(fake_projects):
- inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents"
- _make_msg(inbox / "20260427T100000Z-from-career-test.org", conv_id="test", seq=1)
- result = _run(["--json"], env={**os.environ, "HOME": str(fake_projects)})
- assert result.returncode == 0
- payload = json.loads(result.stdout)
- assert "projects" in payload
- assert isinstance(payload["projects"], list)
- homelab = next((p for p in payload["projects"] if p["name"] == "homelab"), None)
- assert homelab is not None
- assert homelab["pending_count"] == 1
-
-
-def test_status_sort_pending_first(fake_projects):
- """Projects with pending messages sort before projects with 0."""
- (fake_projects / "projects" / "alpha" / "inbox" / "from-agents").mkdir(parents=True)
- inbox_zeta = fake_projects / "projects" / "zeta" / "inbox" / "from-agents"
- _make_msg(inbox_zeta / "20260427T100000Z-from-x-z.org", conv_id="z", seq=1)
- result = _run([], env={**os.environ, "HOME": str(fake_projects)})
- assert result.returncode == 0
- lines = result.stdout.splitlines()
- zeta_idx = next(i for i, ln in enumerate(lines) if "zeta" in ln)
- alpha_idx = next(i for i, ln in enumerate(lines) if "alpha" in ln)
- assert zeta_idx < alpha_idx, "pending project should sort before zero-pending project"
-
-
-def test_status_halt_shows_banner(fake_projects):
- halt = fake_projects / ".config" / "cross-agent-comms" / "HALT"
- halt.parent.mkdir(parents=True)
- halt.write_text("halted for test")
- inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents"
- _make_msg(inbox / "20260427T100000Z-from-x-x.org", conv_id="x", seq=1)
- result = _run([], env={**os.environ, "HOME": str(fake_projects)})
- assert result.returncode == 0 # status continues to print under HALT
- assert "HALT" in result.stdout
- # Banner should mention the reason.
- assert "halted for test" in result.stdout
-
-
-def test_status_projects_glob_override(fake_projects):
- inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents"
- _make_msg(inbox / "20260427T100000Z-from-x-a.org", conv_id="a", seq=1)
- other_inbox = fake_projects / "projects" / "career" / "inbox" / "from-agents"
- _make_msg(other_inbox / "20260427T100100Z-from-x-b.org", conv_id="b", seq=1)
- # Glob limits to homelab only.
- result = _run(
- ["--projects-glob", str(fake_projects / "projects" / "homelab" / "inbox" / "from-agents") + "/"],
- env={**os.environ, "HOME": str(fake_projects)},
- )
- assert result.returncode == 0
- assert "homelab" in result.stdout
- # career not in scope.
- assert "career" not in result.stdout