aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/tests/test_maildir_flag_manager.py
diff options
context:
space:
mode:
authorCraig Jennings <c@cjennings.net>2026-05-08 23:23:31 -0500
committerCraig Jennings <c@cjennings.net>2026-05-08 23:23:31 -0500
commit65c962e01f0ec7cefe70157f4f11f54638cdc995 (patch)
treea64d7a8bd466cb08e77829ca86e25a221aa64710 /.ai/scripts/tests/test_maildir_flag_manager.py
parent1abae87acaba85453ef9b7e1eafe0d6e8e22c4e5 (diff)
downloadrulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.tar.gz
rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.zip
feat: Add cmail IMAP action script and test suite
Add cmail-action.py for IMAP triage operations against Proton Mail Bridge (list-unread, read, mark-read, star, unstar, trash, send, folders) mirroring the Gmail MCP workflow. Also add comprehensive tests for cmail-action, gmail-fetch-attachments, and maildir-flag-manager scripts.
Diffstat (limited to '.ai/scripts/tests/test_maildir_flag_manager.py')
-rw-r--r--.ai/scripts/tests/test_maildir_flag_manager.py310
1 files changed, 310 insertions, 0 deletions
diff --git a/.ai/scripts/tests/test_maildir_flag_manager.py b/.ai/scripts/tests/test_maildir_flag_manager.py
new file mode 100644
index 0000000..268af5b
--- /dev/null
+++ b/.ai/scripts/tests/test_maildir_flag_manager.py
@@ -0,0 +1,310 @@
+"""Tests for maildir-flag-manager.py.
+
+Covers:
+- Pure parsers: parse_maildir_flags, build_flagged_filename
+- File-I/O ops: rename_with_flag, process_maildir, process_specific_files
+ (tmp_path with real maildir directory structures)
+- Subprocess wrapper: reindex_mu (monkeypatch on shutil.which + subprocess.run)
+- Argparse: --help / missing-subcommand via subprocess
+
+The cmd_mark_read / cmd_star orchestrators are intentionally skipped —
+they call the helpers and print summaries; the helpers are tested
+directly so testing the orchestrators would mostly assert call counts.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import subprocess
+import sys
+from pathlib import Path
+from unittest.mock import MagicMock
+
+import pytest
+
+SCRIPT_PATH = Path(__file__).resolve().parent.parent / "maildir-flag-manager.py"
+
+
+def _load_module():
+ spec = importlib.util.spec_from_file_location(
+ "maildir_flag_manager", str(SCRIPT_PATH)
+ )
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ return mod
+
+
+@pytest.fixture(scope="module")
+def mfm():
+ return _load_module()
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_maildir(tmp_path: Path, files=None) -> Path:
+ """Construct a maildir at tmp_path/inbox with new/ and cur/ subdirs.
+
+ files is a list of (subdir, filename) tuples. Each becomes an empty
+ file at tmp_path/inbox/<subdir>/<filename>.
+ """
+ inbox = tmp_path / "inbox"
+ (inbox / "new").mkdir(parents=True)
+ (inbox / "cur").mkdir()
+ for subdir, fname in (files or []):
+ (inbox / subdir / fname).write_text("body")
+ return inbox
+
+
+# ---------------------------------------------------------------------------
+# parse_maildir_flags — pure
+# ---------------------------------------------------------------------------
+
+class TestParseMaildirFlags:
+
+ def test_normal_typical_filename(self, mfm):
+ assert mfm.parse_maildir_flags("12345.host:2,FS") == ("12345.host", "FS")
+
+ def test_boundary_no_flag_suffix(self, mfm):
+ # No ":2," in filename — return whole name as base, empty flags.
+ assert mfm.parse_maildir_flags("12345.host") == ("12345.host", "")
+
+ def test_boundary_empty_flags_section(self, mfm):
+ # ":2," with nothing after — base is parsed, flags are empty.
+ assert mfm.parse_maildir_flags("12345.host:2,") == ("12345.host", "")
+
+ def test_boundary_multiple_colons_in_base(self, mfm):
+ # rsplit on the LAST ":2," — base may contain colons or even ":2,"-like
+ # substrings. Real maildir names sometimes have these from migrations.
+ assert mfm.parse_maildir_flags("weird:thing:2,FS") == ("weird:thing", "FS")
+
+ def test_boundary_empty_string(self, mfm):
+ assert mfm.parse_maildir_flags("") == ("", "")
+
+
+# ---------------------------------------------------------------------------
+# build_flagged_filename — pure
+# ---------------------------------------------------------------------------
+
+class TestBuildFlaggedFilename:
+
+ def test_normal_base_plus_flags(self, mfm):
+ assert mfm.build_flagged_filename("12345.host", "FS") == "12345.host:2,FS"
+
+ def test_boundary_replaces_existing_flags(self, mfm):
+ # Existing flags get parsed away — the new_flags arg is the source of truth.
+ assert mfm.build_flagged_filename("12345.host:2,F", "FS") == "12345.host:2,FS"
+
+ def test_boundary_flags_sorted_alphabetically(self, mfm):
+ # Maildir spec requires alphabetical sort. SFR -> FRS.
+ assert mfm.build_flagged_filename("12345.host", "SFR") == "12345.host:2,FRS"
+
+ def test_boundary_duplicate_flags_dedup(self, mfm):
+ # set() dedups before sort. FFS -> FS.
+ assert mfm.build_flagged_filename("12345.host", "FFS") == "12345.host:2,FS"
+
+ def test_boundary_empty_flags(self, mfm):
+ assert mfm.build_flagged_filename("12345.host", "") == "12345.host:2,"
+
+
+# ---------------------------------------------------------------------------
+# rename_with_flag — file I/O via tmp_path
+# ---------------------------------------------------------------------------
+
+class TestRenameWithFlag:
+
+ def test_normal_add_F_to_cur_file_renamed_in_place(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")])
+ original = inbox / "cur" / "12345.host:2,"
+ assert mfm.rename_with_flag(str(original), "F") is True
+ assert not original.exists()
+ assert (inbox / "cur" / "12345.host:2,F").exists()
+
+ def test_boundary_add_S_to_new_file_moves_to_cur(self, mfm, tmp_path):
+ # Maildir spec: messages with Seen flag belong in cur/, not new/.
+ inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")])
+ original = inbox / "new" / "12345.host:2,"
+ assert mfm.rename_with_flag(str(original), "S") is True
+ assert not original.exists()
+ # Should land in cur/, not new/.
+ assert (inbox / "cur" / "12345.host:2,S").exists()
+ assert not (inbox / "new" / "12345.host:2,S").exists()
+
+ def test_boundary_add_F_to_new_file_stays_in_new(self, mfm, tmp_path):
+ # F (Flagged) doesn't trigger the new/ -> cur/ migration; only S does.
+ inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")])
+ original = inbox / "new" / "12345.host:2,"
+ assert mfm.rename_with_flag(str(original), "F") is True
+ assert (inbox / "new" / "12345.host:2,F").exists()
+ assert not (inbox / "cur" / "12345.host:2,F").exists()
+
+ def test_boundary_flag_already_present_returns_false(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,FS")])
+ original = inbox / "cur" / "12345.host:2,FS"
+ assert mfm.rename_with_flag(str(original), "F") is False
+ # Original file unchanged.
+ assert original.exists()
+
+ def test_boundary_dry_run_does_not_modify_filesystem(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")])
+ original = inbox / "cur" / "12345.host:2,"
+ assert mfm.rename_with_flag(str(original), "F", dry_run=True) is True
+ # Original still exists, no new file.
+ assert original.exists()
+ assert not (inbox / "cur" / "12345.host:2,F").exists()
+
+ def test_error_file_path_does_not_exist(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path)
+ with pytest.raises(FileNotFoundError):
+ mfm.rename_with_flag(str(inbox / "cur" / "ghost:2,"), "F")
+
+
+# ---------------------------------------------------------------------------
+# process_maildir — tmp_path
+# ---------------------------------------------------------------------------
+
+class TestProcessMaildir:
+
+ def test_normal_mixed_flagged_and_unflagged(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [
+ ("new", "msg1:2,"),
+ ("new", "msg2:2,"),
+ ("cur", "msg3:2,S"), # already has S, will skip
+ ("cur", "msg4:2,"),
+ ])
+ changed, skipped, errors = mfm.process_maildir(str(inbox), "S")
+ # 3 didn't have S yet, 1 already did.
+ assert (changed, skipped, errors) == (3, 1, 0)
+ # The two from new/ have moved to cur/ (S triggers the migration).
+ assert (inbox / "cur" / "msg1:2,S").exists()
+ assert (inbox / "cur" / "msg2:2,S").exists()
+ assert (inbox / "cur" / "msg4:2,S").exists()
+
+ def test_boundary_empty_maildir(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path)
+ assert mfm.process_maildir(str(inbox), "S") == (0, 0, 0)
+
+ def test_boundary_maildir_does_not_exist(self, mfm, tmp_path, capsys):
+ # Returns (0, 0, 0) and logs a friendly message to stderr.
+ result = mfm.process_maildir(str(tmp_path / "nope"), "S")
+ assert result == (0, 0, 0)
+ err = capsys.readouterr().err
+ assert "Skipping" in err
+
+ def test_boundary_non_file_entries_skipped(self, mfm, tmp_path):
+ # A stray subdirectory in cur/ shouldn't crash the scan.
+ inbox = _make_maildir(tmp_path, [("cur", "msg1:2,")])
+ (inbox / "cur" / "stray-dir").mkdir()
+ changed, skipped, errors = mfm.process_maildir(str(inbox), "S")
+ assert (changed, skipped, errors) == (1, 0, 0)
+
+ def test_boundary_only_new_subdir_present(self, mfm, tmp_path):
+ # If cur/ doesn't exist, the loop just skips it instead of erroring.
+ inbox = tmp_path / "inbox"
+ (inbox / "new").mkdir(parents=True)
+ (inbox / "new" / "msg1:2,").write_text("body")
+ changed, skipped, errors = mfm.process_maildir(str(inbox), "F")
+ assert (changed, skipped, errors) == (1, 0, 0)
+
+
+# ---------------------------------------------------------------------------
+# process_specific_files — tmp_path
+# ---------------------------------------------------------------------------
+
+class TestProcessSpecificFiles:
+
+ def test_normal_paths_in_cur_and_new(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [
+ ("cur", "msg1:2,"),
+ ("new", "msg2:2,"),
+ ])
+ paths = [
+ str(inbox / "cur" / "msg1:2,"),
+ str(inbox / "new" / "msg2:2,"),
+ ]
+ changed, skipped, errors = mfm.process_specific_files(paths, "F")
+ assert (changed, skipped, errors) == (2, 0, 0)
+
+ def test_error_file_not_found(self, mfm, tmp_path, capsys):
+ inbox = _make_maildir(tmp_path)
+ ghost = str(inbox / "cur" / "ghost:2,")
+ changed, skipped, errors = mfm.process_specific_files([ghost], "F")
+ assert errors == 1
+ assert "File not found" in capsys.readouterr().err
+
+ def test_error_file_outside_cur_or_new(self, mfm, tmp_path, capsys):
+ # Path validation: only files whose parent dir is named "cur" or "new"
+ # are accepted. Defends against pointing at the wrong file.
+ bogus = tmp_path / "elsewhere" / "msg1:2,"
+ bogus.parent.mkdir()
+ bogus.write_text("body")
+ changed, skipped, errors = mfm.process_specific_files([str(bogus)], "F")
+ assert errors == 1
+ assert "Not in a maildir" in capsys.readouterr().err
+ # File untouched.
+ assert bogus.exists()
+
+ def test_error_already_set_counted_as_skipped(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [("cur", "msg1:2,F")])
+ path = str(inbox / "cur" / "msg1:2,F")
+ changed, skipped, errors = mfm.process_specific_files([path], "F")
+ assert (changed, skipped, errors) == (0, 1, 0)
+
+
+# ---------------------------------------------------------------------------
+# reindex_mu — mocked subprocess
+# ---------------------------------------------------------------------------
+
+class TestReindexMu:
+
+ def test_normal_mu_present_returns_true(self, mfm, monkeypatch):
+ monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu")
+ result_obj = MagicMock(returncode=0, stderr="")
+ monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj)
+ assert mfm.reindex_mu() is True
+
+ def test_error_mu_not_in_path_returns_false(self, mfm, monkeypatch, capsys):
+ monkeypatch.setattr(mfm.shutil, "which", lambda _name: None)
+ assert mfm.reindex_mu() is False
+ assert "mu not found" in capsys.readouterr().err
+
+ def test_error_mu_index_returns_nonzero(self, mfm, monkeypatch, capsys):
+ monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu")
+ result_obj = MagicMock(returncode=1, stderr="db locked")
+ monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj)
+ assert mfm.reindex_mu() is False
+ assert "mu index failed" in capsys.readouterr().err
+
+ def test_error_mu_index_times_out(self, mfm, monkeypatch, capsys):
+ monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu")
+
+ def raise_timeout(*_a, **_kw):
+ raise subprocess.TimeoutExpired(cmd="mu index", timeout=120)
+
+ monkeypatch.setattr(mfm.subprocess, "run", raise_timeout)
+ assert mfm.reindex_mu() is False
+ assert "timed out" in capsys.readouterr().err
+
+
+# ---------------------------------------------------------------------------
+# Argparse — black-box subprocess sanity check
+# ---------------------------------------------------------------------------
+
+class TestArgparseShape:
+
+ def test_normal_help_lists_subcommands(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH), "--help"],
+ capture_output=True, text=True,
+ )
+ assert result.returncode == 0
+ assert "mark-read" in result.stdout
+ assert "star" in result.stdout
+
+ def test_error_no_subcommand_exits_nonzero(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH)],
+ capture_output=True, text=True,
+ )
+ assert result.returncode != 0