diff options
| author | Craig Jennings <c@cjennings.net> | 2026-05-06 21:59:52 -0500 |
|---|---|---|
| committer | Craig Jennings <c@cjennings.net> | 2026-05-06 21:59:52 -0500 |
| commit | d81b23ad6b6e437dfe3c338a00a4be39bc555146 (patch) | |
| tree | 2d4b0d7890fd1fc70d81282b81fed2808c28a106 /.ai/scripts/tests | |
| parent | 201377f57430ef28d02e703a2191434bbee55c75 (diff) | |
| download | rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.tar.gz rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.zip | |
chore(ai): initialize project notes and Claude tooling surfaces
Replace the seed notes.org with project-specific context (layout, install modes, task tracker location, recent inflection point). Bring in the synced template surfaces (protocols, workflows, scripts, references, retrospectives, someday-maybe) as tracked content for this content/documentation project.
Diffstat (limited to '.ai/scripts/tests')
21 files changed, 2076 insertions, 0 deletions
diff --git a/.ai/scripts/tests/conftest.py b/.ai/scripts/tests/conftest.py new file mode 100644 index 0000000..8d965ab --- /dev/null +++ b/.ai/scripts/tests/conftest.py @@ -0,0 +1,77 @@ +"""Shared fixtures for EML extraction tests.""" + +import os +from email.message import EmailMessage +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +import pytest + + +@pytest.fixture +def fixtures_dir(): + """Return path to the fixtures/ directory.""" + return os.path.join(os.path.dirname(__file__), 'fixtures') + + +def make_plain_message(body="Test body", from_="Jonathan Smith <jsmith@example.com>", + to="Craig <craig@example.com>", + subject="Test Subject", + date="Wed, 05 Feb 2026 11:36:00 -0600"): + """Create an EmailMessage with text/plain body.""" + msg = EmailMessage() + msg['From'] = from_ + msg['To'] = to + msg['Subject'] = subject + msg['Date'] = date + msg.set_content(body) + return msg + + +def make_html_message(html_body="<p>Test body</p>", + from_="Jonathan Smith <jsmith@example.com>", + to="Craig <craig@example.com>", + subject="Test Subject", + date="Wed, 05 Feb 2026 11:36:00 -0600"): + """Create an EmailMessage with text/html body only.""" + msg = EmailMessage() + msg['From'] = from_ + msg['To'] = to + msg['Subject'] = subject + msg['Date'] = date + msg.set_content(html_body, subtype='html') + return msg + + +def make_message_with_attachment(body="Test body", + from_="Jonathan Smith <jsmith@example.com>", + to="Craig <craig@example.com>", + subject="Test Subject", + date="Wed, 05 Feb 2026 11:36:00 -0600", + attachment_filename="document.pdf", + attachment_content=b"fake pdf content"): + """Create a multipart message with a text body and one attachment.""" + msg = MIMEMultipart() + msg['From'] = from_ + msg['To'] = to + msg['Subject'] = subject + msg['Date'] = date + + msg.attach(MIMEText(body, 'plain')) + + att = MIMEApplication(attachment_content, Name=attachment_filename) + att['Content-Disposition'] = f'attachment; filename="{attachment_filename}"' + msg.attach(att) + + return msg + + +def add_received_headers(msg, headers): + """Add Received headers to an existing message. + + headers: list of header strings, added in order (first = most recent). + """ + for header in headers: + msg['Received'] = header + return msg diff --git a/.ai/scripts/tests/fixtures/duplicate-attachment-names.eml b/.ai/scripts/tests/fixtures/duplicate-attachment-names.eml new file mode 100644 index 0000000..827d4f0 --- /dev/null +++ b/.ai/scripts/tests/fixtures/duplicate-attachment-names.eml @@ -0,0 +1,36 @@ +From: Jonathan Smith <jsmith@example.com> +To: Craig Jennings <craig@example.com> +Subject: Re: 4319 Danneel Street +Date: Mon, 27 Apr 2026 23:30:28 +0000 +MIME-Version: 1.0 +Content-Type: multipart/mixed; boundary="boundary123" + +--boundary123 +Content-Type: text/plain; charset="utf-8" +Content-Transfer-Encoding: 7bit + +Body with three inlined copies of the same signature image, mimicking +the way Outlook embeds a sender's signature once per quoted reply level. + +--boundary123 +Content-Type: image/png; name="Outlook-Ricci Part.png" +Content-Disposition: inline; filename="Outlook-Ricci Part.png" +Content-Transfer-Encoding: base64 + +aW1hZ2UtY29udGVudC0x + +--boundary123 +Content-Type: image/png; name="Outlook-Ricci Part.png" +Content-Disposition: inline; filename="Outlook-Ricci Part.png" +Content-Transfer-Encoding: base64 + +aW1hZ2UtY29udGVudC0y + +--boundary123 +Content-Type: image/png; name="Outlook-Ricci Part.png" +Content-Disposition: inline; filename="Outlook-Ricci Part.png" +Content-Transfer-Encoding: base64 + +aW1hZ2UtY29udGVudC0z + +--boundary123-- diff --git a/.ai/scripts/tests/fixtures/empty-body.eml b/.ai/scripts/tests/fixtures/empty-body.eml new file mode 100644 index 0000000..cf008df --- /dev/null +++ b/.ai/scripts/tests/fixtures/empty-body.eml @@ -0,0 +1,16 @@ +From: Jonathan Smith <jsmith@example.com> +To: Craig Jennings <craig@example.com> +Subject: Empty Body Test +Date: Thu, 05 Feb 2026 11:36:00 -0600 +MIME-Version: 1.0 +Content-Type: multipart/mixed; boundary="boundary456" +Received: from mail-sender.example.com by mx.receiver.example.com with ESMTP; Thu, 05 Feb 2026 11:36:05 -0600 + +--boundary456 +Content-Type: application/octet-stream; name="data.bin" +Content-Disposition: attachment; filename="data.bin" +Content-Transfer-Encoding: base64 + +AQIDBA== + +--boundary456-- diff --git a/.ai/scripts/tests/fixtures/html-only.eml b/.ai/scripts/tests/fixtures/html-only.eml new file mode 100644 index 0000000..4db7645 --- /dev/null +++ b/.ai/scripts/tests/fixtures/html-only.eml @@ -0,0 +1,20 @@ +From: Jonathan Smith <jsmith@example.com> +To: Craig Jennings <craig@example.com> +Subject: HTML Update +Date: Thu, 05 Feb 2026 11:36:00 -0600 +MIME-Version: 1.0 +Content-Type: text/html; charset="utf-8" +Content-Transfer-Encoding: 7bit +Received: from mail-sender.example.com by mx.receiver.example.com with ESMTP; Thu, 05 Feb 2026 11:36:05 -0600 + +<html> +<body> +<p>Hi Craig,</p> +<p>Here is the <strong>HTML</strong> update.</p> +<ul> +<li>Item one</li> +<li>Item two</li> +</ul> +<p>Best,<br>Jonathan</p> +</body> +</html> diff --git a/.ai/scripts/tests/fixtures/multiple-received-headers.eml b/.ai/scripts/tests/fixtures/multiple-received-headers.eml new file mode 100644 index 0000000..1b8d6a7 --- /dev/null +++ b/.ai/scripts/tests/fixtures/multiple-received-headers.eml @@ -0,0 +1,12 @@ +From: Jonathan Smith <jsmith@example.com> +To: Craig Jennings <craig@example.com> +Subject: Multiple Received Headers Test +Date: Thu, 05 Feb 2026 11:36:00 -0600 +MIME-Version: 1.0 +Content-Type: text/plain; charset="utf-8" +Content-Transfer-Encoding: 7bit +Received: by internal.example.com with SMTP; Thu, 05 Feb 2026 11:36:10 -0600 +Received: from mail-sender.example.com by mx.receiver.example.com with ESMTP; Thu, 05 Feb 2026 11:36:05 -0600 +Received: from originator.example.com by relay.example.com with SMTP; Thu, 05 Feb 2026 11:35:58 -0600 + +Test body with multiple received headers. diff --git a/.ai/scripts/tests/fixtures/no-received-headers.eml b/.ai/scripts/tests/fixtures/no-received-headers.eml new file mode 100644 index 0000000..8a05dc7 --- /dev/null +++ b/.ai/scripts/tests/fixtures/no-received-headers.eml @@ -0,0 +1,9 @@ +From: Jonathan Smith <jsmith@example.com> +To: Craig Jennings <craig@example.com> +Subject: No Received Headers +Date: Thu, 05 Feb 2026 11:36:00 -0600 +MIME-Version: 1.0 +Content-Type: text/plain; charset="utf-8" +Content-Transfer-Encoding: 7bit + +Test body with no received headers at all. diff --git a/.ai/scripts/tests/fixtures/plain-text.eml b/.ai/scripts/tests/fixtures/plain-text.eml new file mode 100644 index 0000000..8cc9d9c --- /dev/null +++ b/.ai/scripts/tests/fixtures/plain-text.eml @@ -0,0 +1,15 @@ +From: Jonathan Smith <jsmith@example.com> +To: Craig Jennings <craig@example.com> +Subject: Re: Fw: 4319 Danneel Street +Date: Thu, 05 Feb 2026 11:36:00 -0600 +MIME-Version: 1.0 +Content-Type: text/plain; charset="utf-8" +Content-Transfer-Encoding: 7bit +Received: from mail-sender.example.com by mx.receiver.example.com with ESMTP; Thu, 05 Feb 2026 11:36:05 -0600 + +Hi Craig, + +Here is the update on 4319 Danneel Street. + +Best, +Jonathan diff --git a/.ai/scripts/tests/fixtures/with-attachment.eml b/.ai/scripts/tests/fixtures/with-attachment.eml new file mode 100644 index 0000000..ac49c5d --- /dev/null +++ b/.ai/scripts/tests/fixtures/with-attachment.eml @@ -0,0 +1,27 @@ +From: Jonathan Smith <jsmith@example.com> +To: Craig Jennings <craig@example.com> +Subject: Ltr from Carrollton +Date: Thu, 05 Feb 2026 11:36:00 -0600 +MIME-Version: 1.0 +Content-Type: multipart/mixed; boundary="boundary123" +Received: from mail-sender.example.com by mx.receiver.example.com with ESMTP; Thu, 05 Feb 2026 11:36:05 -0600 + +--boundary123 +Content-Type: text/plain; charset="utf-8" +Content-Transfer-Encoding: 7bit + +Hi Craig, + +Please find the letter attached. + +Best, +Jonathan + +--boundary123 +Content-Type: application/octet-stream; name="Ltr Carrollton.pdf" +Content-Disposition: attachment; filename="Ltr Carrollton.pdf" +Content-Transfer-Encoding: base64 + +ZmFrZSBwZGYgY29udGVudA== + +--boundary123-- diff --git a/.ai/scripts/tests/test_cross_agent_discover.py b/.ai/scripts/tests/test_cross_agent_discover.py new file mode 100644 index 0000000..f0d2bb7 --- /dev/null +++ b/.ai/scripts/tests/test_cross_agent_discover.py @@ -0,0 +1,204 @@ +"""Tests for cross-agent-discover (TDD: tests written before implementation).""" + +from __future__ import annotations + +import json +import os +import subprocess +import textwrap +from pathlib import Path + +import pytest + +SCRIPT = Path(__file__).resolve().parent.parent / "cross-agent-comms" / "cross-agent-discover" + + +def _run(args: list[str], env: dict | None = None) -> subprocess.CompletedProcess: + return subprocess.run([str(SCRIPT), *args], capture_output=True, text=True, env=env) + + +@pytest.fixture +def fake_home(tmp_path, monkeypatch): + home = tmp_path / "home" + home.mkdir() + monkeypatch.setenv("HOME", str(home)) + return home + + +def _make_project(home: Path, name: str) -> Path: + proj = home / "projects" / name + (proj / ".ai").mkdir(parents=True) + return proj + + +def _write_peers_toml(home: Path, content: str) -> Path: + cfg = home / ".config" / "cross-agent-comms" + cfg.mkdir(parents=True, exist_ok=True) + peers = cfg / "peers.toml" + peers.write_text(content) + return peers + + +def test_discover_help(fake_home): + result = _run(["--help"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 + assert "discover" in result.stdout.lower() or "enumerate" in result.stdout.lower() + + +def test_discover_local_only_no_projects(fake_home): + """Empty home → reports zero local projects, zero peers.""" + result = _run(["--no-cache"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 + # No crash; mentions local somehow. + assert "local" in result.stdout.lower() or "0 project" in result.stdout.lower() + + +def test_discover_lists_local_projects(fake_home): + _make_project(fake_home, "homelab") + _make_project(fake_home, "career") + _make_project(fake_home, "claude-templates") + result = _run(["--no-cache"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 + assert "homelab" in result.stdout + assert "career" in result.stdout + assert "claude-templates" in result.stdout + + +def test_discover_excludes_dirs_without_ai_subdir(fake_home): + """Directories under ~/projects/ that lack .ai/ are NOT projects.""" + _make_project(fake_home, "real-project") + (fake_home / "projects" / "not-a-project").mkdir(parents=True) + result = _run(["--no-cache"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 + assert "real-project" in result.stdout + assert "not-a-project" not in result.stdout + + +def test_discover_no_peers_toml_just_local(fake_home): + _make_project(fake_home, "homelab") + result = _run(["--no-cache"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 + # No peers section since no toml. + assert "homelab" in result.stdout + + +def test_discover_lists_peers_from_toml(fake_home): + _write_peers_toml(fake_home, textwrap.dedent("""\ + [peers.velox] + host = "velox" + ssh_user = "cjennings" + + [peers.bastion] + host = "bastion.local" + ssh_user = "cjennings" + """)) + _make_project(fake_home, "homelab") + result = _run(["--no-cache"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 + assert "velox" in result.stdout + assert "bastion" in result.stdout + + +def test_discover_malformed_peers_toml_errors_clearly(fake_home): + _write_peers_toml(fake_home, "not valid toml at all = = =") + result = _run(["--no-cache"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode != 0 + assert "peers.toml" in result.stderr or "TOML" in result.stderr or "parse" in result.stderr.lower() + + +def test_discover_json_output_schema(fake_home): + _make_project(fake_home, "homelab") + _make_project(fake_home, "career") + _write_peers_toml(fake_home, textwrap.dedent("""\ + [peers.velox] + host = "velox" + """)) + result = _run(["--json", "--no-cache"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 + payload = json.loads(result.stdout) + assert "local" in payload + assert "peers" in payload + assert isinstance(payload["local"], list) + assert isinstance(payload["peers"], list) + assert "homelab" in payload["local"] + assert "career" in payload["local"] + velox = next((p for p in payload["peers"] if p["name"] == "velox"), None) + assert velox is not None + # Reachability is a key — value depends on actual SSH state. + assert "reachable" in velox + + +def test_discover_peer_scope(fake_home): + _write_peers_toml(fake_home, textwrap.dedent("""\ + [peers.velox] + host = "velox" + + [peers.bastion] + host = "bastion.local" + """)) + result = _run(["--peer", "velox", "--no-cache", "--json"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 + payload = json.loads(result.stdout) + peer_names = [p["name"] for p in payload["peers"]] + assert "velox" in peer_names + assert "bastion" not in peer_names + + +def test_discover_unreachable_peer_marked(fake_home): + """A peer with a definitely-unreachable host gets reachable=False.""" + _write_peers_toml(fake_home, textwrap.dedent("""\ + [peers.bogus] + host = "definitely-not-a-real-host.invalid" + ssh_user = "nobody" + """)) + result = _run(["--no-cache", "--json"], env={**os.environ, "HOME": str(fake_home)}, ) + assert result.returncode == 0 + payload = json.loads(result.stdout) + bogus = next((p for p in payload["peers"] if p["name"] == "bogus"), None) + assert bogus is not None + assert bogus["reachable"] is False + + +def test_discover_cache_hit_within_window(fake_home): + """Second invocation within 5 min reads cache (skip the SSH probe).""" + _make_project(fake_home, "homelab") + # First call populates cache. + result1 = _run(["--json"], env={**os.environ, "HOME": str(fake_home)}) + assert result1.returncode == 0 + cache = fake_home / ".cache" / "cross-agent-comms" / "discovery.json" + assert cache.exists() + # Tamper with the cache to a marker only the cache path can produce. + payload = json.loads(cache.read_text()) + payload["_test_marker"] = True + cache.write_text(json.dumps(payload)) + # Second call (no --no-cache) should return the tampered payload. + result2 = _run(["--json"], env={**os.environ, "HOME": str(fake_home)}) + assert result2.returncode == 0 + payload2 = json.loads(result2.stdout) + assert payload2.get("_test_marker") is True + + +def test_discover_no_cache_flag_bypasses(fake_home): + """--no-cache ignores even a fresh cache.""" + _make_project(fake_home, "homelab") + cache_dir = fake_home / ".cache" / "cross-agent-comms" + cache_dir.mkdir(parents=True) + cache_dir.joinpath("discovery.json").write_text(json.dumps({ + "_test_marker": True, "local": [], "peers": [] + })) + result = _run(["--no-cache", "--json"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 + payload = json.loads(result.stdout) + # Cache marker should NOT appear in fresh result. + assert payload.get("_test_marker") is None or payload.get("_test_marker") is False + assert "homelab" in payload["local"] + + +def test_discover_halt_shows_banner(fake_home): + halt = fake_home / ".config" / "cross-agent-comms" / "HALT" + halt.parent.mkdir(parents=True) + halt.write_text("halted") + _make_project(fake_home, "homelab") + result = _run(["--no-cache"], env={**os.environ, "HOME": str(fake_home)}) + assert result.returncode == 0 # discover continues to print under HALT + assert "HALT" in result.stdout diff --git a/.ai/scripts/tests/test_cross_agent_halt.py b/.ai/scripts/tests/test_cross_agent_halt.py new file mode 100644 index 0000000..f8bf0b3 --- /dev/null +++ b/.ai/scripts/tests/test_cross_agent_halt.py @@ -0,0 +1,204 @@ +"""Tests for cross-agent-halt and cross-agent-resume (TDD).""" + +from __future__ import annotations + +import os +import subprocess +import textwrap +from pathlib import Path + +import pytest + +HALT_SCRIPT = Path(__file__).resolve().parent.parent / "cross-agent-comms" / "cross-agent-halt" +RESUME_SCRIPT = Path(__file__).resolve().parent.parent / "cross-agent-comms" / "cross-agent-resume" + + +def _run(script: Path, args: list[str], env: dict | None = None) -> subprocess.CompletedProcess: + return subprocess.run([str(script), *args], capture_output=True, text=True, env=env) + + +@pytest.fixture +def isolated_env(tmp_path, monkeypatch): + """Isolated HOME + a fake systemctl that records calls without acting.""" + fake_home = tmp_path / "home" + fake_home.mkdir() + fake_bin = tmp_path / "bin" + fake_bin.mkdir() + # Fake systemctl: no-op, exit 0. + fake_systemctl = fake_bin / "systemctl" + fake_systemctl.write_text("#!/usr/bin/env bash\nexit 0\n") + fake_systemctl.chmod(0o755) + # Fake ssh: succeed only for known-good host. + fake_ssh = fake_bin / "ssh" + fake_ssh.write_text(textwrap.dedent("""\ + #!/usr/bin/env bash + # Find the destination arg (skip flags). + target="" + for arg in "$@"; do + case "$arg" in + -*|*=*) ;; + *@*|localhost|*.local|*.invalid) target="$arg"; break ;; + *) target="$arg"; break ;; + esac + done + case "$target" in + *invalid*|*unreachable*) exit 255 ;; + *) exit 0 ;; + esac + """)) + fake_ssh.chmod(0o755) + + monkeypatch.setenv("HOME", str(fake_home)) + # Prepend our fake bin so systemctl + ssh are intercepted, but keep real /bin etc. + monkeypatch.setenv("PATH", f"{fake_bin}:{os.environ.get('PATH', '')}") + return fake_home + + +# ---- cross-agent-halt ---- + + +def test_halt_help(isolated_env): + result = _run(HALT_SCRIPT, ["--help"], env={**os.environ, "HOME": str(isolated_env), + "PATH": os.environ["PATH"]}) + assert result.returncode == 0 + assert "halt" in result.stdout.lower() + + +def test_halt_creates_halt_file(isolated_env): + halt_file = isolated_env / ".config" / "cross-agent-comms" / "HALT" + assert not halt_file.exists() + result = _run(HALT_SCRIPT, [], env={**os.environ, "HOME": str(isolated_env), + "PATH": os.environ["PATH"]}) + assert result.returncode == 0 + assert halt_file.exists() + + +def test_halt_with_reason_writes_body(isolated_env): + result = _run(HALT_SCRIPT, ["pausing for incident review"], + env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert result.returncode == 0 + halt_file = isolated_env / ".config" / "cross-agent-comms" / "HALT" + assert halt_file.exists() + assert "pausing for incident review" in halt_file.read_text() + + +def test_halt_idempotent(isolated_env): + """Running halt twice doesn't error.""" + halt_file = isolated_env / ".config" / "cross-agent-comms" / "HALT" + r1 = _run(HALT_SCRIPT, [], env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert r1.returncode == 0 + assert halt_file.exists() + r2 = _run(HALT_SCRIPT, [], env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert r2.returncode == 0 + assert halt_file.exists() + + +def test_halt_does_not_pkill(isolated_env): + """Per design: halt does NOT call pkill. Verify by checking no pkill process gets launched.""" + # Replace pkill in PATH with something that fails loudly so we'd see if halt invoked it. + fake_bin = isolated_env.parent / "bin" + pkill = fake_bin / "pkill" + pkill.write_text("#!/usr/bin/env bash\necho 'PKILL CALLED' >&2\nexit 99\n") + pkill.chmod(0o755) + result = _run(HALT_SCRIPT, [], env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert result.returncode == 0 + assert "PKILL CALLED" not in result.stderr + + +def test_halt_tailnet_reports_per_peer(isolated_env): + """--tailnet iterates peers.toml and reports per-peer status.""" + cfg = isolated_env / ".config" / "cross-agent-comms" + cfg.mkdir(parents=True) + (cfg / "peers.toml").write_text(textwrap.dedent("""\ + [peers.velox] + host = "velox" + ssh_user = "cjennings" + + [peers.bogus] + host = "definitely-unreachable.invalid" + ssh_user = "cjennings" + """)) + result = _run(HALT_SCRIPT, ["--tailnet"], + env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + # Partial halt → exit 1. + assert result.returncode == 1 + assert "velox" in result.stdout + assert "bogus" in result.stdout + # ✓ marker for velox, ✗ for bogus. + assert "✓" in result.stdout + assert "✗" in result.stdout + assert "PARTIAL" in result.stdout or "partial" in result.stdout.lower() + + +def test_halt_tailnet_all_reachable_exits_zero(isolated_env): + cfg = isolated_env / ".config" / "cross-agent-comms" + cfg.mkdir(parents=True) + (cfg / "peers.toml").write_text(textwrap.dedent("""\ + [peers.velox] + host = "velox" + ssh_user = "cjennings" + """)) + result = _run(HALT_SCRIPT, ["--tailnet"], + env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert result.returncode == 0 + assert "velox" in result.stdout + + +# ---- cross-agent-resume ---- + + +def test_resume_help(isolated_env): + result = _run(RESUME_SCRIPT, ["--help"], + env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert result.returncode == 0 + assert "resume" in result.stdout.lower() + + +def test_resume_removes_halt_file(isolated_env): + halt_file = isolated_env / ".config" / "cross-agent-comms" / "HALT" + halt_file.parent.mkdir(parents=True) + halt_file.write_text("halted") + assert halt_file.exists() + result = _run(RESUME_SCRIPT, [], + env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert result.returncode == 0 + assert not halt_file.exists() + + +def test_resume_when_no_halt_active_succeeds(isolated_env): + """No HALT to clear is not an error.""" + result = _run(RESUME_SCRIPT, [], + env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert result.returncode == 0 + + +def test_resume_prints_per_session_instructions(isolated_env): + """Resume must surface that polling does NOT auto-resume.""" + halt_file = isolated_env / ".config" / "cross-agent-comms" / "HALT" + halt_file.parent.mkdir(parents=True) + halt_file.write_text("halted") + result = _run(RESUME_SCRIPT, [], + env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert result.returncode == 0 + out = result.stdout.lower() + assert "polling" in out + assert "auto" in out or "explicit" in out or "session" in out + + +def test_resume_tailnet_partial_failure_exit_1(isolated_env): + cfg = isolated_env / ".config" / "cross-agent-comms" + cfg.mkdir(parents=True) + (cfg / "peers.toml").write_text(textwrap.dedent("""\ + [peers.velox] + host = "velox" + + [peers.bogus] + host = "unreachable-host.invalid" + """)) + halt_file = cfg / "HALT" + halt_file.write_text("halted") + result = _run(RESUME_SCRIPT, ["--tailnet"], + env={**os.environ, "HOME": str(isolated_env), "PATH": os.environ["PATH"]}) + assert result.returncode == 1 + assert "velox" in result.stdout + assert "bogus" in result.stdout diff --git a/.ai/scripts/tests/test_cross_agent_recv.py b/.ai/scripts/tests/test_cross_agent_recv.py new file mode 100644 index 0000000..27c53a5 --- /dev/null +++ b/.ai/scripts/tests/test_cross_agent_recv.py @@ -0,0 +1,176 @@ +"""Tests for cross-agent-recv.""" + +from __future__ import annotations + +import json +import os +import subprocess +from pathlib import Path + +import pytest + +SCRIPT = Path(__file__).resolve().parent.parent / "cross-agent-comms" / "cross-agent-recv" + + +def _make_message(path: Path, *, conv_id: str = "test-conv", seq: int = 1, msg_type: str = "request", + proto_version: str = "5", title: str = "Test", requires_tools: str | None = None, + body: str = "Body.\n") -> Path: + fm_lines = [ + f"#+TITLE: {title}", + f"#+CONVERSATION_ID: {conv_id}", + f"#+MESSAGE_TYPE: {msg_type}", + f"#+SEQUENCE: {seq}", + "#+TIMESTAMP: 2026-04-27T05:00:00-05:00", + f"#+PROTOCOL_VERSION: {proto_version}", + ] + if requires_tools: + fm_lines.append(f"#+REQUIRES_TOOLS: {requires_tools}") + path.write_text("\n".join(fm_lines) + "\n\n" + body) + return path + + +def _run(args: list[str], env: dict | None = None) -> subprocess.CompletedProcess: + return subprocess.run([str(SCRIPT), *args], capture_output=True, text=True, env=env) + + +@pytest.fixture +def isolated_env(tmp_path, monkeypatch): + fake_home = tmp_path / "home" + fake_home.mkdir() + monkeypatch.setenv("HOME", str(fake_home)) + return fake_home + + +def test_recv_help(isolated_env): + result = _run(["--help"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 0 + assert "Receive and decide" in result.stdout + + +def test_recv_missing_file_rejects(isolated_env, tmp_path): + result = _run([str(tmp_path / "nope.org")], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 3 # reject + + +def test_recv_malformed_frontmatter_rejects(isolated_env, tmp_path): + bad = tmp_path / "bad.org" + bad.write_text("not org-mode at all\n") + result = _run([str(bad), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 3 + assert "decision: reject" in result.stdout + + +def test_recv_missing_required_field_rejects(isolated_env, tmp_path): + msg = tmp_path / "msg.org" + # Missing PROTOCOL_VERSION among others. + msg.write_text("#+TITLE: x\n#+CONVERSATION_ID: c\n\nBody.\n") + result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 3 + assert "missing required" in result.stdout + + +def test_recv_protocol_version_mismatch_query(isolated_env, tmp_path): + msg = _make_message(tmp_path / "msg.org", proto_version="4") + result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 2 # query + assert "PROTOCOL_VERSION mismatch" in result.stdout + + +def test_recv_invalid_message_type_rejects(isolated_env, tmp_path): + msg = _make_message(tmp_path / "msg.org", msg_type="banana") + result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 3 + assert "invalid MESSAGE_TYPE" in result.stdout + + +def test_recv_missing_signature_rejects(isolated_env, tmp_path): + """When verify is on, a missing .asc sibling rejects.""" + msg = _make_message(tmp_path / "msg.org") + # No .asc sidecar. + result = _run([str(msg)], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 3 + assert "signature file missing" in result.stdout + + +def test_recv_valid_processes(isolated_env, tmp_path): + """A valid message with --no-verify and no dedup match → process.""" + msg = _make_message(tmp_path / "msg.org") + result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 0 # process + assert "decision: process" in result.stdout + assert "sha256:" in result.stdout + + +def test_recv_dedup_against_identical_existing(isolated_env, tmp_path): + """Same content + same SEQUENCE in same dir → dedup.""" + inbox = tmp_path / "inbox" + inbox.mkdir() + first = _make_message(inbox / "20260427T100000Z-from-x-c.org", conv_id="c", seq=5) + # Second message with same content — name differs (canonical-style would have different timestamp). + second = _make_message(inbox / "20260427T100100Z-from-x-c.org", conv_id="c", seq=5) + # Bodies must be byte-identical for hash equality. + second.write_bytes(first.read_bytes()) + result = _run([str(second), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 1 # dedup + assert "decision: dedup" in result.stdout + + +def test_recv_collision_with_different_content_processes(isolated_env, tmp_path): + """Same SEQUENCE + same CONVERSATION_ID but different content → process both.""" + inbox = tmp_path / "inbox" + inbox.mkdir() + _make_message(inbox / "20260427T100000Z-from-x-c.org", conv_id="c", seq=5, body="First body.\n") + second = _make_message(inbox / "20260427T100100Z-from-x-c.org", conv_id="c", seq=5, body="Different body.\n") + result = _run([str(second), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 0 # process + assert "decision: process" in result.stdout + + +def test_recv_requires_tools_missing_query(isolated_env, tmp_path): + """REQUIRES_TOOLS naming a definitely-missing binary → query.""" + msg = _make_message(tmp_path / "msg.org", requires_tools="definitely-not-installed-xyzzy-9000") + result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 2 # query + assert "required tools unavailable" in result.stdout + + +def test_recv_requires_tools_present_processes(isolated_env, tmp_path): + """REQUIRES_TOOLS naming a real binary → process.""" + msg = _make_message(tmp_path / "msg.org", requires_tools="ls,cat") + result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 0 + assert "decision: process" in result.stdout + + +def test_recv_json_output(isolated_env, tmp_path): + msg = _make_message(tmp_path / "msg.org") + result = _run([str(msg), "--no-verify", "--json"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 0 + payload = json.loads(result.stdout) + assert payload["decision"] == "process" + assert payload["message_type"] == "request" + assert payload["conversation_id"] == "test-conv" + + +def test_recv_halt_blocks(isolated_env, tmp_path): + halt = isolated_env / ".config" / "cross-agent-comms" / "HALT" + halt.parent.mkdir(parents=True) + halt.write_text("halted\n") + msg = _make_message(tmp_path / "msg.org") + result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 5 + assert "halt active" in result.stderr.lower() + + +def test_recv_halt_leaves_message_in_place(isolated_env, tmp_path): + """Per spec: under HALT, recv must NOT move/dedup/reject — leave file in place.""" + halt = isolated_env / ".config" / "cross-agent-comms" / "HALT" + halt.parent.mkdir(parents=True) + halt.write_text("halted\n") + msg = _make_message(tmp_path / "msg.org") + pre_content = msg.read_text() + result = _run([str(msg), "--no-verify"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 5 + # File still exists with same content. + assert msg.exists() + assert msg.read_text() == pre_content diff --git a/.ai/scripts/tests/test_cross_agent_send.py b/.ai/scripts/tests/test_cross_agent_send.py new file mode 100644 index 0000000..f716e95 --- /dev/null +++ b/.ai/scripts/tests/test_cross_agent_send.py @@ -0,0 +1,210 @@ +"""Tests for cross-agent-send. + +Subprocess-based: treat the script as a black-box CLI and assert on its +exit codes, stdout, and the files it produces. +""" + +from __future__ import annotations + +import os +import subprocess +import textwrap +from pathlib import Path + +import pytest + +SCRIPT = Path(__file__).resolve().parent.parent / "cross-agent-comms" / "cross-agent-send" + + +def _make_message(tmp_path: Path, conv_id: str = "test-conv", seq: int = 1, msg_type: str = "request", + proto_version: str = "5") -> Path: + msg = tmp_path / "msg.org" + msg.write_text(textwrap.dedent(f"""\ + #+TITLE: Test message + #+CONVERSATION_ID: {conv_id} + #+MESSAGE_TYPE: {msg_type} + #+SEQUENCE: {seq} + #+TIMESTAMP: 2026-04-27T05:00:00-05:00 + #+PROTOCOL_VERSION: {proto_version} + + Body. + """)) + return msg + + +def _run(args: list[str], env: dict | None = None, cwd: Path | None = None) -> subprocess.CompletedProcess: + return subprocess.run( + [str(SCRIPT), *args], + capture_output=True, + text=True, + env=env, + cwd=cwd, + ) + + +@pytest.fixture +def isolated_env(tmp_path, monkeypatch): + """Redirect HOME so peers.toml, HALT, marker files are scoped to the test.""" + fake_home = tmp_path / "home" + fake_home.mkdir() + monkeypatch.setenv("HOME", str(fake_home)) + # Pre-create projects/ so derive_sender_project has somewhere to look. + (fake_home / "projects" / "homelab").mkdir(parents=True) + return fake_home + + +def test_send_help(isolated_env): + """--help works without side effects.""" + result = _run(["--help"], env={**os.environ, "HOME": str(isolated_env)}) + assert result.returncode == 0 + assert "Send a cross-agent message" in result.stdout + + +def test_send_missing_message_file(isolated_env): + """Nonexistent message file returns general error.""" + import socket + machine = socket.gethostname().split(".")[0] + result = _run( + [f"{machine}.homelab", str(isolated_env / "nonexistent.org")], + env={**os.environ, "HOME": str(isolated_env)}, + ) + assert result.returncode == 1 + assert "not found" in result.stderr.lower() + + +def test_send_invalid_destination_format(isolated_env, tmp_path): + """Destination without . returns dest-not-found exit code.""" + msg = _make_message(tmp_path) + result = _run( + ["bogus", str(msg)], + env={**os.environ, "HOME": str(isolated_env)}, + ) + assert result.returncode == 2 + assert "<machine>.<project>" in result.stderr or "destination" in result.stderr.lower() + + +def test_send_dest_not_in_peers(isolated_env, tmp_path): + """Cross-machine destination with no peers.toml entry exits 2.""" + msg = _make_message(tmp_path) + result = _run( + ["unknownmachine.homelab", str(msg)], + env={**os.environ, "HOME": str(isolated_env)}, + ) + assert result.returncode == 2 + assert "not found in peers" in result.stderr + + +def test_send_frontmatter_missing_required(isolated_env, tmp_path): + """Message missing required fields exits 4.""" + bad = tmp_path / "bad.org" + bad.write_text("#+TITLE: nope\n\nBody.\n") + import socket + machine = socket.gethostname().split(".")[0] + result = _run( + [f"{machine}.homelab", str(bad)], + env={**os.environ, "HOME": str(isolated_env)}, + ) + assert result.returncode == 4 + assert "missing required fields" in result.stderr + + +def test_send_invalid_message_type(isolated_env, tmp_path): + """Unknown MESSAGE_TYPE exits 4.""" + msg = _make_message(tmp_path, msg_type="frobnicate") + import socket + machine = socket.gethostname().split(".")[0] + result = _run( + [f"{machine}.homelab", str(msg)], + env={**os.environ, "HOME": str(isolated_env)}, + ) + assert result.returncode == 4 + assert "MESSAGE_TYPE" in result.stderr + + +def test_send_halt_blocks(isolated_env, tmp_path): + """When HALT exists, send refuses with exit 5.""" + halt = isolated_env / ".config" / "cross-agent-comms" / "HALT" + halt.parent.mkdir(parents=True) + halt.write_text("test halt\n") + msg = _make_message(tmp_path) + import socket + machine = socket.gethostname().split(".")[0] + result = _run( + [f"{machine}.homelab", str(msg)], + env={**os.environ, "HOME": str(isolated_env)}, + ) + assert result.returncode == 5 + assert "halt active" in result.stderr.lower() + + +def test_send_same_machine_no_sign_delivers(isolated_env, tmp_path): + """Same-machine delivery with --no-sign produces a canonically named file.""" + msg = _make_message(tmp_path, conv_id="my-conv") + import socket + machine = socket.gethostname().split(".")[0] + # Sender is derived from CWD walking up to ~/projects/<name>/ + cwd = isolated_env / "projects" / "homelab" + result = _run( + [f"{machine}.homelab", str(msg), "--no-sign"], + env={**os.environ, "HOME": str(isolated_env)}, + cwd=cwd, + ) + assert result.returncode == 0, f"stderr={result.stderr}" + inbox = isolated_env / "projects" / "homelab" / "inbox" / "from-agents" + files = list(inbox.glob("*-from-homelab-my-conv.org")) + assert len(files) == 1 + # No sig file with --no-sign. + assert not list(inbox.glob("*.asc")) + # Canonical filename pattern. + assert files[0].name.startswith("2026") and files[0].name.endswith("-from-homelab-my-conv.org") + + +def test_send_same_machine_signed_writes_asc(isolated_env, tmp_path): + """Signed delivery writes both .org and .asc.""" + msg = _make_message(tmp_path, conv_id="signed-conv") + import socket + machine = socket.gethostname().split(".")[0] + cwd = isolated_env / "projects" / "homelab" + # Use the real GPG keyring (not isolating GPG — Craig's existing keys are fine for tests). + real_env = {**os.environ, "HOME": str(isolated_env), "GNUPGHOME": str(Path.home() / ".gnupg")} + result = _run( + [f"{machine}.homelab", str(msg)], + env=real_env, + cwd=cwd, + ) + if result.returncode != 0: + pytest.skip(f"GPG signing unavailable in this environment: {result.stderr}") + inbox = isolated_env / "projects" / "homelab" / "inbox" / "from-agents" + org_files = list(inbox.glob("*-from-homelab-signed-conv.org")) + asc_files = list(inbox.glob("*-from-homelab-signed-conv.org.asc")) + assert len(org_files) == 1 + assert len(asc_files) == 1 + + +def test_send_filename_ignores_input_basename(isolated_env, tmp_path): + """User's input filename is ignored; canonical filename is generated.""" + weird = tmp_path / "weird-user-name.org" + weird.write_text(textwrap.dedent("""\ + #+TITLE: Title + #+CONVERSATION_ID: ignored-input + #+MESSAGE_TYPE: request + #+SEQUENCE: 1 + #+TIMESTAMP: 2026-04-27T05:00:00-05:00 + #+PROTOCOL_VERSION: 5 + + Body. + """)) + import socket + machine = socket.gethostname().split(".")[0] + cwd = isolated_env / "projects" / "homelab" + result = _run( + [f"{machine}.homelab", str(weird), "--no-sign"], + env={**os.environ, "HOME": str(isolated_env)}, + cwd=cwd, + ) + assert result.returncode == 0 + inbox = isolated_env / "projects" / "homelab" / "inbox" / "from-agents" + # No file named after the user's input. + assert not (inbox / "weird-user-name.org").exists() + # Canonical naming used. + assert list(inbox.glob("*-from-homelab-ignored-input.org")) diff --git a/.ai/scripts/tests/test_cross_agent_status.py b/.ai/scripts/tests/test_cross_agent_status.py new file mode 100644 index 0000000..bb5b8ba --- /dev/null +++ b/.ai/scripts/tests/test_cross_agent_status.py @@ -0,0 +1,165 @@ +"""Tests for cross-agent-status (TDD: tests written before implementation).""" + +from __future__ import annotations + +import json +import os +import subprocess +import textwrap +from pathlib import Path + +import pytest + +SCRIPT = Path(__file__).resolve().parent.parent / "cross-agent-comms" / "cross-agent-status" + + +def _make_msg(path: Path, *, conv_id: str, seq: int, msg_type: str = "request", + proto_version: str = "5", timestamp: str = "2026-04-27T05:00:00-05:00") -> Path: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(textwrap.dedent(f"""\ + #+TITLE: T + #+CONVERSATION_ID: {conv_id} + #+MESSAGE_TYPE: {msg_type} + #+SEQUENCE: {seq} + #+TIMESTAMP: {timestamp} + #+PROTOCOL_VERSION: {proto_version} + + Body. + """)) + return path + + +def _run(args: list[str], env: dict | None = None) -> subprocess.CompletedProcess: + return subprocess.run([str(SCRIPT), *args], capture_output=True, text=True, env=env) + + +@pytest.fixture +def fake_projects(tmp_path, monkeypatch): + """Create a fake ~/projects/<name>/inbox/from-agents/ tree under tmp_path.""" + home = tmp_path / "home" + home.mkdir() + monkeypatch.setenv("HOME", str(home)) + return home + + +def test_status_help(fake_projects): + result = _run(["--help"], env={**os.environ, "HOME": str(fake_projects)}) + assert result.returncode == 0 + assert "snapshot" in result.stdout.lower() or "pending" in result.stdout.lower() + + +def test_status_no_projects_clean_output(fake_projects): + result = _run([], env={**os.environ, "HOME": str(fake_projects)}) + assert result.returncode == 0 + # Empty machine prints either header-only table or "no projects" — accept either. + # No crash, no pending claims. + assert "pending" in result.stdout.lower() or result.stdout.strip() == "" + + +def test_status_one_pending_shows_up(fake_projects): + inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents" + _make_msg(inbox / "20260427T100000Z-from-career-fixup.org", conv_id="fixup", seq=1) + result = _run([], env={**os.environ, "HOME": str(fake_projects)}) + assert result.returncode == 0 + assert "homelab" in result.stdout + assert "1" in result.stdout # pending count + assert "20260427T100000Z-from-career-fixup.org" in result.stdout + + +def test_status_released_conversation_zero_pending(fake_projects): + """A conversation with a release message in it counts as 0 pending.""" + inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents" + _make_msg(inbox / "20260427T100000Z-from-career-done.org", conv_id="done", seq=1) + _make_msg(inbox / "20260427T100100Z-from-homelab-done.org", conv_id="done", seq=2, msg_type="release") + result = _run([], env={**os.environ, "HOME": str(fake_projects)}) + assert result.returncode == 0 + # Check the homelab row shows 0 pending. + lines = [ln for ln in result.stdout.splitlines() if "homelab" in ln] + # At least one homelab line should show 0 pending or "—". + assert any("0" in ln or "—" in ln for ln in lines) + + +def test_status_partial_release(fake_projects): + """Conversation with release + a later message → that later message counts as pending.""" + inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents" + _make_msg(inbox / "20260427T100000Z-from-career-x.org", conv_id="x", seq=1, + timestamp="2026-04-27T05:00:00-05:00") + _make_msg(inbox / "20260427T100100Z-from-homelab-x.org", conv_id="x", seq=2, msg_type="release", + timestamp="2026-04-27T05:01:00-05:00") + # New message AFTER release: starts a fresh thread that's pending. + _make_msg(inbox / "20260427T200000Z-from-career-x.org", conv_id="x", seq=3, + timestamp="2026-04-27T15:00:00-05:00") + result = _run([], env={**os.environ, "HOME": str(fake_projects)}) + assert result.returncode == 0 + homelab_line = next(ln for ln in result.stdout.splitlines() if "homelab" in ln) + assert "1" in homelab_line # the post-release message is pending + + +def test_status_multiple_projects(fake_projects): + inbox_a = fake_projects / "projects" / "homelab" / "inbox" / "from-agents" + inbox_b = fake_projects / "projects" / "career" / "inbox" / "from-agents" + _make_msg(inbox_a / "20260427T100000Z-from-x-a.org", conv_id="a", seq=1) + _make_msg(inbox_b / "20260427T100100Z-from-x-b.org", conv_id="b", seq=1) + _make_msg(inbox_b / "20260427T100200Z-from-x-c.org", conv_id="c", seq=1) + result = _run([], env={**os.environ, "HOME": str(fake_projects)}) + assert result.returncode == 0 + # career has 2 pending, homelab has 1. + career_line = next(ln for ln in result.stdout.splitlines() if "career" in ln) + homelab_line = next(ln for ln in result.stdout.splitlines() if "homelab" in ln) + assert "2" in career_line + assert "1" in homelab_line + + +def test_status_json_output(fake_projects): + inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents" + _make_msg(inbox / "20260427T100000Z-from-career-test.org", conv_id="test", seq=1) + result = _run(["--json"], env={**os.environ, "HOME": str(fake_projects)}) + assert result.returncode == 0 + payload = json.loads(result.stdout) + assert "projects" in payload + assert isinstance(payload["projects"], list) + homelab = next((p for p in payload["projects"] if p["name"] == "homelab"), None) + assert homelab is not None + assert homelab["pending_count"] == 1 + + +def test_status_sort_pending_first(fake_projects): + """Projects with pending messages sort before projects with 0.""" + (fake_projects / "projects" / "alpha" / "inbox" / "from-agents").mkdir(parents=True) + inbox_zeta = fake_projects / "projects" / "zeta" / "inbox" / "from-agents" + _make_msg(inbox_zeta / "20260427T100000Z-from-x-z.org", conv_id="z", seq=1) + result = _run([], env={**os.environ, "HOME": str(fake_projects)}) + assert result.returncode == 0 + lines = result.stdout.splitlines() + zeta_idx = next(i for i, ln in enumerate(lines) if "zeta" in ln) + alpha_idx = next(i for i, ln in enumerate(lines) if "alpha" in ln) + assert zeta_idx < alpha_idx, "pending project should sort before zero-pending project" + + +def test_status_halt_shows_banner(fake_projects): + halt = fake_projects / ".config" / "cross-agent-comms" / "HALT" + halt.parent.mkdir(parents=True) + halt.write_text("halted for test") + inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents" + _make_msg(inbox / "20260427T100000Z-from-x-x.org", conv_id="x", seq=1) + result = _run([], env={**os.environ, "HOME": str(fake_projects)}) + assert result.returncode == 0 # status continues to print under HALT + assert "HALT" in result.stdout + # Banner should mention the reason. + assert "halted for test" in result.stdout + + +def test_status_projects_glob_override(fake_projects): + inbox = fake_projects / "projects" / "homelab" / "inbox" / "from-agents" + _make_msg(inbox / "20260427T100000Z-from-x-a.org", conv_id="a", seq=1) + other_inbox = fake_projects / "projects" / "career" / "inbox" / "from-agents" + _make_msg(other_inbox / "20260427T100100Z-from-x-b.org", conv_id="b", seq=1) + # Glob limits to homelab only. + result = _run( + ["--projects-glob", str(fake_projects / "projects" / "homelab" / "inbox" / "from-agents") + "/"], + env={**os.environ, "HOME": str(fake_projects)}, + ) + assert result.returncode == 0 + assert "homelab" in result.stdout + # career not in scope. + assert "career" not in result.stdout diff --git a/.ai/scripts/tests/test_cross_agent_watch.py b/.ai/scripts/tests/test_cross_agent_watch.py new file mode 100644 index 0000000..417cc19 --- /dev/null +++ b/.ai/scripts/tests/test_cross_agent_watch.py @@ -0,0 +1,155 @@ +"""Tests for cross-agent-watch. + +Black-box: spawn the script, drop files into a watched dir, read the log. +Tests use --no-notify to avoid firing real desktop notifications. +""" + +from __future__ import annotations + +import os +import subprocess +import time +from pathlib import Path + +import pytest + +SCRIPT = Path(__file__).resolve().parent.parent / "cross-agent-comms" / "cross-agent-watch" + + +def _spawn(watched_dir: Path, log_path: Path, env: dict) -> subprocess.Popen: + return subprocess.Popen( + [ + str(SCRIPT), + "--projects-glob", str(watched_dir) + "/", + "--log", str(log_path), + "--no-notify", + "--quiet", + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + env=env, + ) + + +def _wait_for_log_lines(log_path: Path, expected: int, timeout: float = 5.0) -> list[str]: + deadline = time.time() + timeout + while time.time() < deadline: + if log_path.exists(): + lines = [ln for ln in log_path.read_text().splitlines() if ln] + if len(lines) >= expected: + return lines + time.sleep(0.1) + if log_path.exists(): + return [ln for ln in log_path.read_text().splitlines() if ln] + return [] + + +@pytest.fixture +def isolated_env(tmp_path, monkeypatch): + fake_home = tmp_path / "home" + fake_home.mkdir() + monkeypatch.setenv("HOME", str(fake_home)) + return fake_home + + +def test_watch_help(isolated_env): + result = subprocess.run( + [str(SCRIPT), "--help"], + capture_output=True, text=True, + env={**os.environ, "HOME": str(isolated_env)}, + ) + assert result.returncode == 0 + assert "Usage:" in result.stdout + + +def test_watch_empty_glob_exits_nonzero(isolated_env): + """Glob resolving to zero dirs should exit non-zero with a clear message.""" + result = subprocess.run( + [str(SCRIPT), "--projects-glob", "/nonexistent/path/*/foo/", "--no-notify", "--quiet"], + capture_output=True, text=True, + env={**os.environ, "HOME": str(isolated_env)}, + timeout=3, + ) + assert result.returncode != 0 + assert "0 directories" in result.stderr + + +def test_watch_logs_org_file_create(isolated_env, tmp_path): + watched = tmp_path / "watched" + watched.mkdir() + log = tmp_path / "watch.log" + proc = _spawn(watched, log, {**os.environ, "HOME": str(isolated_env)}) + try: + # Give inotifywait a moment to attach. + time.sleep(0.3) + (watched / "test-msg.org").write_text("hello") + lines = _wait_for_log_lines(log, expected=1, timeout=3.0) + assert len(lines) >= 1 + assert "test-msg.org" in lines[-1] + finally: + proc.terminate() + proc.wait(timeout=2) + + +def test_watch_filters_tmp_files(isolated_env, tmp_path): + """Files starting with .tmp. must NOT trigger log entries.""" + watched = tmp_path / "watched" + watched.mkdir() + log = tmp_path / "watch.log" + proc = _spawn(watched, log, {**os.environ, "HOME": str(isolated_env)}) + try: + time.sleep(0.3) + (watched / ".tmp.staging-file.org").write_text("hello") + # Wait briefly to confirm nothing logs. + time.sleep(0.5) + if log.exists(): + content = log.read_text() + assert ".tmp.staging-file" not in content + # Then drop a real file to confirm watcher is alive. + (watched / "real.org").write_text("real") + lines = _wait_for_log_lines(log, expected=1, timeout=3.0) + assert any("real.org" in ln for ln in lines) + finally: + proc.terminate() + proc.wait(timeout=2) + + +def test_watch_filters_asc_sidecars(isolated_env, tmp_path): + """Only .org events fire; .asc sidecars are silent.""" + watched = tmp_path / "watched" + watched.mkdir() + log = tmp_path / "watch.log" + proc = _spawn(watched, log, {**os.environ, "HOME": str(isolated_env)}) + try: + time.sleep(0.3) + (watched / "msg.org.asc").write_text("sig") + time.sleep(0.5) + if log.exists(): + assert "msg.org.asc" not in log.read_text() + # .org event still works. + (watched / "msg.org").write_text("body") + lines = _wait_for_log_lines(log, expected=1, timeout=3.0) + assert any(ln.endswith("msg.org") for ln in lines) + finally: + proc.terminate() + proc.wait(timeout=2) + + +def test_watch_halt_suppresses_but_logs(isolated_env, tmp_path): + """When HALT is set, watcher logs the event with (suppressed by HALT) marker.""" + halt = isolated_env / ".config" / "cross-agent-comms" / "HALT" + halt.parent.mkdir(parents=True) + halt.write_text("halted") + watched = tmp_path / "watched" + watched.mkdir() + log = tmp_path / "watch.log" + proc = _spawn(watched, log, {**os.environ, "HOME": str(isolated_env)}) + try: + time.sleep(0.3) + (watched / "halted-event.org").write_text("body") + lines = _wait_for_log_lines(log, expected=1, timeout=3.0) + assert len(lines) >= 1 + assert "suppressed by HALT" in lines[-1] + finally: + proc.terminate() + proc.wait(timeout=2) diff --git a/.ai/scripts/tests/test_extract_body.py b/.ai/scripts/tests/test_extract_body.py new file mode 100644 index 0000000..7b53cda --- /dev/null +++ b/.ai/scripts/tests/test_extract_body.py @@ -0,0 +1,96 @@ +"""Tests for extract_body().""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from conftest import make_plain_message, make_html_message, make_message_with_attachment +from email.message import EmailMessage +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.mime.application import MIMEApplication + +import importlib.util +spec = importlib.util.spec_from_file_location( + "eml_script", + os.path.join(os.path.dirname(__file__), '..', 'eml-view-and-extract-attachments.py') +) +eml_script = importlib.util.module_from_spec(spec) +spec.loader.exec_module(eml_script) + +extract_body = eml_script.extract_body + + +class TestPlainText: + def test_returns_plain_text(self): + msg = make_plain_message(body="Hello, this is plain text.") + result = extract_body(msg) + assert "Hello, this is plain text." in result + + +class TestHtmlOnly: + def test_returns_converted_html(self): + msg = make_html_message(html_body="<p>Hello <strong>world</strong></p>") + result = extract_body(msg) + assert "Hello" in result + assert "world" in result + # Should not contain raw HTML tags + assert "<p>" not in result + assert "<strong>" not in result + + +class TestBothPlainAndHtml: + def test_prefers_plain_text(self): + msg = MIMEMultipart('alternative') + msg['From'] = 'test@example.com' + msg['To'] = 'dest@example.com' + msg['Subject'] = 'Test' + msg['Date'] = 'Thu, 05 Feb 2026 11:36:00 -0600' + msg.attach(MIMEText("Plain text version", 'plain')) + msg.attach(MIMEText("<p>HTML version</p>", 'html')) + result = extract_body(msg) + assert "Plain text version" in result + assert "HTML version" not in result + + +class TestEmptyBody: + def test_returns_empty_string(self): + # Multipart with only attachments, no text parts + msg = MIMEMultipart() + msg['From'] = 'test@example.com' + att = MIMEApplication(b"binary data", Name="file.bin") + att['Content-Disposition'] = 'attachment; filename="file.bin"' + msg.attach(att) + result = extract_body(msg) + assert result == "" + + +class TestNonUtf8Encoding: + def test_decodes_with_errors_ignore(self): + msg = EmailMessage() + msg['From'] = 'test@example.com' + # Set raw bytes that include invalid UTF-8 + msg.set_content("Valid text with special: café") + result = extract_body(msg) + assert "Valid text" in result + + +class TestHtmlWithStructure: + def test_preserves_list_structure(self): + html = "<ul><li>Item one</li><li>Item two</li></ul>" + msg = make_html_message(html_body=html) + result = extract_body(msg) + assert "Item one" in result + assert "Item two" in result + + +class TestNoTextParts: + def test_returns_empty_string(self): + msg = MIMEMultipart() + msg['From'] = 'test@example.com' + att = MIMEApplication(b"data", Name="image.png") + att['Content-Disposition'] = 'attachment; filename="image.png"' + msg.attach(att) + result = extract_body(msg) + assert result == "" diff --git a/.ai/scripts/tests/test_extract_metadata.py b/.ai/scripts/tests/test_extract_metadata.py new file mode 100644 index 0000000..d5ee52e --- /dev/null +++ b/.ai/scripts/tests/test_extract_metadata.py @@ -0,0 +1,65 @@ +"""Tests for extract_metadata().""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from conftest import make_plain_message, add_received_headers +from email.message import EmailMessage + +import importlib.util +spec = importlib.util.spec_from_file_location( + "eml_script", + os.path.join(os.path.dirname(__file__), '..', 'eml-view-and-extract-attachments.py') +) +eml_script = importlib.util.module_from_spec(spec) +spec.loader.exec_module(eml_script) + +extract_metadata = eml_script.extract_metadata + + +class TestAllHeadersPresent: + def test_complete_dict(self): + msg = make_plain_message( + from_="Jonathan Smith <jsmith@example.com>", + to="Craig <craig@example.com>", + subject="Test Subject", + date="Thu, 05 Feb 2026 11:36:00 -0600" + ) + result = extract_metadata(msg) + assert result['from'] == "Jonathan Smith <jsmith@example.com>" + assert result['to'] == "Craig <craig@example.com>" + assert result['subject'] == "Test Subject" + assert result['date'] == "Thu, 05 Feb 2026 11:36:00 -0600" + assert 'timing' in result + + +class TestMissingFrom: + def test_from_is_none(self): + msg = EmailMessage() + msg['To'] = 'craig@example.com' + msg['Subject'] = 'Test' + msg['Date'] = 'Thu, 05 Feb 2026 11:36:00 -0600' + msg.set_content("body") + result = extract_metadata(msg) + assert result['from'] is None + + +class TestMissingDate: + def test_date_is_none(self): + msg = EmailMessage() + msg['From'] = 'test@example.com' + msg['To'] = 'craig@example.com' + msg['Subject'] = 'Test' + msg.set_content("body") + result = extract_metadata(msg) + assert result['date'] is None + + +class TestLongSubject: + def test_full_subject_returned(self): + long_subject = "Re: Fw: This is a very long subject line that spans many words and might be folded" + msg = make_plain_message(subject=long_subject) + result = extract_metadata(msg) + assert result['subject'] == long_subject diff --git a/.ai/scripts/tests/test_generate_filenames.py b/.ai/scripts/tests/test_generate_filenames.py new file mode 100644 index 0000000..07c8f84 --- /dev/null +++ b/.ai/scripts/tests/test_generate_filenames.py @@ -0,0 +1,157 @@ +"""Tests for generate_basename(), generate_email_filename(), generate_attachment_filename().""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +import importlib.util +spec = importlib.util.spec_from_file_location( + "eml_script", + os.path.join(os.path.dirname(__file__), '..', 'eml-view-and-extract-attachments.py') +) +eml_script = importlib.util.module_from_spec(spec) +spec.loader.exec_module(eml_script) + +generate_basename = eml_script.generate_basename +generate_email_filename = eml_script.generate_email_filename +generate_attachment_filename = eml_script.generate_attachment_filename + + +# --- generate_basename --- + +class TestGenerateBasename: + def test_standard_from_and_date(self): + metadata = { + 'from': 'Jonathan Smith <jsmith@example.com>', + 'date': 'Wed, 05 Feb 2026 11:36:00 -0600', + } + assert generate_basename(metadata) == "2026-02-05-1136-Jonathan" + + def test_from_with_display_name_first_token(self): + metadata = { + 'from': 'C Ciarm <cciarm@example.com>', + 'date': 'Wed, 05 Feb 2026 11:36:00 -0600', + } + result = generate_basename(metadata) + assert result == "2026-02-05-1136-C" + + def test_from_without_display_name(self): + metadata = { + 'from': 'jsmith@example.com', + 'date': 'Wed, 05 Feb 2026 11:36:00 -0600', + } + result = generate_basename(metadata) + assert result == "2026-02-05-1136-jsmith" + + def test_missing_date(self): + metadata = { + 'from': 'Jonathan Smith <jsmith@example.com>', + 'date': None, + } + result = generate_basename(metadata) + assert result == "unknown-Jonathan" + + def test_missing_from(self): + metadata = { + 'from': None, + 'date': 'Wed, 05 Feb 2026 11:36:00 -0600', + } + result = generate_basename(metadata) + assert result == "2026-02-05-1136-unknown" + + def test_both_missing(self): + metadata = {'from': None, 'date': None} + result = generate_basename(metadata) + assert result == "unknown-unknown" + + def test_unparseable_date(self): + metadata = { + 'from': 'Jonathan <j@example.com>', + 'date': 'not a real date', + } + result = generate_basename(metadata) + assert result == "unknown-Jonathan" + + def test_none_date_no_crash(self): + metadata = {'from': 'Test <t@e.com>', 'date': None} + # Should not raise + result = generate_basename(metadata) + assert "unknown" in result + + +# --- generate_email_filename --- + +class TestGenerateEmailFilename: + def test_standard_subject(self): + result = generate_email_filename( + "2026-02-05-1136-Jonathan", + "Re: Fw: 4319 Danneel Street" + ) + assert result == "2026-02-05-1136-Jonathan-EMAIL-Re-Fw-4319-Danneel-Street" + + def test_subject_with_special_chars(self): + result = generate_email_filename( + "2026-02-05-1136-Jonathan", + "Update: Meeting (draft) & notes!" + ) + # Colons, parens, ampersands, exclamation stripped + assert "EMAIL" in result + assert ":" not in result + assert "(" not in result + assert ")" not in result + assert "&" not in result + assert "!" not in result + + def test_none_subject(self): + result = generate_email_filename("2026-02-05-1136-Jonathan", None) + assert result == "2026-02-05-1136-Jonathan-EMAIL-no-subject" + + def test_empty_subject(self): + result = generate_email_filename("2026-02-05-1136-Jonathan", "") + assert result == "2026-02-05-1136-Jonathan-EMAIL-no-subject" + + def test_very_long_subject(self): + long_subject = "A" * 100 + " " + "B" * 100 + result = generate_email_filename("2026-02-05-1136-Jonathan", long_subject) + # The cleaned subject part should be truncated + # basename (27) + "-EMAIL-" (7) + subject + # Subject itself is limited to 80 chars by _clean_for_filename + subject_part = result.split("-EMAIL-")[1] + assert len(subject_part) <= 80 + + +# --- generate_attachment_filename --- + +class TestGenerateAttachmentFilename: + def test_standard_attachment(self): + result = generate_attachment_filename( + "2026-02-05-1136-Jonathan", + "Ltr Carrollton.pdf" + ) + assert result == "2026-02-05-1136-Jonathan-ATTACH-Ltr-Carrollton.pdf" + + def test_filename_with_spaces_and_parens(self): + result = generate_attachment_filename( + "2026-02-05-1136-Jonathan", + "Document (final copy).pdf" + ) + assert " " not in result + assert "(" not in result + assert ")" not in result + assert result.endswith(".pdf") + + def test_preserves_extension(self): + result = generate_attachment_filename( + "2026-02-05-1136-Jonathan", + "photo.jpg" + ) + assert result.endswith(".jpg") + + def test_none_filename(self): + result = generate_attachment_filename("2026-02-05-1136-Jonathan", None) + assert result == "2026-02-05-1136-Jonathan-ATTACH-unnamed" + + def test_empty_filename(self): + result = generate_attachment_filename("2026-02-05-1136-Jonathan", "") + assert result == "2026-02-05-1136-Jonathan-ATTACH-unnamed" diff --git a/.ai/scripts/tests/test_integration_stdout.py b/.ai/scripts/tests/test_integration_stdout.py new file mode 100644 index 0000000..d87478e --- /dev/null +++ b/.ai/scripts/tests/test_integration_stdout.py @@ -0,0 +1,68 @@ +"""Integration tests for backwards-compatible stdout mode (no --output-dir).""" + +import os +import shutil +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +import importlib.util +spec = importlib.util.spec_from_file_location( + "eml_script", + os.path.join(os.path.dirname(__file__), '..', 'eml-view-and-extract-attachments.py') +) +eml_script = importlib.util.module_from_spec(spec) +spec.loader.exec_module(eml_script) + +print_email = eml_script.print_email + +FIXTURES = os.path.join(os.path.dirname(__file__), 'fixtures') + + +class TestPlainTextStdout: + def test_metadata_and_body_printed(self, tmp_path, capsys): + eml_src = os.path.join(FIXTURES, 'plain-text.eml') + working_eml = tmp_path / "message.eml" + shutil.copy2(eml_src, working_eml) + + print_email(str(working_eml)) + captured = capsys.readouterr() + + assert "From: Jonathan Smith <jsmith@example.com>" in captured.out + assert "To: Craig Jennings <craig@example.com>" in captured.out + assert "Subject: Re: Fw: 4319 Danneel Street" in captured.out + assert "Date:" in captured.out + assert "Sent:" in captured.out + assert "Received:" in captured.out + assert "4319 Danneel Street" in captured.out + + +class TestHtmlFallbackStdout: + def test_html_converted_on_stdout(self, tmp_path, capsys): + eml_src = os.path.join(FIXTURES, 'html-only.eml') + working_eml = tmp_path / "message.eml" + shutil.copy2(eml_src, working_eml) + + print_email(str(working_eml)) + captured = capsys.readouterr() + + # Should see converted text, not raw HTML + assert "HTML" in captured.out + assert "<p>" not in captured.out + + +class TestAttachmentsStdout: + def test_attachment_extracted_alongside_eml(self, tmp_path, capsys): + eml_src = os.path.join(FIXTURES, 'with-attachment.eml') + working_eml = tmp_path / "message.eml" + shutil.copy2(eml_src, working_eml) + + print_email(str(working_eml)) + captured = capsys.readouterr() + + assert "Extracted attachment:" in captured.out + assert "Ltr Carrollton.pdf" in captured.out + + # File should exist alongside the EML + extracted = tmp_path / "Ltr Carrollton.pdf" + assert extracted.exists() diff --git a/.ai/scripts/tests/test_parse_received_headers.py b/.ai/scripts/tests/test_parse_received_headers.py new file mode 100644 index 0000000..e12e1fb --- /dev/null +++ b/.ai/scripts/tests/test_parse_received_headers.py @@ -0,0 +1,105 @@ +"""Tests for parse_received_headers().""" + +import email +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from conftest import make_plain_message, add_received_headers +from email.message import EmailMessage + +# Import the function under test +import importlib.util +spec = importlib.util.spec_from_file_location( + "eml_script", + os.path.join(os.path.dirname(__file__), '..', 'eml-view-and-extract-attachments.py') +) +eml_script = importlib.util.module_from_spec(spec) +spec.loader.exec_module(eml_script) + +parse_received_headers = eml_script.parse_received_headers + + +class TestSingleHeader: + def test_header_with_from_and_by(self): + msg = EmailMessage() + msg['Received'] = ( + 'from mail-sender.example.com by mx.receiver.example.com ' + 'with ESMTP; Thu, 05 Feb 2026 11:36:05 -0600' + ) + result = parse_received_headers(msg) + assert result['sent_server'] == 'mail-sender.example.com' + assert result['received_server'] == 'mx.receiver.example.com' + assert result['sent_time'] == 'Thu, 05 Feb 2026 11:36:05 -0600' + assert result['received_time'] == 'Thu, 05 Feb 2026 11:36:05 -0600' + + +class TestMultipleHeaders: + def test_uses_first_with_both_from_and_by(self): + msg = EmailMessage() + # Most recent first (by only) + msg['Received'] = 'by internal.example.com with SMTP; Thu, 05 Feb 2026 11:36:10 -0600' + # Next: has both from and by — this should be selected + msg['Received'] = ( + 'from mail-sender.example.com by mx.receiver.example.com ' + 'with ESMTP; Thu, 05 Feb 2026 11:36:05 -0600' + ) + # Oldest + msg['Received'] = ( + 'from originator.example.com by relay.example.com ' + 'with SMTP; Thu, 05 Feb 2026 11:35:58 -0600' + ) + result = parse_received_headers(msg) + assert result['sent_server'] == 'mail-sender.example.com' + assert result['received_server'] == 'mx.receiver.example.com' + + +class TestNoReceivedHeaders: + def test_all_values_none(self): + msg = EmailMessage() + result = parse_received_headers(msg) + assert result['sent_time'] is None + assert result['sent_server'] is None + assert result['received_time'] is None + assert result['received_server'] is None + + +class TestByButNoFrom: + def test_falls_back_to_first_header(self): + msg = EmailMessage() + msg['Received'] = 'by internal.example.com with SMTP; Thu, 05 Feb 2026 11:36:10 -0600' + result = parse_received_headers(msg) + assert result['received_server'] == 'internal.example.com' + assert result['received_time'] == 'Thu, 05 Feb 2026 11:36:10 -0600' + # No from in any header, so sent_server stays None + assert result['sent_server'] is None + + +class TestMultilineFoldedHeader: + def test_normalizes_whitespace(self): + # Use email.message_from_string to parse raw folded headers + # (EmailMessage policy rejects embedded CRLF in set values) + raw = ( + "From: test@example.com\r\n" + "Received: from mail-sender.example.com\r\n" + " by mx.receiver.example.com\r\n" + " with ESMTP; Thu, 05 Feb 2026 11:36:05 -0600\r\n" + "\r\n" + "body\r\n" + ) + msg = email.message_from_string(raw) + result = parse_received_headers(msg) + assert result['sent_server'] == 'mail-sender.example.com' + assert result['received_server'] == 'mx.receiver.example.com' + + +class TestMalformedTimestamp: + def test_no_semicolon(self): + msg = EmailMessage() + msg['Received'] = 'from sender.example.com by receiver.example.com with SMTP' + result = parse_received_headers(msg) + assert result['sent_server'] == 'sender.example.com' + assert result['received_server'] == 'receiver.example.com' + assert result['sent_time'] is None + assert result['received_time'] is None diff --git a/.ai/scripts/tests/test_process_eml.py b/.ai/scripts/tests/test_process_eml.py new file mode 100644 index 0000000..612cbb1 --- /dev/null +++ b/.ai/scripts/tests/test_process_eml.py @@ -0,0 +1,162 @@ +"""Integration tests for process_eml() — full pipeline with --output-dir.""" + +import os +import shutil +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +import importlib.util +spec = importlib.util.spec_from_file_location( + "eml_script", + os.path.join(os.path.dirname(__file__), '..', 'eml-view-and-extract-attachments.py') +) +eml_script = importlib.util.module_from_spec(spec) +spec.loader.exec_module(eml_script) + +process_eml = eml_script.process_eml + +import pytest + + +FIXTURES = os.path.join(os.path.dirname(__file__), 'fixtures') + + +class TestPlainTextPipeline: + def test_creates_eml_and_txt(self, tmp_path): + eml_src = os.path.join(FIXTURES, 'plain-text.eml') + # Copy fixture to tmp_path so temp dir can be created as sibling + working_eml = tmp_path / "inbox" / "message.eml" + working_eml.parent.mkdir() + shutil.copy2(eml_src, working_eml) + + output_dir = tmp_path / "output" + result = process_eml(str(working_eml), str(output_dir)) + + # Should have exactly 2 files: .eml and .txt + assert len(result['files']) == 2 + eml_file = result['files'][0] + txt_file = result['files'][1] + + assert eml_file['type'] == 'eml' + assert txt_file['type'] == 'txt' + assert eml_file['name'].endswith('.eml') + assert txt_file['name'].endswith('.txt') + + # Files exist in output dir + assert os.path.isfile(eml_file['path']) + assert os.path.isfile(txt_file['path']) + + # Filenames contain expected components + assert 'Jonathan' in eml_file['name'] + assert 'EMAIL' in eml_file['name'] + assert '2026-02-05' in eml_file['name'] + + # Temp dir cleaned up (no extract-* dirs in inbox) + inbox_contents = os.listdir(str(tmp_path / "inbox")) + assert not any(d.startswith('extract-') for d in inbox_contents) + + +class TestHtmlFallbackPipeline: + def test_txt_contains_converted_html(self, tmp_path): + eml_src = os.path.join(FIXTURES, 'html-only.eml') + working_eml = tmp_path / "inbox" / "message.eml" + working_eml.parent.mkdir() + shutil.copy2(eml_src, working_eml) + + output_dir = tmp_path / "output" + result = process_eml(str(working_eml), str(output_dir)) + + txt_file = result['files'][1] + with open(txt_file['path'], 'r') as f: + content = f.read() + + # Should be converted, not raw HTML + assert '<p>' not in content + assert '<strong>' not in content + assert 'HTML' in content + + +class TestAttachmentPipeline: + def test_eml_txt_and_attachment_created(self, tmp_path): + eml_src = os.path.join(FIXTURES, 'with-attachment.eml') + working_eml = tmp_path / "inbox" / "message.eml" + working_eml.parent.mkdir() + shutil.copy2(eml_src, working_eml) + + output_dir = tmp_path / "output" + result = process_eml(str(working_eml), str(output_dir)) + + assert len(result['files']) == 3 + types = [f['type'] for f in result['files']] + assert types == ['eml', 'txt', 'attach'] + + # Attachment is auto-renamed + attach_file = result['files'][2] + assert 'ATTACH' in attach_file['name'] + assert attach_file['name'].endswith('.pdf') + assert os.path.isfile(attach_file['path']) + + +class TestDuplicateAttachmentNames: + """Outlook inlines the same signature image multiple times under one + filename. Each part must be saved to its own file, not silently + overwritten in temp_dir (which leaves the move step pointing at a + missing file).""" + + def test_each_duplicate_attachment_kept_with_counter_suffix(self, tmp_path): + eml_src = os.path.join(FIXTURES, 'duplicate-attachment-names.eml') + working_eml = tmp_path / "inbox" / "message.eml" + working_eml.parent.mkdir() + shutil.copy2(eml_src, working_eml) + + output_dir = tmp_path / "output" + result = process_eml(str(working_eml), str(output_dir)) + + # eml + txt + 3 attachments + assert len(result['files']) == 5 + attach_files = [f for f in result['files'] if f['type'] == 'attach'] + assert len(attach_files) == 3 + + # Each file must have a unique name and exist on disk with its own + # bytes — overwriting earlier ones would leave fewer than 3 files + # and the move step would fail. + names = [f['name'] for f in attach_files] + assert len(set(names)) == 3 + for f in attach_files: + assert os.path.isfile(f['path']) + + # Bytes are preserved per part (fixture has -1, -2, -3 payloads) + contents = sorted(open(f['path'], 'rb').read() for f in attach_files) + assert contents == [b'image-content-1', b'image-content-2', b'image-content-3'] + + +class TestCollisionDetection: + def test_raises_on_existing_file(self, tmp_path): + eml_src = os.path.join(FIXTURES, 'plain-text.eml') + working_eml = tmp_path / "inbox" / "message.eml" + working_eml.parent.mkdir() + shutil.copy2(eml_src, working_eml) + + output_dir = tmp_path / "output" + # Run once to create files + result = process_eml(str(working_eml), str(output_dir)) + + # Run again — should raise FileExistsError + with pytest.raises(FileExistsError, match="Collision"): + process_eml(str(working_eml), str(output_dir)) + + +class TestMissingOutputDir: + def test_creates_directory(self, tmp_path): + eml_src = os.path.join(FIXTURES, 'plain-text.eml') + working_eml = tmp_path / "inbox" / "message.eml" + working_eml.parent.mkdir() + shutil.copy2(eml_src, working_eml) + + output_dir = tmp_path / "new" / "nested" / "output" + assert not output_dir.exists() + + result = process_eml(str(working_eml), str(output_dir)) + assert output_dir.exists() + assert len(result['files']) == 2 diff --git a/.ai/scripts/tests/test_save_attachments.py b/.ai/scripts/tests/test_save_attachments.py new file mode 100644 index 0000000..32f02a6 --- /dev/null +++ b/.ai/scripts/tests/test_save_attachments.py @@ -0,0 +1,97 @@ +"""Tests for save_attachments().""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from conftest import make_plain_message, make_message_with_attachment +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.mime.application import MIMEApplication + +import importlib.util +spec = importlib.util.spec_from_file_location( + "eml_script", + os.path.join(os.path.dirname(__file__), '..', 'eml-view-and-extract-attachments.py') +) +eml_script = importlib.util.module_from_spec(spec) +spec.loader.exec_module(eml_script) + +save_attachments = eml_script.save_attachments + + +class TestSingleAttachment: + def test_file_written_and_returned(self, tmp_path): + msg = make_message_with_attachment( + attachment_filename="report.pdf", + attachment_content=b"pdf bytes here" + ) + result = save_attachments(msg, str(tmp_path), "2026-02-05-1136-Jonathan") + + assert len(result) == 1 + assert result[0]['original_name'] == "report.pdf" + assert "ATTACH" in result[0]['renamed_name'] + assert result[0]['renamed_name'].endswith(".pdf") + + # File actually exists and has correct content + written_path = result[0]['path'] + assert os.path.isfile(written_path) + with open(written_path, 'rb') as f: + assert f.read() == b"pdf bytes here" + + +class TestMultipleAttachments: + def test_all_written_and_returned(self, tmp_path): + msg = MIMEMultipart() + msg['From'] = 'test@example.com' + msg['Date'] = 'Thu, 05 Feb 2026 11:36:00 -0600' + msg.attach(MIMEText("body", 'plain')) + + for name, content in [("doc1.pdf", b"pdf1"), ("image.png", b"png1")]: + att = MIMEApplication(content, Name=name) + att['Content-Disposition'] = f'attachment; filename="{name}"' + msg.attach(att) + + result = save_attachments(msg, str(tmp_path), "2026-02-05-1136-Jonathan") + + assert len(result) == 2 + for r in result: + assert os.path.isfile(r['path']) + + +class TestNoAttachments: + def test_empty_list(self, tmp_path): + msg = make_plain_message() + result = save_attachments(msg, str(tmp_path), "2026-02-05-1136-Jonathan") + assert result == [] + + +class TestFilenameWithSpaces: + def test_cleaned_filename(self, tmp_path): + msg = make_message_with_attachment( + attachment_filename="My Document (1).pdf", + attachment_content=b"data" + ) + result = save_attachments(msg, str(tmp_path), "2026-02-05-1136-Jonathan") + + assert len(result) == 1 + assert " " not in result[0]['renamed_name'] + assert os.path.isfile(result[0]['path']) + + +class TestNoContentDisposition: + def test_skipped(self, tmp_path): + msg = MIMEMultipart() + msg['From'] = 'test@example.com' + msg.attach(MIMEText("body", 'plain')) + + # Add a part without Content-Disposition + part = MIMEApplication(b"data", Name="file.bin") + # Explicitly remove Content-Disposition if present + if 'Content-Disposition' in part: + del part['Content-Disposition'] + msg.attach(part) + + result = save_attachments(msg, str(tmp_path), "2026-02-05-1136-Jonathan") + assert result == [] |
