diff options
| author | Craig Jennings <c@cjennings.net> | 2026-05-08 23:23:31 -0500 |
|---|---|---|
| committer | Craig Jennings <c@cjennings.net> | 2026-05-08 23:23:31 -0500 |
| commit | 65c962e01f0ec7cefe70157f4f11f54638cdc995 (patch) | |
| tree | a64d7a8bd466cb08e77829ca86e25a221aa64710 /.ai/scripts/tests/test_maildir_flag_manager.py | |
| parent | 1abae87acaba85453ef9b7e1eafe0d6e8e22c4e5 (diff) | |
| download | rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.tar.gz rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.zip | |
feat: Add cmail IMAP action script and test suite
Add cmail-action.py for IMAP triage operations against Proton Mail
Bridge (list-unread, read, mark-read, star, unstar, trash, send,
folders) mirroring the Gmail MCP workflow. Also add comprehensive
tests for cmail-action, gmail-fetch-attachments, and
maildir-flag-manager scripts.
Diffstat (limited to '.ai/scripts/tests/test_maildir_flag_manager.py')
| -rw-r--r-- | .ai/scripts/tests/test_maildir_flag_manager.py | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/.ai/scripts/tests/test_maildir_flag_manager.py b/.ai/scripts/tests/test_maildir_flag_manager.py new file mode 100644 index 0000000..268af5b --- /dev/null +++ b/.ai/scripts/tests/test_maildir_flag_manager.py @@ -0,0 +1,310 @@ +"""Tests for maildir-flag-manager.py. + +Covers: +- Pure parsers: parse_maildir_flags, build_flagged_filename +- File-I/O ops: rename_with_flag, process_maildir, process_specific_files + (tmp_path with real maildir directory structures) +- Subprocess wrapper: reindex_mu (monkeypatch on shutil.which + subprocess.run) +- Argparse: --help / missing-subcommand via subprocess + +The cmd_mark_read / cmd_star orchestrators are intentionally skipped — +they call the helpers and print summaries; the helpers are tested +directly so testing the orchestrators would mostly assert call counts. +""" + +from __future__ import annotations + +import importlib.util +import subprocess +import sys +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +SCRIPT_PATH = Path(__file__).resolve().parent.parent / "maildir-flag-manager.py" + + +def _load_module(): + spec = importlib.util.spec_from_file_location( + "maildir_flag_manager", str(SCRIPT_PATH) + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +@pytest.fixture(scope="module") +def mfm(): + return _load_module() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_maildir(tmp_path: Path, files=None) -> Path: + """Construct a maildir at tmp_path/inbox with new/ and cur/ subdirs. + + files is a list of (subdir, filename) tuples. Each becomes an empty + file at tmp_path/inbox/<subdir>/<filename>. + """ + inbox = tmp_path / "inbox" + (inbox / "new").mkdir(parents=True) + (inbox / "cur").mkdir() + for subdir, fname in (files or []): + (inbox / subdir / fname).write_text("body") + return inbox + + +# --------------------------------------------------------------------------- +# parse_maildir_flags — pure +# --------------------------------------------------------------------------- + +class TestParseMaildirFlags: + + def test_normal_typical_filename(self, mfm): + assert mfm.parse_maildir_flags("12345.host:2,FS") == ("12345.host", "FS") + + def test_boundary_no_flag_suffix(self, mfm): + # No ":2," in filename — return whole name as base, empty flags. + assert mfm.parse_maildir_flags("12345.host") == ("12345.host", "") + + def test_boundary_empty_flags_section(self, mfm): + # ":2," with nothing after — base is parsed, flags are empty. + assert mfm.parse_maildir_flags("12345.host:2,") == ("12345.host", "") + + def test_boundary_multiple_colons_in_base(self, mfm): + # rsplit on the LAST ":2," — base may contain colons or even ":2,"-like + # substrings. Real maildir names sometimes have these from migrations. + assert mfm.parse_maildir_flags("weird:thing:2,FS") == ("weird:thing", "FS") + + def test_boundary_empty_string(self, mfm): + assert mfm.parse_maildir_flags("") == ("", "") + + +# --------------------------------------------------------------------------- +# build_flagged_filename — pure +# --------------------------------------------------------------------------- + +class TestBuildFlaggedFilename: + + def test_normal_base_plus_flags(self, mfm): + assert mfm.build_flagged_filename("12345.host", "FS") == "12345.host:2,FS" + + def test_boundary_replaces_existing_flags(self, mfm): + # Existing flags get parsed away — the new_flags arg is the source of truth. + assert mfm.build_flagged_filename("12345.host:2,F", "FS") == "12345.host:2,FS" + + def test_boundary_flags_sorted_alphabetically(self, mfm): + # Maildir spec requires alphabetical sort. SFR -> FRS. + assert mfm.build_flagged_filename("12345.host", "SFR") == "12345.host:2,FRS" + + def test_boundary_duplicate_flags_dedup(self, mfm): + # set() dedups before sort. FFS -> FS. + assert mfm.build_flagged_filename("12345.host", "FFS") == "12345.host:2,FS" + + def test_boundary_empty_flags(self, mfm): + assert mfm.build_flagged_filename("12345.host", "") == "12345.host:2," + + +# --------------------------------------------------------------------------- +# rename_with_flag — file I/O via tmp_path +# --------------------------------------------------------------------------- + +class TestRenameWithFlag: + + def test_normal_add_F_to_cur_file_renamed_in_place(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")]) + original = inbox / "cur" / "12345.host:2," + assert mfm.rename_with_flag(str(original), "F") is True + assert not original.exists() + assert (inbox / "cur" / "12345.host:2,F").exists() + + def test_boundary_add_S_to_new_file_moves_to_cur(self, mfm, tmp_path): + # Maildir spec: messages with Seen flag belong in cur/, not new/. + inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")]) + original = inbox / "new" / "12345.host:2," + assert mfm.rename_with_flag(str(original), "S") is True + assert not original.exists() + # Should land in cur/, not new/. + assert (inbox / "cur" / "12345.host:2,S").exists() + assert not (inbox / "new" / "12345.host:2,S").exists() + + def test_boundary_add_F_to_new_file_stays_in_new(self, mfm, tmp_path): + # F (Flagged) doesn't trigger the new/ -> cur/ migration; only S does. + inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")]) + original = inbox / "new" / "12345.host:2," + assert mfm.rename_with_flag(str(original), "F") is True + assert (inbox / "new" / "12345.host:2,F").exists() + assert not (inbox / "cur" / "12345.host:2,F").exists() + + def test_boundary_flag_already_present_returns_false(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,FS")]) + original = inbox / "cur" / "12345.host:2,FS" + assert mfm.rename_with_flag(str(original), "F") is False + # Original file unchanged. + assert original.exists() + + def test_boundary_dry_run_does_not_modify_filesystem(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")]) + original = inbox / "cur" / "12345.host:2," + assert mfm.rename_with_flag(str(original), "F", dry_run=True) is True + # Original still exists, no new file. + assert original.exists() + assert not (inbox / "cur" / "12345.host:2,F").exists() + + def test_error_file_path_does_not_exist(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path) + with pytest.raises(FileNotFoundError): + mfm.rename_with_flag(str(inbox / "cur" / "ghost:2,"), "F") + + +# --------------------------------------------------------------------------- +# process_maildir — tmp_path +# --------------------------------------------------------------------------- + +class TestProcessMaildir: + + def test_normal_mixed_flagged_and_unflagged(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [ + ("new", "msg1:2,"), + ("new", "msg2:2,"), + ("cur", "msg3:2,S"), # already has S, will skip + ("cur", "msg4:2,"), + ]) + changed, skipped, errors = mfm.process_maildir(str(inbox), "S") + # 3 didn't have S yet, 1 already did. + assert (changed, skipped, errors) == (3, 1, 0) + # The two from new/ have moved to cur/ (S triggers the migration). + assert (inbox / "cur" / "msg1:2,S").exists() + assert (inbox / "cur" / "msg2:2,S").exists() + assert (inbox / "cur" / "msg4:2,S").exists() + + def test_boundary_empty_maildir(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path) + assert mfm.process_maildir(str(inbox), "S") == (0, 0, 0) + + def test_boundary_maildir_does_not_exist(self, mfm, tmp_path, capsys): + # Returns (0, 0, 0) and logs a friendly message to stderr. + result = mfm.process_maildir(str(tmp_path / "nope"), "S") + assert result == (0, 0, 0) + err = capsys.readouterr().err + assert "Skipping" in err + + def test_boundary_non_file_entries_skipped(self, mfm, tmp_path): + # A stray subdirectory in cur/ shouldn't crash the scan. + inbox = _make_maildir(tmp_path, [("cur", "msg1:2,")]) + (inbox / "cur" / "stray-dir").mkdir() + changed, skipped, errors = mfm.process_maildir(str(inbox), "S") + assert (changed, skipped, errors) == (1, 0, 0) + + def test_boundary_only_new_subdir_present(self, mfm, tmp_path): + # If cur/ doesn't exist, the loop just skips it instead of erroring. + inbox = tmp_path / "inbox" + (inbox / "new").mkdir(parents=True) + (inbox / "new" / "msg1:2,").write_text("body") + changed, skipped, errors = mfm.process_maildir(str(inbox), "F") + assert (changed, skipped, errors) == (1, 0, 0) + + +# --------------------------------------------------------------------------- +# process_specific_files — tmp_path +# --------------------------------------------------------------------------- + +class TestProcessSpecificFiles: + + def test_normal_paths_in_cur_and_new(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [ + ("cur", "msg1:2,"), + ("new", "msg2:2,"), + ]) + paths = [ + str(inbox / "cur" / "msg1:2,"), + str(inbox / "new" / "msg2:2,"), + ] + changed, skipped, errors = mfm.process_specific_files(paths, "F") + assert (changed, skipped, errors) == (2, 0, 0) + + def test_error_file_not_found(self, mfm, tmp_path, capsys): + inbox = _make_maildir(tmp_path) + ghost = str(inbox / "cur" / "ghost:2,") + changed, skipped, errors = mfm.process_specific_files([ghost], "F") + assert errors == 1 + assert "File not found" in capsys.readouterr().err + + def test_error_file_outside_cur_or_new(self, mfm, tmp_path, capsys): + # Path validation: only files whose parent dir is named "cur" or "new" + # are accepted. Defends against pointing at the wrong file. + bogus = tmp_path / "elsewhere" / "msg1:2," + bogus.parent.mkdir() + bogus.write_text("body") + changed, skipped, errors = mfm.process_specific_files([str(bogus)], "F") + assert errors == 1 + assert "Not in a maildir" in capsys.readouterr().err + # File untouched. + assert bogus.exists() + + def test_error_already_set_counted_as_skipped(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [("cur", "msg1:2,F")]) + path = str(inbox / "cur" / "msg1:2,F") + changed, skipped, errors = mfm.process_specific_files([path], "F") + assert (changed, skipped, errors) == (0, 1, 0) + + +# --------------------------------------------------------------------------- +# reindex_mu — mocked subprocess +# --------------------------------------------------------------------------- + +class TestReindexMu: + + def test_normal_mu_present_returns_true(self, mfm, monkeypatch): + monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu") + result_obj = MagicMock(returncode=0, stderr="") + monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj) + assert mfm.reindex_mu() is True + + def test_error_mu_not_in_path_returns_false(self, mfm, monkeypatch, capsys): + monkeypatch.setattr(mfm.shutil, "which", lambda _name: None) + assert mfm.reindex_mu() is False + assert "mu not found" in capsys.readouterr().err + + def test_error_mu_index_returns_nonzero(self, mfm, monkeypatch, capsys): + monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu") + result_obj = MagicMock(returncode=1, stderr="db locked") + monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj) + assert mfm.reindex_mu() is False + assert "mu index failed" in capsys.readouterr().err + + def test_error_mu_index_times_out(self, mfm, monkeypatch, capsys): + monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu") + + def raise_timeout(*_a, **_kw): + raise subprocess.TimeoutExpired(cmd="mu index", timeout=120) + + monkeypatch.setattr(mfm.subprocess, "run", raise_timeout) + assert mfm.reindex_mu() is False + assert "timed out" in capsys.readouterr().err + + +# --------------------------------------------------------------------------- +# Argparse — black-box subprocess sanity check +# --------------------------------------------------------------------------- + +class TestArgparseShape: + + def test_normal_help_lists_subcommands(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH), "--help"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + assert "mark-read" in result.stdout + assert "star" in result.stdout + + def test_error_no_subcommand_exits_nonzero(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH)], + capture_output=True, text=True, + ) + assert result.returncode != 0 |
