"""Tests for maildir-flag-manager.py. Covers: - Pure parsers: parse_maildir_flags, build_flagged_filename - File-I/O ops: rename_with_flag, process_maildir, process_specific_files (tmp_path with real maildir directory structures) - Subprocess wrapper: reindex_mu (monkeypatch on shutil.which + subprocess.run) - Argparse: --help / missing-subcommand via subprocess The cmd_mark_read / cmd_star orchestrators are intentionally skipped — they call the helpers and print summaries; the helpers are tested directly so testing the orchestrators would mostly assert call counts. """ from __future__ import annotations import importlib.util import subprocess import sys from pathlib import Path from unittest.mock import MagicMock import pytest SCRIPT_PATH = Path(__file__).resolve().parent.parent / "maildir-flag-manager.py" def _load_module(): spec = importlib.util.spec_from_file_location( "maildir_flag_manager", str(SCRIPT_PATH) ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) return mod @pytest.fixture(scope="module") def mfm(): return _load_module() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_maildir(tmp_path: Path, files=None) -> Path: """Construct a maildir at tmp_path/inbox with new/ and cur/ subdirs. files is a list of (subdir, filename) tuples. Each becomes an empty file at tmp_path/inbox//. """ inbox = tmp_path / "inbox" (inbox / "new").mkdir(parents=True) (inbox / "cur").mkdir() for subdir, fname in (files or []): (inbox / subdir / fname).write_text("body") return inbox # --------------------------------------------------------------------------- # parse_maildir_flags — pure # --------------------------------------------------------------------------- class TestParseMaildirFlags: def test_normal_typical_filename(self, mfm): assert mfm.parse_maildir_flags("12345.host:2,FS") == ("12345.host", "FS") def test_boundary_no_flag_suffix(self, mfm): # No ":2," in filename — return whole name as base, empty flags. assert mfm.parse_maildir_flags("12345.host") == ("12345.host", "") def test_boundary_empty_flags_section(self, mfm): # ":2," with nothing after — base is parsed, flags are empty. assert mfm.parse_maildir_flags("12345.host:2,") == ("12345.host", "") def test_boundary_multiple_colons_in_base(self, mfm): # rsplit on the LAST ":2," — base may contain colons or even ":2,"-like # substrings. Real maildir names sometimes have these from migrations. assert mfm.parse_maildir_flags("weird:thing:2,FS") == ("weird:thing", "FS") def test_boundary_empty_string(self, mfm): assert mfm.parse_maildir_flags("") == ("", "") # --------------------------------------------------------------------------- # build_flagged_filename — pure # --------------------------------------------------------------------------- class TestBuildFlaggedFilename: def test_normal_base_plus_flags(self, mfm): assert mfm.build_flagged_filename("12345.host", "FS") == "12345.host:2,FS" def test_boundary_replaces_existing_flags(self, mfm): # Existing flags get parsed away — the new_flags arg is the source of truth. assert mfm.build_flagged_filename("12345.host:2,F", "FS") == "12345.host:2,FS" def test_boundary_flags_sorted_alphabetically(self, mfm): # Maildir spec requires alphabetical sort. SFR -> FRS. assert mfm.build_flagged_filename("12345.host", "SFR") == "12345.host:2,FRS" def test_boundary_duplicate_flags_dedup(self, mfm): # set() dedups before sort. FFS -> FS. assert mfm.build_flagged_filename("12345.host", "FFS") == "12345.host:2,FS" def test_boundary_empty_flags(self, mfm): assert mfm.build_flagged_filename("12345.host", "") == "12345.host:2," # --------------------------------------------------------------------------- # rename_with_flag — file I/O via tmp_path # --------------------------------------------------------------------------- class TestRenameWithFlag: def test_normal_add_F_to_cur_file_renamed_in_place(self, mfm, tmp_path): inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")]) original = inbox / "cur" / "12345.host:2," assert mfm.rename_with_flag(str(original), "F") is True assert not original.exists() assert (inbox / "cur" / "12345.host:2,F").exists() def test_boundary_add_S_to_new_file_moves_to_cur(self, mfm, tmp_path): # Maildir spec: messages with Seen flag belong in cur/, not new/. inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")]) original = inbox / "new" / "12345.host:2," assert mfm.rename_with_flag(str(original), "S") is True assert not original.exists() # Should land in cur/, not new/. assert (inbox / "cur" / "12345.host:2,S").exists() assert not (inbox / "new" / "12345.host:2,S").exists() def test_boundary_add_F_to_new_file_stays_in_new(self, mfm, tmp_path): # F (Flagged) doesn't trigger the new/ -> cur/ migration; only S does. inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")]) original = inbox / "new" / "12345.host:2," assert mfm.rename_with_flag(str(original), "F") is True assert (inbox / "new" / "12345.host:2,F").exists() assert not (inbox / "cur" / "12345.host:2,F").exists() def test_boundary_flag_already_present_returns_false(self, mfm, tmp_path): inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,FS")]) original = inbox / "cur" / "12345.host:2,FS" assert mfm.rename_with_flag(str(original), "F") is False # Original file unchanged. assert original.exists() def test_boundary_dry_run_does_not_modify_filesystem(self, mfm, tmp_path): inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")]) original = inbox / "cur" / "12345.host:2," assert mfm.rename_with_flag(str(original), "F", dry_run=True) is True # Original still exists, no new file. assert original.exists() assert not (inbox / "cur" / "12345.host:2,F").exists() def test_error_file_path_does_not_exist(self, mfm, tmp_path): inbox = _make_maildir(tmp_path) with pytest.raises(FileNotFoundError): mfm.rename_with_flag(str(inbox / "cur" / "ghost:2,"), "F") # --------------------------------------------------------------------------- # process_maildir — tmp_path # --------------------------------------------------------------------------- class TestProcessMaildir: def test_normal_mixed_flagged_and_unflagged(self, mfm, tmp_path): inbox = _make_maildir(tmp_path, [ ("new", "msg1:2,"), ("new", "msg2:2,"), ("cur", "msg3:2,S"), # already has S, will skip ("cur", "msg4:2,"), ]) changed, skipped, errors = mfm.process_maildir(str(inbox), "S") # 3 didn't have S yet, 1 already did. assert (changed, skipped, errors) == (3, 1, 0) # The two from new/ have moved to cur/ (S triggers the migration). assert (inbox / "cur" / "msg1:2,S").exists() assert (inbox / "cur" / "msg2:2,S").exists() assert (inbox / "cur" / "msg4:2,S").exists() def test_boundary_empty_maildir(self, mfm, tmp_path): inbox = _make_maildir(tmp_path) assert mfm.process_maildir(str(inbox), "S") == (0, 0, 0) def test_boundary_maildir_does_not_exist(self, mfm, tmp_path, capsys): # Returns (0, 0, 0) and logs a friendly message to stderr. result = mfm.process_maildir(str(tmp_path / "nope"), "S") assert result == (0, 0, 0) err = capsys.readouterr().err assert "Skipping" in err def test_boundary_non_file_entries_skipped(self, mfm, tmp_path): # A stray subdirectory in cur/ shouldn't crash the scan. inbox = _make_maildir(tmp_path, [("cur", "msg1:2,")]) (inbox / "cur" / "stray-dir").mkdir() changed, skipped, errors = mfm.process_maildir(str(inbox), "S") assert (changed, skipped, errors) == (1, 0, 0) def test_boundary_only_new_subdir_present(self, mfm, tmp_path): # If cur/ doesn't exist, the loop just skips it instead of erroring. inbox = tmp_path / "inbox" (inbox / "new").mkdir(parents=True) (inbox / "new" / "msg1:2,").write_text("body") changed, skipped, errors = mfm.process_maildir(str(inbox), "F") assert (changed, skipped, errors) == (1, 0, 0) # --------------------------------------------------------------------------- # process_specific_files — tmp_path # --------------------------------------------------------------------------- class TestProcessSpecificFiles: def test_normal_paths_in_cur_and_new(self, mfm, tmp_path): inbox = _make_maildir(tmp_path, [ ("cur", "msg1:2,"), ("new", "msg2:2,"), ]) paths = [ str(inbox / "cur" / "msg1:2,"), str(inbox / "new" / "msg2:2,"), ] changed, skipped, errors = mfm.process_specific_files(paths, "F") assert (changed, skipped, errors) == (2, 0, 0) def test_error_file_not_found(self, mfm, tmp_path, capsys): inbox = _make_maildir(tmp_path) ghost = str(inbox / "cur" / "ghost:2,") changed, skipped, errors = mfm.process_specific_files([ghost], "F") assert errors == 1 assert "File not found" in capsys.readouterr().err def test_error_file_outside_cur_or_new(self, mfm, tmp_path, capsys): # Path validation: only files whose parent dir is named "cur" or "new" # are accepted. Defends against pointing at the wrong file. bogus = tmp_path / "elsewhere" / "msg1:2," bogus.parent.mkdir() bogus.write_text("body") changed, skipped, errors = mfm.process_specific_files([str(bogus)], "F") assert errors == 1 assert "Not in a maildir" in capsys.readouterr().err # File untouched. assert bogus.exists() def test_error_already_set_counted_as_skipped(self, mfm, tmp_path): inbox = _make_maildir(tmp_path, [("cur", "msg1:2,F")]) path = str(inbox / "cur" / "msg1:2,F") changed, skipped, errors = mfm.process_specific_files([path], "F") assert (changed, skipped, errors) == (0, 1, 0) # --------------------------------------------------------------------------- # reindex_mu — mocked subprocess # --------------------------------------------------------------------------- class TestReindexMu: def test_normal_mu_present_returns_true(self, mfm, monkeypatch): monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu") result_obj = MagicMock(returncode=0, stderr="") monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj) assert mfm.reindex_mu() is True def test_error_mu_not_in_path_returns_false(self, mfm, monkeypatch, capsys): monkeypatch.setattr(mfm.shutil, "which", lambda _name: None) assert mfm.reindex_mu() is False assert "mu not found" in capsys.readouterr().err def test_error_mu_index_returns_nonzero(self, mfm, monkeypatch, capsys): monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu") result_obj = MagicMock(returncode=1, stderr="db locked") monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj) assert mfm.reindex_mu() is False assert "mu index failed" in capsys.readouterr().err def test_error_mu_index_times_out(self, mfm, monkeypatch, capsys): monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu") def raise_timeout(*_a, **_kw): raise subprocess.TimeoutExpired(cmd="mu index", timeout=120) monkeypatch.setattr(mfm.subprocess, "run", raise_timeout) assert mfm.reindex_mu() is False assert "timed out" in capsys.readouterr().err # --------------------------------------------------------------------------- # Argparse — black-box subprocess sanity check # --------------------------------------------------------------------------- class TestArgparseShape: def test_normal_help_lists_subcommands(self): result = subprocess.run( [sys.executable, str(SCRIPT_PATH), "--help"], capture_output=True, text=True, ) assert result.returncode == 0 assert "mark-read" in result.stdout assert "star" in result.stdout def test_error_no_subcommand_exits_nonzero(self): result = subprocess.run( [sys.executable, str(SCRIPT_PATH)], capture_output=True, text=True, ) assert result.returncode != 0