1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
|
"""Tests for maildir-flag-manager.py.
Covers:
- Pure parsers: parse_maildir_flags, build_flagged_filename
- File-I/O ops: rename_with_flag, process_maildir, process_specific_files
(tmp_path with real maildir directory structures)
- Subprocess wrapper: reindex_mu (monkeypatch on shutil.which + subprocess.run)
- Argparse: --help / missing-subcommand via subprocess
The cmd_mark_read / cmd_star orchestrators are intentionally skipped —
they call the helpers and print summaries; the helpers are tested
directly so testing the orchestrators would mostly assert call counts.
"""
from __future__ import annotations
import importlib.util
import subprocess
import sys
from pathlib import Path
from unittest.mock import MagicMock
import pytest
# The script under test sits one directory above the directory holding this
# test file.
SCRIPT_PATH = Path(__file__).resolve().parent.parent.joinpath(
    "maildir-flag-manager.py"
)
def _load_module():
    """Import the script at ``SCRIPT_PATH`` as a module and return it.

    The script's file name contains hyphens, so it cannot be imported with a
    plain ``import`` statement; it is loaded by path through ``importlib``
    under the module name ``maildir_flag_manager``.

    Returns:
        The freshly executed module object.

    Raises:
        ImportError: If no import spec or loader can be built for
            ``SCRIPT_PATH``.
    """
    spec = importlib.util.spec_from_file_location(
        "maildir_flag_manager", str(SCRIPT_PATH)
    )
    # spec_from_file_location may return None, and a spec may carry no
    # loader; fail with a clear message instead of an AttributeError on None.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module from {SCRIPT_PATH}")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod
@pytest.fixture(scope="module")
def mfm():
    """Provide the script under test as a module, loaded once per test module."""
    module_under_test = _load_module()
    return module_under_test
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_maildir(tmp_path: Path, files=None) -> Path:
    """Construct a maildir at tmp_path/inbox with new/ and cur/ subdirs.

    Args:
        tmp_path: Directory under which ``inbox`` is created.
        files: Optional iterable of (subdir, filename) tuples. Each becomes a
            file at tmp_path/inbox/<subdir>/<filename> containing the
            placeholder text "body". ``None`` creates no files.

    Returns:
        Path of the created ``inbox`` directory.
    """
    inbox = tmp_path / "inbox"
    (inbox / "new").mkdir(parents=True)
    (inbox / "cur").mkdir()
    for subdir, fname in files or ():
        # Explicit encoding keeps the fixture independent of the locale.
        (inbox / subdir / fname).write_text("body", encoding="utf-8")
    return inbox
# ---------------------------------------------------------------------------
# parse_maildir_flags — pure
# ---------------------------------------------------------------------------
class TestParseMaildirFlags:
    """parse_maildir_flags: split a maildir filename into (base, flags)."""

    def test_normal_typical_filename(self, mfm):
        parsed = mfm.parse_maildir_flags("12345.host:2,FS")
        assert parsed == ("12345.host", "FS")

    def test_boundary_no_flag_suffix(self, mfm):
        # Without a ":2," marker the whole name is the base and the flags
        # come back empty.
        parsed = mfm.parse_maildir_flags("12345.host")
        assert parsed == ("12345.host", "")

    def test_boundary_empty_flags_section(self, mfm):
        # A trailing ":2," with nothing behind it still yields the base,
        # paired with empty flags.
        parsed = mfm.parse_maildir_flags("12345.host:2,")
        assert parsed == ("12345.host", "")

    def test_boundary_multiple_colons_in_base(self, mfm):
        # The base part may itself contain colons; only the trailing ":2,"
        # marker separates it from the flags.
        parsed = mfm.parse_maildir_flags("weird:thing:2,FS")
        assert parsed == ("weird:thing", "FS")

    def test_boundary_empty_string(self, mfm):
        parsed = mfm.parse_maildir_flags("")
        assert parsed == ("", "")
# ---------------------------------------------------------------------------
# build_flagged_filename — pure
# ---------------------------------------------------------------------------
class TestBuildFlaggedFilename:
    """build_flagged_filename: compose "<base>:2,<flags>" from its parts."""

    def test_normal_base_plus_flags(self, mfm):
        built = mfm.build_flagged_filename("12345.host", "FS")
        assert built == "12345.host:2,FS"

    def test_boundary_replaces_existing_flags(self, mfm):
        # Flags already on the input name are dropped; only the flags
        # argument ends up in the result.
        built = mfm.build_flagged_filename("12345.host:2,F", "FS")
        assert built == "12345.host:2,FS"

    def test_boundary_flags_sorted_alphabetically(self, mfm):
        # Flags are expected in alphabetical order: SFR becomes FRS.
        built = mfm.build_flagged_filename("12345.host", "SFR")
        assert built == "12345.host:2,FRS"

    def test_boundary_duplicate_flags_dedup(self, mfm):
        # Repeated flag letters collapse: FFS becomes FS.
        built = mfm.build_flagged_filename("12345.host", "FFS")
        assert built == "12345.host:2,FS"

    def test_boundary_empty_flags(self, mfm):
        built = mfm.build_flagged_filename("12345.host", "")
        assert built == "12345.host:2,"
# ---------------------------------------------------------------------------
# rename_with_flag — file I/O via tmp_path
# ---------------------------------------------------------------------------
class TestRenameWithFlag:
    """rename_with_flag exercised against real files in a temporary maildir."""

    def test_normal_add_F_to_cur_file_renamed_in_place(self, mfm, tmp_path):
        maildir = _make_maildir(tmp_path, [("cur", "12345.host:2,")])
        cur_dir = maildir / "cur"
        source = cur_dir / "12345.host:2,"

        outcome = mfm.rename_with_flag(str(source), "F")

        assert outcome is True
        assert not source.exists()
        assert (cur_dir / "12345.host:2,F").exists()

    def test_boundary_add_S_to_new_file_moves_to_cur(self, mfm, tmp_path):
        # Maildir spec: messages with Seen flag belong in cur/, not new/.
        maildir = _make_maildir(tmp_path, [("new", "12345.host:2,")])
        source = maildir / "new" / "12345.host:2,"

        outcome = mfm.rename_with_flag(str(source), "S")

        assert outcome is True
        assert not source.exists()
        # The renamed file must be in cur/ and must not be left in new/.
        assert (maildir / "cur" / "12345.host:2,S").exists()
        assert not (maildir / "new" / "12345.host:2,S").exists()

    def test_boundary_add_F_to_new_file_stays_in_new(self, mfm, tmp_path):
        # Adding F (Flagged) leaves the message in new/; only S moves it.
        maildir = _make_maildir(tmp_path, [("new", "12345.host:2,")])
        source = maildir / "new" / "12345.host:2,"

        outcome = mfm.rename_with_flag(str(source), "F")

        assert outcome is True
        assert (maildir / "new" / "12345.host:2,F").exists()
        assert not (maildir / "cur" / "12345.host:2,F").exists()

    def test_boundary_flag_already_present_returns_false(self, mfm, tmp_path):
        maildir = _make_maildir(tmp_path, [("cur", "12345.host:2,FS")])
        source = maildir / "cur" / "12345.host:2,FS"

        outcome = mfm.rename_with_flag(str(source), "F")

        assert outcome is False
        # Nothing was renamed, so the file is still where it was.
        assert source.exists()

    def test_boundary_dry_run_does_not_modify_filesystem(self, mfm, tmp_path):
        maildir = _make_maildir(tmp_path, [("cur", "12345.host:2,")])
        source = maildir / "cur" / "12345.host:2,"

        outcome = mfm.rename_with_flag(str(source), "F", dry_run=True)

        assert outcome is True
        # Dry run: the source stays put and no flagged file appears.
        assert source.exists()
        assert not (maildir / "cur" / "12345.host:2,F").exists()

    def test_error_file_path_does_not_exist(self, mfm, tmp_path):
        maildir = _make_maildir(tmp_path)
        missing = maildir / "cur" / "ghost:2,"
        with pytest.raises(FileNotFoundError):
            mfm.rename_with_flag(str(missing), "F")
# ---------------------------------------------------------------------------
# process_maildir — tmp_path
# ---------------------------------------------------------------------------
class TestProcessMaildir:
    """process_maildir: flag every message found in a maildir's new/ and cur/."""

    def test_normal_mixed_flagged_and_unflagged(self, mfm, tmp_path):
        maildir = _make_maildir(
            tmp_path,
            [
                ("new", "msg1:2,"),
                ("new", "msg2:2,"),
                ("cur", "msg3:2,S"),  # already has S, will skip
                ("cur", "msg4:2,"),
            ],
        )

        n_changed, n_skipped, n_errors = mfm.process_maildir(str(maildir), "S")

        # Three messages lacked S; one already carried it.
        assert (n_changed, n_skipped, n_errors) == (3, 1, 0)
        # Every newly flagged message, including the two that started in
        # new/, is now found in cur/.
        for flagged_name in ("msg1:2,S", "msg2:2,S", "msg4:2,S"):
            assert (maildir / "cur" / flagged_name).exists()

    def test_boundary_empty_maildir(self, mfm, tmp_path):
        maildir = _make_maildir(tmp_path)
        counts = mfm.process_maildir(str(maildir), "S")
        assert counts == (0, 0, 0)

    def test_boundary_maildir_does_not_exist(self, mfm, tmp_path, capsys):
        # A missing maildir yields all-zero counts and a "Skipping" note on
        # stderr.
        missing = tmp_path / "nope"
        counts = mfm.process_maildir(str(missing), "S")
        assert counts == (0, 0, 0)
        captured = capsys.readouterr()
        assert "Skipping" in captured.err

    def test_boundary_non_file_entries_skipped(self, mfm, tmp_path):
        # A directory sitting inside cur/ must not break the scan or be
        # counted.
        maildir = _make_maildir(tmp_path, [("cur", "msg1:2,")])
        (maildir / "cur" / "stray-dir").mkdir()

        n_changed, n_skipped, n_errors = mfm.process_maildir(str(maildir), "S")

        assert (n_changed, n_skipped, n_errors) == (1, 0, 0)

    def test_boundary_only_new_subdir_present(self, mfm, tmp_path):
        # Only new/ exists here; the absent cur/ must not cause an error.
        maildir = tmp_path / "inbox"
        new_dir = maildir / "new"
        new_dir.mkdir(parents=True)
        (new_dir / "msg1:2,").write_text("body")

        n_changed, n_skipped, n_errors = mfm.process_maildir(str(maildir), "F")

        assert (n_changed, n_skipped, n_errors) == (1, 0, 0)
# ---------------------------------------------------------------------------
# process_specific_files — tmp_path
# ---------------------------------------------------------------------------
class TestProcessSpecificFiles:
    """process_specific_files: flag an explicit list of message paths."""

    def test_normal_paths_in_cur_and_new(self, mfm, tmp_path):
        entries = [("cur", "msg1:2,"), ("new", "msg2:2,")]
        maildir = _make_maildir(tmp_path, entries)
        targets = [str(maildir / subdir / name) for subdir, name in entries]

        n_changed, n_skipped, n_errors = mfm.process_specific_files(targets, "F")

        assert (n_changed, n_skipped, n_errors) == (2, 0, 0)

    def test_error_file_not_found(self, mfm, tmp_path, capsys):
        maildir = _make_maildir(tmp_path)
        missing = str(maildir / "cur" / "ghost:2,")

        _, _, n_errors = mfm.process_specific_files([missing], "F")

        assert n_errors == 1
        assert "File not found" in capsys.readouterr().err

    def test_error_file_outside_cur_or_new(self, mfm, tmp_path, capsys):
        # A file whose parent directory is neither "cur" nor "new" is
        # reported as an error and left alone.
        stray = tmp_path / "elsewhere" / "msg1:2,"
        stray.parent.mkdir()
        stray.write_text("body")

        _, _, n_errors = mfm.process_specific_files([str(stray)], "F")

        assert n_errors == 1
        assert "Not in a maildir" in capsys.readouterr().err
        # The rejected file is still on disk under its original name.
        assert stray.exists()

    def test_error_already_set_counted_as_skipped(self, mfm, tmp_path):
        maildir = _make_maildir(tmp_path, [("cur", "msg1:2,F")])
        target = str(maildir / "cur" / "msg1:2,F")

        n_changed, n_skipped, n_errors = mfm.process_specific_files([target], "F")

        assert (n_changed, n_skipped, n_errors) == (0, 1, 0)
# ---------------------------------------------------------------------------
# reindex_mu — mocked subprocess
# ---------------------------------------------------------------------------
class TestReindexMu:
    """reindex_mu with shutil.which and subprocess.run replaced by fakes."""

    def test_normal_mu_present_returns_true(self, mfm, monkeypatch):
        completed = MagicMock(returncode=0, stderr="")

        def fake_which(_name):
            return "/usr/bin/mu"

        def fake_run(*_args, **_kwargs):
            return completed

        monkeypatch.setattr(mfm.shutil, "which", fake_which)
        monkeypatch.setattr(mfm.subprocess, "run", fake_run)

        assert mfm.reindex_mu() is True

    def test_error_mu_not_in_path_returns_false(self, mfm, monkeypatch, capsys):
        def fake_which(_name):
            return None

        monkeypatch.setattr(mfm.shutil, "which", fake_which)

        assert mfm.reindex_mu() is False
        assert "mu not found" in capsys.readouterr().err

    def test_error_mu_index_returns_nonzero(self, mfm, monkeypatch, capsys):
        completed = MagicMock(returncode=1, stderr="db locked")

        def fake_which(_name):
            return "/usr/bin/mu"

        def fake_run(*_args, **_kwargs):
            return completed

        monkeypatch.setattr(mfm.shutil, "which", fake_which)
        monkeypatch.setattr(mfm.subprocess, "run", fake_run)

        assert mfm.reindex_mu() is False
        assert "mu index failed" in capsys.readouterr().err

    def test_error_mu_index_times_out(self, mfm, monkeypatch, capsys):
        def fake_which(_name):
            return "/usr/bin/mu"

        def fake_run(*_args, **_kwargs):
            raise subprocess.TimeoutExpired(cmd="mu index", timeout=120)

        monkeypatch.setattr(mfm.shutil, "which", fake_which)
        monkeypatch.setattr(mfm.subprocess, "run", fake_run)

        assert mfm.reindex_mu() is False
        assert "timed out" in capsys.readouterr().err
# ---------------------------------------------------------------------------
# Argparse — black-box subprocess sanity check
# ---------------------------------------------------------------------------
class TestArgparseShape:
    """Black-box checks of the script's command-line interface."""

    # Upper bound for each child process, so a script that hangs fails the
    # test with TimeoutExpired instead of blocking the whole test run.
    _TIMEOUT_SECONDS = 30

    def _run_script(self, *args):
        """Run the script with ``args`` and return the CompletedProcess.

        stdout and stderr are captured as text. A non-zero exit status does
        not raise, so callers can assert on ``returncode`` themselves.
        """
        return subprocess.run(
            [sys.executable, str(SCRIPT_PATH), *args],
            capture_output=True,
            text=True,
            check=False,
            timeout=self._TIMEOUT_SECONDS,
        )

    def test_normal_help_lists_subcommands(self):
        result = self._run_script("--help")
        assert result.returncode == 0
        assert "mark-read" in result.stdout
        assert "star" in result.stdout

    def test_error_no_subcommand_exits_nonzero(self):
        result = self._run_script()
        assert result.returncode != 0
|