aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCraig Jennings <c@cjennings.net>2026-05-08 23:23:31 -0500
committerCraig Jennings <c@cjennings.net>2026-05-08 23:23:31 -0500
commit65c962e01f0ec7cefe70157f4f11f54638cdc995 (patch)
treea64d7a8bd466cb08e77829ca86e25a221aa64710
parent1abae87acaba85453ef9b7e1eafe0d6e8e22c4e5 (diff)
downloadrulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.tar.gz
rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.zip
feat: Add cmail IMAP action script and test suite
Add cmail-action.py for IMAP triage operations against Proton Mail Bridge (list-unread, read, mark-read, star, unstar, trash, send, folders) mirroring the Gmail MCP workflow. Also add comprehensive tests for cmail-action, gmail-fetch-attachments, and maildir-flag-manager scripts.
-rwxr-xr-x.ai/scripts/cmail-action.py387
-rw-r--r--.ai/scripts/tests/test_cmail_action.py669
-rw-r--r--.ai/scripts/tests/test_gmail_fetch_attachments.py420
-rw-r--r--.ai/scripts/tests/test_maildir_flag_manager.py310
4 files changed, 1786 insertions, 0 deletions
diff --git a/.ai/scripts/cmail-action.py b/.ai/scripts/cmail-action.py
new file mode 100755
index 0000000..10eb215
--- /dev/null
+++ b/.ai/scripts/cmail-action.py
@@ -0,0 +1,387 @@
+#!/usr/bin/env python3
+"""
+cmail-action — IMAP triage operations against Proton Mail Bridge.
+
+Mirrors the operations the Gmail MCP server provides for gmail/dmail
+(list-unread, read, mark-read, star, unstar, trash) so the
+process-unread-emails workflow can drive cmail end-to-end the same way.
+
+Connects to local Proton Bridge IMAP at 127.0.0.1:1143 with STARTTLS,
+using the Bridge-generated app password at ~/.config/.cmailpass and the
+Bridge self-signed certificate at ~/.config/protonbridge.pem. Cert CN
+is 127.0.0.1 but lacks a SubjectAltName, so hostname verification is
+disabled (connection is to localhost — verifying via the pinned cert
+is sufficient).
+
+IMAP -> Proton mapping:
+- \\Seen flag -> Read state
+- \\Flagged flag -> Starred label
+- MOVE to Trash -> Trash folder
+- COPY to label -> applies the label (Starred etc.)
+"""
+
+import argparse
+import email
+import imaplib
+import json
+import mimetypes
+import smtplib
+import ssl
+import sys
+from email.message import EmailMessage
+from email.policy import default as default_policy
+from pathlib import Path
+
+HOST = "127.0.0.1"
+PORT = 1143
+SMTP_PORT = 1025
+USER = "c@cjennings.net"
+PASS_FILE = Path.home() / ".config" / ".cmailpass"
+CERT_FILE = Path.home() / ".config" / "protonbridge.pem"
+
+INBOX = "INBOX"
+TRASH = "Trash"
+
+
+def connect():
+ if not PASS_FILE.is_file():
+ sys.exit(f"error: missing password file {PASS_FILE}")
+ if not CERT_FILE.is_file():
+ sys.exit(f"error: missing bridge cert {CERT_FILE}")
+ ctx = ssl.create_default_context(cafile=str(CERT_FILE))
+ ctx.check_hostname = False
+ try:
+ M = imaplib.IMAP4(HOST, PORT)
+ except OSError as e:
+ sys.exit(f"error: cannot reach Bridge at {HOST}:{PORT} ({e}). "
+ f"Is protonmail-bridge running? "
+ f"(systemctl --user status protonmail-bridge)")
+ M.starttls(ssl_context=ctx)
+ password = PASS_FILE.read_text().strip()
+ try:
+ M.login(USER, password)
+ except imaplib.IMAP4.error as e:
+ sys.exit(f"error: IMAP login failed for {USER}: {e}")
+ return M
+
+
+def _select(M, mailbox=INBOX, readonly=False):
+ typ, data = M.select(mailbox, readonly=readonly)
+ if typ != "OK":
+ sys.exit(f"error: cannot select {mailbox}: {data}")
+
+
+def _decode_header(value):
+ if value is None:
+ return ""
+ return str(value)
+
+
+def parse_fetch_metadata(meta):
+ """Parse FLAGS and RFC822.SIZE out of an IMAP FETCH metadata string.
+
+ Returns {"flags": str, "size": int | None}. Tolerates malformed input
+ (returns the defaults rather than raising).
+ """
+ result = {"flags": "", "size": None}
+ flags_idx = meta.find("FLAGS (")
+ if flags_idx != -1:
+ end = meta.find(")", flags_idx)
+ if end != -1:
+ result["flags"] = meta[flags_idx + 7:end]
+ # Tokenize with parens stripped so RFC822.SIZE matches whether or not
+ # it abuts an opening paren in the raw response (e.g. "(RFC822.SIZE 500)"
+ # would otherwise tokenize as "(RFC822.SIZE" and miss the equality check).
+ tokens = meta.replace("(", " ").replace(")", " ").split()
+ for i, p in enumerate(tokens):
+ if p == "RFC822.SIZE" and i + 1 < len(tokens):
+ try:
+ result["size"] = int(tokens[i + 1])
+ except ValueError:
+ pass
+ break
+ return result
+
+
+def extract_body(msg):
+ """Pick a printable body out of an email.message.EmailMessage.
+
+ Multipart: text/plain preferred, text/html fallback. Single-part:
+ returns content directly. Returns None if no body found.
+ """
+ if msg.is_multipart():
+ for part in msg.walk():
+ if part.get_content_type() == "text/plain":
+ return part.get_content()
+ for part in msg.walk():
+ if part.get_content_type() == "text/html":
+ return part.get_content()
+ return None
+ return msg.get_content()
+
+
+def build_message(from_addr, to_addr, subject, body, attachments=None):
+ """Construct an EmailMessage from the given fields and attachments.
+
+ attachments is a list of (filename, maintype, subtype, content_bytes)
+ tuples — typically the return value of load_attachment per file. Pure
+ function: no I/O, no SMTP.
+ """
+ msg = EmailMessage()
+ msg["From"] = from_addr
+ msg["To"] = to_addr
+ msg["Subject"] = subject
+ msg.set_content(body)
+ for filename, maintype, subtype, content in (attachments or []):
+ msg.add_attachment(content, maintype=maintype, subtype=subtype,
+ filename=filename)
+ return msg
+
+
+def load_attachment(path):
+ """Read a file and return (filename, maintype, subtype, content_bytes).
+
+ MIME type comes from mimetypes.guess_type; falls back to
+ application/octet-stream when guess returns None. Raises FileNotFoundError
+ for missing paths and IsADirectoryError for directories.
+ """
+ if not path.exists():
+ raise FileNotFoundError(f"attachment not found: {path}")
+ if path.is_dir():
+ raise IsADirectoryError(f"attachment path is a directory: {path}")
+ mime, _ = mimetypes.guess_type(path.name)
+ if mime is None:
+ maintype, subtype = "application", "octet-stream"
+ else:
+ maintype, subtype = mime.split("/", 1)
+ return (path.name, maintype, subtype, path.read_bytes())
+
+
+def smtp_connect():
+ """Connect to Proton Bridge's local SMTP submission endpoint.
+
+ Mirrors connect()'s pattern: STARTTLS against the pinned cert,
+ plaintext password from PASS_FILE. Skipped from unit tests for
+ the same reason connect() is — network + SSL + file I/O.
+ """
+ if not PASS_FILE.is_file():
+ sys.exit(f"error: missing password file {PASS_FILE}")
+ if not CERT_FILE.is_file():
+ sys.exit(f"error: missing bridge cert {CERT_FILE}")
+ ctx = ssl.create_default_context(cafile=str(CERT_FILE))
+ ctx.check_hostname = False
+ try:
+ smtp = smtplib.SMTP(HOST, SMTP_PORT)
+ except OSError as e:
+ sys.exit(f"error: cannot reach Bridge SMTP at {HOST}:{SMTP_PORT} ({e}). "
+ f"Is protonmail-bridge running?")
+ smtp.starttls(context=ctx)
+ password = PASS_FILE.read_text().strip()
+ try:
+ smtp.login(USER, password)
+ except smtplib.SMTPException as e:
+ sys.exit(f"error: SMTP login failed for {USER}: {e}")
+ return smtp
+
+
+def cmd_list_unread(args):
+ M = connect()
+ try:
+ _select(M, INBOX, readonly=True)
+ typ, data = M.uid("SEARCH", None, "UNSEEN")
+ if typ != "OK":
+ sys.exit(f"error: search failed: {data}")
+ uids = data[0].split() if data and data[0] else []
+ if args.limit and len(uids) > args.limit:
+ uids = uids[-args.limit:]
+ out = []
+ for uid in uids:
+ uid_s = uid.decode()
+ typ, data = M.uid(
+ "FETCH", uid,
+ "(BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)] "
+ "FLAGS RFC822.SIZE)"
+ )
+ if typ != "OK" or not data or not data[0]:
+ continue
+ # FLAGS / RFC822.SIZE may arrive in a non-tuple chunk after
+ # the BODY literal closes. Concatenate all chunks before
+ # parsing so the parser sees the full metadata.
+ hdr_raw = b""
+ meta_str = ""
+ for chunk in data:
+ if isinstance(chunk, tuple):
+ hdr_raw = chunk[1]
+ meta_str += chunk[0].decode("utf-8", errors="replace") + " "
+ elif isinstance(chunk, (bytes, bytearray)):
+ meta_str += chunk.decode("utf-8", errors="replace") + " "
+ parsed = parse_fetch_metadata(meta_str)
+ msg = email.message_from_bytes(hdr_raw, policy=default_policy)
+ out.append({
+ "uid": uid_s,
+ "from": _decode_header(msg.get("From")),
+ "to": _decode_header(msg.get("To")),
+ "subject": _decode_header(msg.get("Subject")),
+ "date": _decode_header(msg.get("Date")),
+ "flags": parsed["flags"],
+ "size": parsed["size"],
+ })
+ print(json.dumps(out, indent=2, ensure_ascii=False))
+ finally:
+ M.logout()
+
+
+def cmd_read(args):
+ M = connect()
+ try:
+ _select(M, INBOX, readonly=True)
+ typ, data = M.uid("FETCH", str(args.uid).encode(), "(RFC822)")
+ if typ != "OK" or not data or not data[0]:
+ sys.exit(f"error: uid {args.uid} not found in {INBOX}")
+ raw = data[0][1]
+ msg = email.message_from_bytes(raw, policy=default_policy)
+ for h in ("From", "To", "Cc", "Date", "Subject"):
+ v = msg.get(h)
+ if v:
+ print(f"{h}: {v}")
+ print()
+ body = extract_body(msg)
+ print(body if body is not None else "<no body>")
+ finally:
+ M.logout()
+
+
+def _store(uids, op, flags):
+ M = connect()
+ try:
+ _select(M, INBOX, readonly=False)
+ for uid in uids:
+ typ, data = M.uid("STORE", str(uid).encode(), op, flags)
+ if typ != "OK":
+ sys.exit(f"error: STORE {op} {flags} on uid {uid} failed: {data}")
+ print(f"ok: STORE {op} {flags} on {len(uids)} uid(s)")
+ finally:
+ M.logout()
+
+
+def cmd_mark_read(args):
+ _store(args.uids, "+FLAGS", r"(\Seen)")
+
+
+def cmd_mark_unread(args):
+ _store(args.uids, "-FLAGS", r"(\Seen)")
+
+
+def cmd_star(args):
+ # Workflow convention: starring also marks read (matches the Gmail flow).
+ _store(args.uids, "+FLAGS", r"(\Flagged \Seen)")
+
+
+def cmd_unstar(args):
+ _store(args.uids, "-FLAGS", r"(\Flagged)")
+
+
+def cmd_trash(args):
+ M = connect()
+ try:
+ _select(M, INBOX, readonly=False)
+ moved = 0
+ for uid in args.uids:
+ typ, data = M.uid("MOVE", str(uid).encode(), TRASH)
+ if typ != "OK":
+ # Fallback for servers without RFC 6851 MOVE.
+ typ2, data2 = M.uid("COPY", str(uid).encode(), TRASH)
+ if typ2 != "OK":
+ sys.exit(f"error: COPY uid {uid} -> {TRASH} failed: {data2}")
+ M.uid("STORE", str(uid).encode(), "+FLAGS", r"(\Deleted)")
+ moved += 1
+ M.expunge()
+ print(f"ok: moved {moved} uid(s) to {TRASH}")
+ finally:
+ M.logout()
+
+
+def cmd_send(args):
+ # Resolve attachments first so a missing file fails before SMTP opens.
+ attachments = [load_attachment(Path(p)) for p in (args.attach or [])]
+ if args.body is not None:
+ body = args.body
+ elif args.body_file is not None:
+ body = Path(args.body_file).read_text()
+ else:
+ body = sys.stdin.read()
+ msg = build_message(USER, args.to, args.subject, body, attachments)
+ smtp = smtp_connect()
+ try:
+ smtp.send_message(msg)
+ print(f"ok: sent to {args.to}")
+ finally:
+ smtp.quit()
+
+
+def cmd_folders(_args):
+ M = connect()
+ try:
+ typ, data = M.list()
+ if typ != "OK":
+ sys.exit(f"error: LIST failed: {data}")
+ for line in data:
+ print(line.decode("utf-8", errors="replace"))
+ finally:
+ M.logout()
+
+
+def main():
+ p = argparse.ArgumentParser(prog="cmail-action",
+ description="IMAP triage against Proton Bridge")
+ sp = p.add_subparsers(dest="cmd", required=True)
+
+ p_list = sp.add_parser("list-unread", help="list unread INBOX messages as JSON")
+ p_list.add_argument("--limit", type=int, default=50,
+ help="cap to N most recent (default 50)")
+ p_list.set_defaults(func=cmd_list_unread)
+
+ p_read = sp.add_parser("read", help="print headers + body of a UID")
+ p_read.add_argument("uid", type=int)
+ p_read.set_defaults(func=cmd_read)
+
+ p_mr = sp.add_parser("mark-read")
+ p_mr.add_argument("uids", nargs="+", type=int)
+ p_mr.set_defaults(func=cmd_mark_read)
+
+ p_mu = sp.add_parser("mark-unread")
+ p_mu.add_argument("uids", nargs="+", type=int)
+ p_mu.set_defaults(func=cmd_mark_unread)
+
+ p_s = sp.add_parser("star", help="star (sets \\Flagged + \\Seen)")
+ p_s.add_argument("uids", nargs="+", type=int)
+ p_s.set_defaults(func=cmd_star)
+
+ p_us = sp.add_parser("unstar")
+ p_us.add_argument("uids", nargs="+", type=int)
+ p_us.set_defaults(func=cmd_unstar)
+
+ p_t = sp.add_parser("trash", help="MOVE uid(s) to Trash")
+ p_t.add_argument("uids", nargs="+", type=int)
+ p_t.set_defaults(func=cmd_trash)
+
+ p_f = sp.add_parser("folders", help="list IMAP folders (debug)")
+ p_f.set_defaults(func=cmd_folders)
+
+ p_send = sp.add_parser("send", help="send an email via Bridge SMTP")
+ p_send.add_argument("--to", required=True, help="recipient address")
+ p_send.add_argument("--subject", required=True)
+ body_group = p_send.add_mutually_exclusive_group()
+ body_group.add_argument("--body", help="body text inline")
+ body_group.add_argument("--body-file", help="path to a file whose "
+ "contents become the body")
+ p_send.add_argument("--attach", action="append", default=[],
+ help="path to attach (repeatable)")
+ p_send.set_defaults(func=cmd_send)
+
+ args = p.parse_args()
+ args.func(args)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/.ai/scripts/tests/test_cmail_action.py b/.ai/scripts/tests/test_cmail_action.py
new file mode 100644
index 0000000..3f77ca3
--- /dev/null
+++ b/.ai/scripts/tests/test_cmail_action.py
@@ -0,0 +1,669 @@
+"""Tests for cmail-action.py.
+
+Covers:
+- Pure helpers: parse_fetch_metadata, extract_body, _decode_header
+- I/O commands: cmd_list_unread, cmd_read, cmd_trash, _store wrappers,
+ cmd_folders
+- Argparse dispatch (subprocess --help)
+
+Strategy: import the script via importlib.util (filename has a hyphen,
+so a regular `import cmail_action` won't work). Patch
+cmail_action.connect to return a configured MagicMock IMAP4 instance
+for the I/O tests. connect() itself is testability-blocked (network +
+SSL + file I/O); manual smoke testing covers it.
+"""
+
+from __future__ import annotations
+
+import email
+import importlib.util
+import json
+import subprocess
+import sys
+from email.message import EmailMessage
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from email.policy import default as default_policy
+from pathlib import Path
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+SCRIPT_PATH = Path(__file__).resolve().parent.parent / "cmail-action.py"
+
+
+def _load_module():
+ spec = importlib.util.spec_from_file_location("cmail_action", str(SCRIPT_PATH))
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ return mod
+
+
+@pytest.fixture(scope="module")
+def cmail_action():
+ return _load_module()
+
+
+# ---------------------------------------------------------------------------
+# parse_fetch_metadata — pure
+# ---------------------------------------------------------------------------
+
+class TestParseFetchMetadata:
+
+ def test_normal_flags_and_size(self, cmail_action):
+ meta = "1 (FLAGS (\\Seen) RFC822.SIZE 12345)"
+ assert cmail_action.parse_fetch_metadata(meta) == {
+ "flags": "\\Seen",
+ "size": 12345,
+ }
+
+ def test_boundary_empty_flags_zero_size(self, cmail_action):
+ meta = "1 (FLAGS () RFC822.SIZE 0)"
+ assert cmail_action.parse_fetch_metadata(meta) == {
+ "flags": "",
+ "size": 0,
+ }
+
+ def test_boundary_multiple_flags(self, cmail_action):
+ meta = "1 (FLAGS (\\Seen \\Flagged \\Recent) RFC822.SIZE 999)"
+ result = cmail_action.parse_fetch_metadata(meta)
+ assert result["flags"] == "\\Seen \\Flagged \\Recent"
+ assert result["size"] == 999
+
+ def test_boundary_no_size_key(self, cmail_action):
+ meta = "1 (FLAGS (\\Recent))"
+ result = cmail_action.parse_fetch_metadata(meta)
+ assert result["flags"] == "\\Recent"
+ assert result["size"] is None
+
+ def test_boundary_no_flags_key(self, cmail_action):
+ meta = "1 (RFC822.SIZE 500)"
+ result = cmail_action.parse_fetch_metadata(meta)
+ assert result["flags"] == ""
+ assert result["size"] == 500
+
+ def test_boundary_metadata_split_across_chunks_concatenated(self, cmail_action):
+ # The bug fix that motivated extracting this helper: imaplib returns
+ # FLAGS / RFC822.SIZE in a non-tuple chunk after the BODY literal
+ # closes. cmd_list_unread now concatenates all chunks, then
+ # parse_fetch_metadata sees the combined string. Verify the parser
+ # handles the combined shape.
+ combined = ("3315 (BODY[HEADER.FIELDS (FROM TO)] {123}"
+ " FLAGS () RFC822.SIZE 65546)")
+ result = cmail_action.parse_fetch_metadata(combined)
+ assert result["flags"] == ""
+ assert result["size"] == 65546
+
+ def test_error_empty_input(self, cmail_action):
+ assert cmail_action.parse_fetch_metadata("") == {"flags": "", "size": None}
+
+ def test_error_malformed_size_value_does_not_raise(self, cmail_action):
+ meta = "1 (RFC822.SIZE notanumber)"
+ result = cmail_action.parse_fetch_metadata(meta)
+ assert result["size"] is None
+
+ def test_error_unclosed_flags_paren_returns_empty_flags(self, cmail_action):
+ # Defensive: parser doesn't find a closing paren after FLAGS (, so
+ # flags stays empty. Size still parses since RFC822.SIZE is found
+ # via the independent token-scan path.
+ meta = "1 (FLAGS (\\Seen RFC822.SIZE 100"
+ result = cmail_action.parse_fetch_metadata(meta)
+ assert result["flags"] == ""
+ assert result["size"] == 100
+
+
+# ---------------------------------------------------------------------------
+# extract_body — pure
+# ---------------------------------------------------------------------------
+
+class TestExtractBody:
+
+ @staticmethod
+ def _multipart_alt(plain="plain text body", html="<p>html body</p>"):
+ # Build with the legacy MIME* constructors, then round-trip
+ # through email.message_from_bytes with the default policy so the
+ # parts are EmailMessage instances with .get_content() — matching
+ # what cmd_read sees when imaplib hands it RFC822 bytes.
+ msg = MIMEMultipart("alternative")
+ if plain is not None:
+ msg.attach(MIMEText(plain, "plain"))
+ if html is not None:
+ msg.attach(MIMEText(html, "html"))
+ return email.message_from_bytes(msg.as_bytes(), policy=default_policy)
+
+ def test_normal_multipart_prefers_text_plain(self, cmail_action):
+ msg = self._multipart_alt(plain="plain wins", html="<p>html loses</p>")
+ assert cmail_action.extract_body(msg) == "plain wins"
+
+ def test_boundary_html_only_multipart_falls_back_to_html(self, cmail_action):
+ msg = self._multipart_alt(plain=None, html="<p>only html</p>")
+ result = cmail_action.extract_body(msg)
+ assert result is not None
+ assert "only html" in result
+
+ def test_boundary_singlepart_returns_content_directly(self, cmail_action):
+ msg = EmailMessage()
+ msg.set_content("single-part body")
+ # set_content adds Content-Type: text/plain by default; result has
+ # a trailing newline from the policy formatter.
+ assert cmail_action.extract_body(msg).strip() == "single-part body"
+
+ def test_error_multipart_with_no_text_parts_returns_none(self, cmail_action):
+ msg = MIMEMultipart("alternative")
+ msg.attach(MIMEApplication(b"binary blob"))
+ # Round-trip for parity with the parser-based path real callers use.
+ parsed = email.message_from_bytes(msg.as_bytes(), policy=default_policy)
+ assert cmail_action.extract_body(parsed) is None
+
+
+# ---------------------------------------------------------------------------
+# _decode_header — pure
+# ---------------------------------------------------------------------------
+
+class TestDecodeHeader:
+
+ def test_normal_string(self, cmail_action):
+ assert cmail_action._decode_header("hello") == "hello"
+
+ def test_boundary_empty_string(self, cmail_action):
+ assert cmail_action._decode_header("") == ""
+
+ def test_boundary_none_returns_empty(self, cmail_action):
+ assert cmail_action._decode_header(None) == ""
+
+ def test_boundary_non_string_coerced_via_str(self, cmail_action):
+ assert cmail_action._decode_header(42) == "42"
+
+
+# ---------------------------------------------------------------------------
+# Helpers for I/O command tests
+# ---------------------------------------------------------------------------
+
+def _build_fetch_response(uid, from_addr="alice@example.com", subject="Hello",
+ size=1500):
+ """Mimic imaplib's FETCH response shape: BODY literal as a tuple,
+ trailing FLAGS/SIZE/close-paren as a separate bytes chunk.
+ """
+ headers = (
+ f"From: {from_addr}\r\n"
+ f"To: c@cjennings.net\r\n"
+ f"Subject: {subject}\r\n"
+ f"Date: Thu, 07 May 2026 12:00:00 -0500\r\n"
+ ).encode()
+ return ("OK", [
+ (f"{uid} (BODY[HEADER.FIELDS (FROM TO SUBJECT DATE)] "
+ f"{{{len(headers)}}}".encode(), headers),
+ f" FLAGS () RFC822.SIZE {size})".encode(),
+ ])
+
+
+# ---------------------------------------------------------------------------
+# cmd_list_unread — mocked imaplib
+# ---------------------------------------------------------------------------
+
+class TestCmdListUnread:
+
+ def test_normal_three_unread(self, cmail_action, capsys):
+ fetch_responses = {
+ b"100": _build_fetch_response("100", "alice@example.com", "Hello", 1500),
+ b"101": _build_fetch_response("101", "bob@example.com", "Howdy", 2000),
+ b"102": _build_fetch_response("102", "carol@example.com", "Hi", 500),
+ }
+
+ def uid_side_effect(cmd, *args):
+ if cmd == "SEARCH":
+ return ("OK", [b"100 101 102"])
+ if cmd == "FETCH":
+ return fetch_responses[args[0]]
+ return ("OK", [b""])
+
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ mock_imap.uid.side_effect = uid_side_effect
+
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ cmail_action.cmd_list_unread(SimpleNamespace(limit=50))
+
+ parsed = json.loads(capsys.readouterr().out)
+ assert len(parsed) == 3
+ assert parsed[0]["uid"] == "100"
+ assert parsed[0]["from"] == "alice@example.com"
+ assert parsed[0]["subject"] == "Hello"
+ assert parsed[0]["size"] == 1500
+ assert parsed[2]["uid"] == "102"
+
+ def test_boundary_zero_unread(self, cmail_action, capsys):
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ mock_imap.uid.side_effect = lambda cmd, *a: ("OK", [b""])
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ cmail_action.cmd_list_unread(SimpleNamespace(limit=50))
+ assert json.loads(capsys.readouterr().out) == []
+
+ def test_boundary_single_unread(self, cmail_action, capsys):
+ def uid_se(cmd, *args):
+ if cmd == "SEARCH":
+ return ("OK", [b"42"])
+ if cmd == "FETCH":
+ return _build_fetch_response("42", "x@y", "Solo", 100)
+ return ("OK", [b""])
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ mock_imap.uid.side_effect = uid_se
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ cmail_action.cmd_list_unread(SimpleNamespace(limit=50))
+ parsed = json.loads(capsys.readouterr().out)
+ assert len(parsed) == 1
+ assert parsed[0]["uid"] == "42"
+
+ def test_boundary_limit_truncates_to_most_recent(self, cmail_action, capsys):
+ # 10 unread, limit=3 — keeps the last 3 (most recent).
+ all_uids = [str(i).encode() for i in range(100, 110)]
+
+ def uid_se(cmd, *args):
+ if cmd == "SEARCH":
+ return ("OK", [b" ".join(all_uids)])
+ if cmd == "FETCH":
+ return _build_fetch_response(args[0].decode(), "x@y", "S", 100)
+ return ("OK", [b""])
+
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ mock_imap.uid.side_effect = uid_se
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ cmail_action.cmd_list_unread(SimpleNamespace(limit=3))
+ parsed = json.loads(capsys.readouterr().out)
+ assert [p["uid"] for p in parsed] == ["107", "108", "109"]
+
+ def test_error_search_returns_no(self, cmail_action):
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ mock_imap.uid.return_value = ("NO", [b"server error"])
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ with pytest.raises(SystemExit):
+ cmail_action.cmd_list_unread(SimpleNamespace(limit=50))
+
+
+# ---------------------------------------------------------------------------
+# cmd_read — mocked imaplib
+# ---------------------------------------------------------------------------
+
+class TestCmdRead:
+
+ @staticmethod
+ def _rfc822(body="hello world", subject="Test"):
+ msg = EmailMessage()
+ msg["From"] = "alice@example.com"
+ msg["To"] = "c@cjennings.net"
+ msg["Subject"] = subject
+ msg["Date"] = "Thu, 07 May 2026 12:00:00 -0500"
+ msg.set_content(body)
+ return bytes(msg)
+
+ def test_normal_prints_headers_and_body(self, cmail_action, capsys):
+ raw = self._rfc822(body="body content here", subject="subj")
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ mock_imap.uid.return_value = ("OK", [(b"1 (RFC822 {N}", raw)])
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ cmail_action.cmd_read(SimpleNamespace(uid=42))
+ out = capsys.readouterr().out
+ assert "From: alice@example.com" in out
+ assert "Subject: subj" in out
+ assert "body content here" in out
+
+ def test_error_uid_not_found(self, cmail_action):
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ # imaplib's shape when the UID has no match: ('OK', [None])
+ mock_imap.uid.return_value = ("OK", [None])
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ with pytest.raises(SystemExit):
+ cmail_action.cmd_read(SimpleNamespace(uid=999999))
+
+
+# ---------------------------------------------------------------------------
+# _store wrappers — STORE command shape verification
+# ---------------------------------------------------------------------------
+
+class TestStoreCommands:
+
+ @staticmethod
+ def _capture_calls(cmail_action, cmd_func, uids, store_typ="OK"):
+ calls = []
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+
+ def uid_se(cmd, uid, op, flags):
+ calls.append((cmd, op, flags))
+ return (store_typ, [b""])
+
+ mock_imap.uid.side_effect = uid_se
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ cmd_func(SimpleNamespace(uids=uids))
+ return calls
+
+ def test_normal_mark_read_uses_plus_seen(self, cmail_action):
+ calls = self._capture_calls(cmail_action, cmail_action.cmd_mark_read, [42])
+ assert calls == [("STORE", "+FLAGS", r"(\Seen)")]
+
+ def test_normal_mark_unread_uses_minus_seen(self, cmail_action):
+ calls = self._capture_calls(cmail_action, cmail_action.cmd_mark_unread, [42])
+ assert calls == [("STORE", "-FLAGS", r"(\Seen)")]
+
+ def test_normal_star_uses_plus_flagged_and_seen(self, cmail_action):
+ calls = self._capture_calls(cmail_action, cmail_action.cmd_star, [42])
+ assert calls == [("STORE", "+FLAGS", r"(\Flagged \Seen)")]
+
+ def test_normal_unstar_uses_minus_flagged(self, cmail_action):
+ calls = self._capture_calls(cmail_action, cmail_action.cmd_unstar, [42])
+ assert calls == [("STORE", "-FLAGS", r"(\Flagged)")]
+
+ def test_boundary_multi_uid_calls_store_per_uid(self, cmail_action):
+ calls = self._capture_calls(
+ cmail_action, cmail_action.cmd_mark_read, [1, 2, 3]
+ )
+ assert len(calls) == 3
+ assert all(c == ("STORE", "+FLAGS", r"(\Seen)") for c in calls)
+
+ def test_error_store_failure_raises_systemexit(self, cmail_action):
+ with pytest.raises(SystemExit):
+ self._capture_calls(
+ cmail_action, cmail_action.cmd_mark_read, [42], store_typ="NO"
+ )
+
+
+# ---------------------------------------------------------------------------
+# cmd_trash — MOVE happy path + COPY+DELETE+EXPUNGE fallback
+# ---------------------------------------------------------------------------
+
+class TestCmdTrash:
+
+ def test_normal_move_succeeds_and_expunges(self, cmail_action):
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ mock_imap.uid.return_value = ("OK", [b""])
+ mock_imap.expunge.return_value = ("OK", [b""])
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ cmail_action.cmd_trash(SimpleNamespace(uids=[100, 101]))
+ move_calls = [c for c in mock_imap.uid.call_args_list
+ if c[0][0] == "MOVE"]
+ assert len(move_calls) == 2
+ assert mock_imap.expunge.called
+
+ def test_boundary_move_fails_falls_back_to_copy_then_delete(self, cmail_action):
+ # MOVE returns NO -> fallback path: COPY, then STORE +FLAGS \Deleted,
+ # then EXPUNGE. Verify the sequence executes as documented.
+ seen_cmds = []
+
+ def uid_se(cmd, *args):
+ seen_cmds.append(cmd)
+ if cmd == "MOVE":
+ return ("NO", [b"not supported"])
+ return ("OK", [b""])
+
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ mock_imap.uid.side_effect = uid_se
+ mock_imap.expunge.return_value = ("OK", [b""])
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ cmail_action.cmd_trash(SimpleNamespace(uids=[100]))
+ assert seen_cmds == ["MOVE", "COPY", "STORE"]
+ assert mock_imap.expunge.called
+
+ def test_error_copy_also_fails(self, cmail_action):
+ mock_imap = MagicMock()
+ mock_imap.select.return_value = ("OK", [b""])
+ mock_imap.uid.side_effect = lambda cmd, *a: ("NO", [b"both fail"])
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ with pytest.raises(SystemExit):
+ cmail_action.cmd_trash(SimpleNamespace(uids=[100]))
+
+
+# ---------------------------------------------------------------------------
+# cmd_folders
+# ---------------------------------------------------------------------------
+
+class TestCmdFolders:
+
+ def test_normal_lists_folders(self, cmail_action, capsys):
+ mock_imap = MagicMock()
+ mock_imap.list.return_value = ("OK", [
+ b'(\\HasNoChildren) "/" "INBOX"',
+ b'(\\HasNoChildren \\Trash) "/" "Trash"',
+ ])
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ cmail_action.cmd_folders(SimpleNamespace())
+ out = capsys.readouterr().out
+ assert "INBOX" in out
+ assert "Trash" in out
+
+ def test_error_list_returns_no(self, cmail_action):
+ mock_imap = MagicMock()
+ mock_imap.list.return_value = ("NO", [b"server error"])
+ with patch.object(cmail_action, "connect", return_value=mock_imap):
+ with pytest.raises(SystemExit):
+ cmail_action.cmd_folders(SimpleNamespace())
+
+
+# ---------------------------------------------------------------------------
+# build_message — pure
+# ---------------------------------------------------------------------------
+
+class TestBuildMessage:
+
+ def test_normal_no_attachments_is_singlepart(self, cmail_action):
+ msg = cmail_action.build_message(
+ from_addr="c@cjennings.net",
+ to_addr="recipient@example.com",
+ subject="Hello",
+ body="hello world",
+ )
+ assert msg["From"] == "c@cjennings.net"
+ assert msg["To"] == "recipient@example.com"
+ assert msg["Subject"] == "Hello"
+ assert not msg.is_multipart()
+ assert msg.get_content().strip() == "hello world"
+ assert msg.get_content_type() == "text/plain"
+
+ def test_normal_one_attachment_makes_multipart(self, cmail_action):
+ attachment = ("report.txt", "text", "plain", b"line1\nline2\n")
+ msg = cmail_action.build_message(
+ from_addr="c@cjennings.net",
+ to_addr="recipient@example.com",
+ subject="With file",
+ body="see attached",
+ attachments=[attachment],
+ )
+ assert msg.is_multipart()
+ # Find the attachment part by Content-Disposition.
+ attached_parts = [
+ p for p in msg.iter_attachments()
+ if p.get_filename() == "report.txt"
+ ]
+ assert len(attached_parts) == 1
+ att = attached_parts[0]
+ assert att.get_content_type() == "text/plain"
+ assert att.get_content().rstrip("\n") == "line1\nline2"
+
+ def test_boundary_two_attachments(self, cmail_action):
+ atts = [
+ ("a.txt", "text", "plain", b"alpha"),
+ ("b.bin", "application", "octet-stream", b"\x00\x01\x02"),
+ ]
+ msg = cmail_action.build_message(
+ from_addr="c@cjennings.net",
+ to_addr="recipient@example.com",
+ subject="Two files",
+ body="see attached",
+ attachments=atts,
+ )
+ names = sorted(p.get_filename() for p in msg.iter_attachments())
+ assert names == ["a.txt", "b.bin"]
+
+ def test_boundary_empty_body(self, cmail_action):
+ msg = cmail_action.build_message(
+ from_addr="c@cjennings.net",
+ to_addr="recipient@example.com",
+ subject="Empty",
+ body="",
+ )
+ # Body part exists, content is empty (modulo trailing newline).
+ assert msg.get_content().strip() == ""
+
+ def test_boundary_unicode_preserved_through_serialization(self, cmail_action):
+ msg = cmail_action.build_message(
+ from_addr="c@cjennings.net",
+ to_addr="recipient@example.com",
+ subject="日本語 ñ ü",
+ body="café — naïve résumé",
+ )
+ # Round-trip: serialize, parse, check both Subject and body survived.
+ raw = msg.as_bytes()
+ parsed = email.message_from_bytes(raw, policy=default_policy)
+ assert parsed["Subject"] == "日本語 ñ ü"
+ assert "café" in parsed.get_content()
+
+
+# ---------------------------------------------------------------------------
+# load_attachment — file I/O via tmp_path
+# ---------------------------------------------------------------------------
+
+class TestLoadAttachment:
+
+ def test_normal_text_file(self, cmail_action, tmp_path):
+ p = tmp_path / "notes.txt"
+ p.write_text("hello\n")
+ filename, maintype, subtype, content = cmail_action.load_attachment(p)
+ assert filename == "notes.txt"
+ assert maintype == "text"
+ assert subtype == "plain"
+ assert content == b"hello\n"
+
+ def test_normal_pdf_mime_detected(self, cmail_action, tmp_path):
+ p = tmp_path / "doc.pdf"
+ p.write_bytes(b"%PDF-1.4 fake")
+ filename, maintype, subtype, _ = cmail_action.load_attachment(p)
+ assert filename == "doc.pdf"
+ assert (maintype, subtype) == ("application", "pdf")
+
+ def test_boundary_no_extension_falls_back_to_octet_stream(self, cmail_action, tmp_path):
+ p = tmp_path / "README"
+ p.write_text("readme content")
+ filename, maintype, subtype, _ = cmail_action.load_attachment(p)
+ assert filename == "README"
+ assert (maintype, subtype) == ("application", "octet-stream")
+
+ def test_boundary_empty_file(self, cmail_action, tmp_path):
+ p = tmp_path / "empty.txt"
+ p.write_text("")
+ _, _, _, content = cmail_action.load_attachment(p)
+ assert content == b""
+
+ def test_error_missing_file_raises(self, cmail_action, tmp_path):
+ p = tmp_path / "does-not-exist.txt"
+ with pytest.raises(FileNotFoundError):
+ cmail_action.load_attachment(p)
+
+ def test_error_directory_raises(self, cmail_action, tmp_path):
+ with pytest.raises(IsADirectoryError):
+ cmail_action.load_attachment(tmp_path)
+
+
+# ---------------------------------------------------------------------------
+# cmd_send — mocked smtp_connect
+# ---------------------------------------------------------------------------
+
+class TestCmdSend:
+
+ @staticmethod
+ def _args(to="r@example.com", subject="s", body="b", body_file=None,
+ attach=None, stdin=False):
+ return SimpleNamespace(
+ to=to, subject=subject,
+ body=None if stdin else body,
+ body_file=body_file,
+ attach=attach or [],
+ )
+
+ def test_normal_inline_body_calls_send_message(self, cmail_action):
+ mock_smtp = MagicMock()
+ with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp):
+ cmail_action.cmd_send(self._args(
+ to="recipient@example.com",
+ subject="testing cmail action script",
+ body="lorem ipsum dolor sit amet",
+ ))
+ mock_smtp.send_message.assert_called_once()
+ sent = mock_smtp.send_message.call_args[0][0]
+ assert sent["To"] == "recipient@example.com"
+ assert sent["Subject"] == "testing cmail action script"
+ assert sent["From"] == cmail_action.USER
+ assert "lorem ipsum dolor sit amet" in sent.get_content()
+ mock_smtp.quit.assert_called_once()
+
+ def test_boundary_body_from_file(self, cmail_action, tmp_path):
+ body_file = tmp_path / "body.txt"
+ body_file.write_text("body from file")
+ mock_smtp = MagicMock()
+ with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp):
+ cmail_action.cmd_send(self._args(body=None, body_file=str(body_file)))
+ sent = mock_smtp.send_message.call_args[0][0]
+ assert "body from file" in sent.get_content()
+
+ def test_boundary_with_attachment(self, cmail_action, tmp_path):
+ att = tmp_path / "report.txt"
+ att.write_text("attachment content")
+ mock_smtp = MagicMock()
+ with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp):
+ cmail_action.cmd_send(self._args(attach=[str(att)]))
+ sent = mock_smtp.send_message.call_args[0][0]
+ assert sent.is_multipart()
+ atts = list(sent.iter_attachments())
+ assert len(atts) == 1
+ assert atts[0].get_filename() == "report.txt"
+ assert atts[0].get_content().rstrip("\n") == "attachment content"
+
+ def test_error_missing_attachment_exits_before_smtp(self, cmail_action, tmp_path):
+ # Attachment files are validated first; SMTP is never opened on failure.
+ mock_smtp = MagicMock()
+ with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp):
+ with pytest.raises((SystemExit, FileNotFoundError)):
+ cmail_action.cmd_send(self._args(
+ attach=[str(tmp_path / "does-not-exist.txt")]
+ ))
+ mock_smtp.send_message.assert_not_called()
+
+ def test_error_smtp_send_failure_raises(self, cmail_action):
+ import smtplib
+ mock_smtp = MagicMock()
+ mock_smtp.send_message.side_effect = smtplib.SMTPException("boom")
+ with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp):
+ with pytest.raises((SystemExit, smtplib.SMTPException)):
+ cmail_action.cmd_send(self._args())
+
+
+# ---------------------------------------------------------------------------
+# Argparse — black-box subprocess sanity check
+# ---------------------------------------------------------------------------
+
+class TestArgparseShape:
+
+ def test_normal_help_lists_all_subcommands(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH), "--help"],
+ capture_output=True, text=True,
+ )
+ assert result.returncode == 0
+ for sub in ("list-unread", "read", "mark-read", "mark-unread",
+ "star", "unstar", "trash", "folders", "send"):
+ assert sub in result.stdout
+
+ def test_error_no_subcommand_exits_nonzero(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH)],
+ capture_output=True, text=True,
+ )
+ assert result.returncode != 0
diff --git a/.ai/scripts/tests/test_gmail_fetch_attachments.py b/.ai/scripts/tests/test_gmail_fetch_attachments.py
new file mode 100644
index 0000000..b4fba41
--- /dev/null
+++ b/.ai/scripts/tests/test_gmail_fetch_attachments.py
@@ -0,0 +1,420 @@
+"""Tests for gmail-fetch-attachments.py.
+
+Covers:
+- Pure helpers: safe_filename, collect_attachments, load_client_creds
+- File I/O: load_mcp_env, load_refresh_token (tmp_path + monkeypatch on
+ module-level constants CLAUDE_CONFIG and TOKEN_DIR)
+- HTTP wrappers: refresh_access_token, gmail_get (monkeypatch on
+ urllib.request.urlopen)
+- Argparse: --help / missing-args via subprocess
+
+Strategy mirrors test_cmail_action.py: import the script via importlib
+(filename has hyphens), mock at external boundaries, no integration
+test for main() — the components are tested individually.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import json
+import subprocess
+import sys
+from pathlib import Path
+from unittest.mock import MagicMock
+
+import pytest
+
+SCRIPT_PATH = Path(__file__).resolve().parent.parent / "gmail-fetch-attachments.py"
+
+
+def _load_module():
+ spec = importlib.util.spec_from_file_location(
+ "gmail_fetch_attachments", str(SCRIPT_PATH)
+ )
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ return mod
+
+
+@pytest.fixture(scope="module")
+def gfa():
+ return _load_module()
+
+
+def _mock_urlopen_response(payload):
+ """Build a MagicMock mimicking urllib.request.urlopen()'s context-manager response."""
+ mock_resp = MagicMock()
+ mock_resp.read.return_value = json.dumps(payload).encode()
+ mock_resp.__enter__ = MagicMock(return_value=mock_resp)
+ mock_resp.__exit__ = MagicMock(return_value=False)
+ return mock_resp
+
+
+# ---------------------------------------------------------------------------
+# safe_filename — pure
+# ---------------------------------------------------------------------------
+
+class TestSafeFilename:
+
+ def test_normal_clean_filename(self, gfa):
+ assert gfa.safe_filename("report.pdf") == "report.pdf"
+
+ def test_boundary_forward_slash_replaced_with_underscore(self, gfa):
+ assert gfa.safe_filename("foo/bar.txt") == "foo_bar.txt"
+
+ def test_boundary_backslash_replaced_with_underscore(self, gfa):
+ assert gfa.safe_filename("foo\\bar.txt") == "foo_bar.txt"
+
+ def test_boundary_path_traversal_stripped(self, gfa):
+ # "../etc/passwd" -> after slash replace: ".._etc_passwd"
+ # While loop strips leading "..": "_etc_passwd"
+ assert gfa.safe_filename("../etc/passwd") == "_etc_passwd"
+
+ def test_boundary_dotfile_preserved(self, gfa):
+ # The fix Craig requested: single-dot prefixes survive so dotfiles
+ # like .gitignore aren't silently renamed.
+ assert gfa.safe_filename(".gitignore") == ".gitignore"
+ assert gfa.safe_filename(".env.local") == ".env.local"
+
+ def test_boundary_empty_string(self, gfa):
+ assert gfa.safe_filename("") == ""
+
+ @pytest.mark.parametrize("input_name,expected", [
+ ("..", ""), # single ".." stripped, leaves empty
+ ("...", "."), # one strip leaves a single dot
+ ("....", ""), # two strips leave empty
+ (".....", "."), # two strips leave one dot
+ ])
+ def test_boundary_only_dots(self, gfa, input_name, expected):
+ assert gfa.safe_filename(input_name) == expected
+
+ def test_boundary_double_dot_followed_by_name_stripped(self, gfa):
+ assert gfa.safe_filename("..foo") == "foo"
+
+ def test_boundary_middle_dotdot_preserved(self, gfa):
+ # Only LEADING ".." gets stripped. Mid-string ".." stays.
+ # "foo..bar" has no leading dots, so it's preserved as-is.
+ assert gfa.safe_filename("foo..bar") == "foo..bar"
+
+
+# ---------------------------------------------------------------------------
+# collect_attachments — pure
+# ---------------------------------------------------------------------------
+
+class TestCollectAttachments:
+
+ def test_normal_single_attachment(self, gfa):
+ payload = {
+ "parts": [
+ {"mimeType": "text/plain", "body": {"size": 100}},
+ {"filename": "doc.pdf", "mimeType": "application/pdf",
+ "body": {"attachmentId": "abc123", "size": 5000}},
+ ]
+ }
+ result = gfa.collect_attachments(payload)
+ assert result == [{
+ "filename": "doc.pdf",
+ "attachmentId": "abc123",
+ "size": 5000,
+ "mimeType": "application/pdf",
+ }]
+
+ def test_boundary_nested_multipart_recursion(self, gfa):
+ payload = {
+ "parts": [
+ {"mimeType": "multipart/mixed", "parts": [
+ {"mimeType": "multipart/alternative", "parts": [
+ {"filename": "deep.pdf", "mimeType": "application/pdf",
+ "body": {"attachmentId": "deep1", "size": 100}},
+ ]},
+ ]},
+ ]
+ }
+ result = gfa.collect_attachments(payload)
+ assert len(result) == 1
+ assert result[0]["filename"] == "deep.pdf"
+ assert result[0]["attachmentId"] == "deep1"
+
+ def test_boundary_no_attachments_returns_empty(self, gfa):
+ payload = {
+ "parts": [
+ {"mimeType": "text/plain", "body": {"size": 100}},
+ {"mimeType": "text/html", "body": {"size": 200}},
+ ]
+ }
+ assert gfa.collect_attachments(payload) == []
+
+ def test_boundary_inline_image_no_filename_skipped(self, gfa):
+ # Inline images embedded via cid: typically have an attachmentId
+ # but no filename. The "user-visible attachments" heuristic skips
+ # them so they don't litter the output dir as image001.png.
+ payload = {
+ "parts": [
+ {"mimeType": "image/png",
+ "body": {"attachmentId": "inline1", "size": 500}},
+ ]
+ }
+ assert gfa.collect_attachments(payload) == []
+
+ def test_boundary_empty_filename_skipped(self, gfa):
+ # Empty-string filename also skipped (truthy check).
+ payload = {
+ "parts": [
+ {"filename": "", "mimeType": "image/png",
+ "body": {"attachmentId": "empty1", "size": 500}},
+ ]
+ }
+ assert gfa.collect_attachments(payload) == []
+
+ def test_boundary_filename_without_attachment_id_skipped(self, gfa):
+ # A part with a filename but no attachmentId isn't a separately
+ # downloadable attachment — it's inline content with a name.
+ payload = {
+ "parts": [
+ {"filename": "fake.txt", "mimeType": "text/plain",
+ "body": {"size": 100}},
+ ]
+ }
+ assert gfa.collect_attachments(payload) == []
+
+ def test_boundary_multiple_attachments_at_different_depths(self, gfa):
+ payload = {
+ "parts": [
+ {"filename": "top.pdf", "mimeType": "application/pdf",
+ "body": {"attachmentId": "top1", "size": 100}},
+ {"mimeType": "multipart/mixed", "parts": [
+ {"filename": "nested.txt", "mimeType": "text/plain",
+ "body": {"attachmentId": "nested1", "size": 50}},
+ ]},
+ ]
+ }
+ result = gfa.collect_attachments(payload)
+ names = sorted(r["filename"] for r in result)
+ assert names == ["nested.txt", "top.pdf"]
+
+ def test_boundary_default_mimetype_when_missing(self, gfa):
+ payload = {
+ "parts": [
+ {"filename": "x.bin",
+ "body": {"attachmentId": "x1", "size": 10}},
+ ]
+ }
+ result = gfa.collect_attachments(payload)
+ assert result[0]["mimeType"] == "application/octet-stream"
+
+ def test_error_empty_payload(self, gfa):
+ assert gfa.collect_attachments({}) == []
+
+ def test_error_payload_with_null_parts(self, gfa):
+ # Defensive: parts = None falls through to empty list via `or []`.
+ payload = {"parts": None}
+ assert gfa.collect_attachments(payload) == []
+
+
+# ---------------------------------------------------------------------------
+# load_client_creds — pure
+# ---------------------------------------------------------------------------
+
+class TestLoadClientCreds:
+
+ def test_normal_both_credentials_present(self, gfa):
+ env = {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": "secret456"}
+ assert gfa.load_client_creds(env) == ("cid123", "secret456")
+
+ def test_error_missing_client_id(self, gfa):
+ env = {"GOOGLE_CLIENT_SECRET": "secret456"}
+ with pytest.raises(SystemExit):
+ gfa.load_client_creds(env)
+
+ def test_error_missing_client_secret(self, gfa):
+ env = {"GOOGLE_CLIENT_ID": "cid123"}
+ with pytest.raises(SystemExit):
+ gfa.load_client_creds(env)
+
+ def test_error_empty_client_id(self, gfa):
+ env = {"GOOGLE_CLIENT_ID": "", "GOOGLE_CLIENT_SECRET": "secret456"}
+ with pytest.raises(SystemExit):
+ gfa.load_client_creds(env)
+
+ def test_error_empty_client_secret(self, gfa):
+ env = {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": ""}
+ with pytest.raises(SystemExit):
+ gfa.load_client_creds(env)
+
+
+# ---------------------------------------------------------------------------
+# load_mcp_env — file I/O via tmp_path + monkeypatch CLAUDE_CONFIG
+# ---------------------------------------------------------------------------
+
+class TestLoadMcpEnv:
+
+ @staticmethod
+ def _write_config(tmp_path, monkeypatch, gfa, content):
+ config_path = tmp_path / ".claude.json"
+ config_path.write_text(json.dumps(content))
+ monkeypatch.setattr(gfa, "CLAUDE_CONFIG", config_path)
+ return config_path
+
+ def test_normal_personal_profile_with_env(self, monkeypatch, gfa, tmp_path):
+ self._write_config(tmp_path, monkeypatch, gfa, {
+ "mcpServers": {
+ "google-docs-personal": {
+ "env": {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"}
+ }
+ }
+ })
+ env = gfa.load_mcp_env("personal")
+ assert env == {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"}
+
+ def test_boundary_server_present_no_env_key(self, monkeypatch, gfa, tmp_path):
+ self._write_config(tmp_path, monkeypatch, gfa, {
+ "mcpServers": {"google-docs-work": {}}
+ })
+ assert gfa.load_mcp_env("work") == {}
+
+ def test_boundary_env_explicitly_null(self, monkeypatch, gfa, tmp_path):
+ # The `or {}` defends against null env. Returns empty dict, not None.
+ self._write_config(tmp_path, monkeypatch, gfa, {
+ "mcpServers": {"google-docs-personal": {"env": None}}
+ })
+ assert gfa.load_mcp_env("personal") == {}
+
+ def test_error_config_file_missing(self, monkeypatch, gfa, tmp_path):
+ monkeypatch.setattr(gfa, "CLAUDE_CONFIG", tmp_path / "nope.json")
+ with pytest.raises(SystemExit):
+ gfa.load_mcp_env("personal")
+
+ def test_error_server_not_in_config(self, monkeypatch, gfa, tmp_path):
+ self._write_config(tmp_path, monkeypatch, gfa, {
+ "mcpServers": {"google-docs-personal": {"env": {}}}
+ })
+ with pytest.raises(SystemExit):
+ gfa.load_mcp_env("work")
+
+
+# ---------------------------------------------------------------------------
+# load_refresh_token — file I/O via tmp_path + monkeypatch TOKEN_DIR
+# ---------------------------------------------------------------------------
+
+class TestLoadRefreshToken:
+
+ @staticmethod
+ def _setup_token(tmp_path, monkeypatch, gfa, profile=None, content=None):
+ token_dir = tmp_path / "google-docs-mcp"
+ token_dir.mkdir()
+ if profile:
+ (token_dir / profile).mkdir()
+ token_path = token_dir / profile / "token.json"
+ else:
+ token_path = token_dir / "token.json"
+ if content is not None:
+ token_path.write_text(json.dumps(content))
+ monkeypatch.setattr(gfa, "TOKEN_DIR", token_dir)
+ return token_path
+
+ def test_normal_no_profile_token_at_root(self, monkeypatch, gfa, tmp_path):
+ self._setup_token(tmp_path, monkeypatch, gfa,
+ content={"refresh_token": "rt-root"})
+ assert gfa.load_refresh_token({}) == "rt-root"
+
+ def test_boundary_with_profile_subdir(self, monkeypatch, gfa, tmp_path):
+ self._setup_token(tmp_path, monkeypatch, gfa, profile="personal",
+ content={"refresh_token": "rt-personal"})
+ assert gfa.load_refresh_token(
+ {"GOOGLE_MCP_PROFILE": "personal"}
+ ) == "rt-personal"
+
+ def test_boundary_explicit_empty_profile_falls_back_to_root(
+ self, monkeypatch, gfa, tmp_path):
+ # GOOGLE_MCP_PROFILE="" is treated the same as the key being missing —
+ # both fall back to TOKEN_DIR/token.json. Pinning both shapes so a
+ # future refactor that drops `or ""` doesn't silently break this.
+ self._setup_token(tmp_path, monkeypatch, gfa,
+ content={"refresh_token": "rt-root"})
+ assert gfa.load_refresh_token({"GOOGLE_MCP_PROFILE": ""}) == "rt-root"
+
+ def test_error_token_file_missing(self, monkeypatch, gfa, tmp_path):
+ token_dir = tmp_path / "google-docs-mcp"
+ token_dir.mkdir()
+ monkeypatch.setattr(gfa, "TOKEN_DIR", token_dir)
+ with pytest.raises(SystemExit):
+ gfa.load_refresh_token({})
+
+ def test_error_no_refresh_token_field_in_file(self, monkeypatch, gfa, tmp_path):
+ self._setup_token(tmp_path, monkeypatch, gfa,
+ content={"access_token": "at-only"})
+ with pytest.raises(SystemExit):
+ gfa.load_refresh_token({})
+
+
+# ---------------------------------------------------------------------------
+# refresh_access_token — mocked urllib
+# ---------------------------------------------------------------------------
+
+class TestRefreshAccessToken:
+
+ def test_normal_returns_access_token(self, monkeypatch, gfa):
+ mock_urlopen = MagicMock(
+ return_value=_mock_urlopen_response({"access_token": "at-new"})
+ )
+ monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen)
+ result = gfa.refresh_access_token("rt-val", "cid-val", "sec-val")
+ assert result == "at-new"
+ # Verify the request shape: URL, body grant_type and refresh_token.
+ req = mock_urlopen.call_args[0][0]
+ assert req.full_url == gfa.OAUTH_TOKEN_URL
+ body = req.data.decode()
+ assert "grant_type=refresh_token" in body
+ assert "refresh_token=rt-val" in body
+ assert "client_id=cid-val" in body
+
+ def test_error_response_missing_access_token(self, monkeypatch, gfa):
+ mock_urlopen = MagicMock(
+ return_value=_mock_urlopen_response({"error": "invalid_grant"})
+ )
+ monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen)
+ with pytest.raises(SystemExit):
+ gfa.refresh_access_token("rt", "cid", "sec")
+
+
+# ---------------------------------------------------------------------------
+# gmail_get — mocked urllib
+# ---------------------------------------------------------------------------
+
+class TestGmailGet:
+
+ def test_normal_returns_parsed_json_with_bearer_header(self, monkeypatch, gfa):
+ mock_urlopen = MagicMock(
+ return_value=_mock_urlopen_response({"id": "msg123", "snippet": "hi"})
+ )
+ monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen)
+ result = gfa.gmail_get("/messages/msg123", "at-token")
+ assert result == {"id": "msg123", "snippet": "hi"}
+ req = mock_urlopen.call_args[0][0]
+ assert req.full_url == f"{gfa.GMAIL_API}/messages/msg123"
+ # urllib.request.Request lowercases header names except the first
+ # char via .capitalize() → "Authorization" stays as "Authorization".
+ assert req.headers["Authorization"] == "Bearer at-token"
+
+
+# ---------------------------------------------------------------------------
+# Argparse — black-box subprocess sanity check
+# ---------------------------------------------------------------------------
+
+class TestArgparseShape:
+
+ def test_normal_help_lists_all_required_args(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH), "--help"],
+ capture_output=True, text=True,
+ )
+ assert result.returncode == 0
+ for flag in ("--profile", "--message-id", "--output-dir"):
+ assert flag in result.stdout
+
+ def test_error_no_args_exits_nonzero(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH)],
+ capture_output=True, text=True,
+ )
+ assert result.returncode != 0
diff --git a/.ai/scripts/tests/test_maildir_flag_manager.py b/.ai/scripts/tests/test_maildir_flag_manager.py
new file mode 100644
index 0000000..268af5b
--- /dev/null
+++ b/.ai/scripts/tests/test_maildir_flag_manager.py
@@ -0,0 +1,310 @@
+"""Tests for maildir-flag-manager.py.
+
+Covers:
+- Pure parsers: parse_maildir_flags, build_flagged_filename
+- File-I/O ops: rename_with_flag, process_maildir, process_specific_files
+ (tmp_path with real maildir directory structures)
+- Subprocess wrapper: reindex_mu (monkeypatch on shutil.which + subprocess.run)
+- Argparse: --help / missing-subcommand via subprocess
+
+The cmd_mark_read / cmd_star orchestrators are intentionally skipped —
+they call the helpers and print summaries; the helpers are tested
+directly so testing the orchestrators would mostly assert call counts.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import subprocess
+import sys
+from pathlib import Path
+from unittest.mock import MagicMock
+
+import pytest
+
+SCRIPT_PATH = Path(__file__).resolve().parent.parent / "maildir-flag-manager.py"
+
+
+def _load_module():
+ spec = importlib.util.spec_from_file_location(
+ "maildir_flag_manager", str(SCRIPT_PATH)
+ )
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ return mod
+
+
+@pytest.fixture(scope="module")
+def mfm():
+ return _load_module()
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_maildir(tmp_path: Path, files=None) -> Path:
+ """Construct a maildir at tmp_path/inbox with new/ and cur/ subdirs.
+
+ files is a list of (subdir, filename) tuples. Each becomes an empty
+ file at tmp_path/inbox/<subdir>/<filename>.
+ """
+ inbox = tmp_path / "inbox"
+ (inbox / "new").mkdir(parents=True)
+ (inbox / "cur").mkdir()
+ for subdir, fname in (files or []):
+ (inbox / subdir / fname).write_text("body")
+ return inbox
+
+
+# ---------------------------------------------------------------------------
+# parse_maildir_flags — pure
+# ---------------------------------------------------------------------------
+
+class TestParseMaildirFlags:
+
+ def test_normal_typical_filename(self, mfm):
+ assert mfm.parse_maildir_flags("12345.host:2,FS") == ("12345.host", "FS")
+
+ def test_boundary_no_flag_suffix(self, mfm):
+ # No ":2," in filename — return whole name as base, empty flags.
+ assert mfm.parse_maildir_flags("12345.host") == ("12345.host", "")
+
+ def test_boundary_empty_flags_section(self, mfm):
+ # ":2," with nothing after — base is parsed, flags are empty.
+ assert mfm.parse_maildir_flags("12345.host:2,") == ("12345.host", "")
+
+ def test_boundary_multiple_colons_in_base(self, mfm):
+ # rsplit on the LAST ":2," — base may contain colons or even ":2,"-like
+ # substrings. Real maildir names sometimes have these from migrations.
+ assert mfm.parse_maildir_flags("weird:thing:2,FS") == ("weird:thing", "FS")
+
+ def test_boundary_empty_string(self, mfm):
+ assert mfm.parse_maildir_flags("") == ("", "")
+
+
+# ---------------------------------------------------------------------------
+# build_flagged_filename — pure
+# ---------------------------------------------------------------------------
+
+class TestBuildFlaggedFilename:
+
+ def test_normal_base_plus_flags(self, mfm):
+ assert mfm.build_flagged_filename("12345.host", "FS") == "12345.host:2,FS"
+
+ def test_boundary_replaces_existing_flags(self, mfm):
+ # Existing flags get parsed away — the new_flags arg is the source of truth.
+ assert mfm.build_flagged_filename("12345.host:2,F", "FS") == "12345.host:2,FS"
+
+ def test_boundary_flags_sorted_alphabetically(self, mfm):
+ # Maildir spec requires alphabetical sort. SFR -> FRS.
+ assert mfm.build_flagged_filename("12345.host", "SFR") == "12345.host:2,FRS"
+
+ def test_boundary_duplicate_flags_dedup(self, mfm):
+ # set() dedups before sort. FFS -> FS.
+ assert mfm.build_flagged_filename("12345.host", "FFS") == "12345.host:2,FS"
+
+ def test_boundary_empty_flags(self, mfm):
+ assert mfm.build_flagged_filename("12345.host", "") == "12345.host:2,"
+
+
+# ---------------------------------------------------------------------------
+# rename_with_flag — file I/O via tmp_path
+# ---------------------------------------------------------------------------
+
+class TestRenameWithFlag:
+
+ def test_normal_add_F_to_cur_file_renamed_in_place(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")])
+ original = inbox / "cur" / "12345.host:2,"
+ assert mfm.rename_with_flag(str(original), "F") is True
+ assert not original.exists()
+ assert (inbox / "cur" / "12345.host:2,F").exists()
+
+ def test_boundary_add_S_to_new_file_moves_to_cur(self, mfm, tmp_path):
+ # Maildir spec: messages with Seen flag belong in cur/, not new/.
+ inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")])
+ original = inbox / "new" / "12345.host:2,"
+ assert mfm.rename_with_flag(str(original), "S") is True
+ assert not original.exists()
+ # Should land in cur/, not new/.
+ assert (inbox / "cur" / "12345.host:2,S").exists()
+ assert not (inbox / "new" / "12345.host:2,S").exists()
+
+ def test_boundary_add_F_to_new_file_stays_in_new(self, mfm, tmp_path):
+ # F (Flagged) doesn't trigger the new/ -> cur/ migration; only S does.
+ inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")])
+ original = inbox / "new" / "12345.host:2,"
+ assert mfm.rename_with_flag(str(original), "F") is True
+ assert (inbox / "new" / "12345.host:2,F").exists()
+ assert not (inbox / "cur" / "12345.host:2,F").exists()
+
+ def test_boundary_flag_already_present_returns_false(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,FS")])
+ original = inbox / "cur" / "12345.host:2,FS"
+ assert mfm.rename_with_flag(str(original), "F") is False
+ # Original file unchanged.
+ assert original.exists()
+
+ def test_boundary_dry_run_does_not_modify_filesystem(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")])
+ original = inbox / "cur" / "12345.host:2,"
+ assert mfm.rename_with_flag(str(original), "F", dry_run=True) is True
+ # Original still exists, no new file.
+ assert original.exists()
+ assert not (inbox / "cur" / "12345.host:2,F").exists()
+
+ def test_error_file_path_does_not_exist(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path)
+ with pytest.raises(FileNotFoundError):
+ mfm.rename_with_flag(str(inbox / "cur" / "ghost:2,"), "F")
+
+
+# ---------------------------------------------------------------------------
+# process_maildir — tmp_path
+# ---------------------------------------------------------------------------
+
+class TestProcessMaildir:
+
+ def test_normal_mixed_flagged_and_unflagged(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [
+ ("new", "msg1:2,"),
+ ("new", "msg2:2,"),
+ ("cur", "msg3:2,S"), # already has S, will skip
+ ("cur", "msg4:2,"),
+ ])
+ changed, skipped, errors = mfm.process_maildir(str(inbox), "S")
+ # 3 didn't have S yet, 1 already did.
+ assert (changed, skipped, errors) == (3, 1, 0)
+ # The two from new/ have moved to cur/ (S triggers the migration).
+ assert (inbox / "cur" / "msg1:2,S").exists()
+ assert (inbox / "cur" / "msg2:2,S").exists()
+ assert (inbox / "cur" / "msg4:2,S").exists()
+
+ def test_boundary_empty_maildir(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path)
+ assert mfm.process_maildir(str(inbox), "S") == (0, 0, 0)
+
+ def test_boundary_maildir_does_not_exist(self, mfm, tmp_path, capsys):
+ # Returns (0, 0, 0) and logs a friendly message to stderr.
+ result = mfm.process_maildir(str(tmp_path / "nope"), "S")
+ assert result == (0, 0, 0)
+ err = capsys.readouterr().err
+ assert "Skipping" in err
+
+ def test_boundary_non_file_entries_skipped(self, mfm, tmp_path):
+ # A stray subdirectory in cur/ shouldn't crash the scan.
+ inbox = _make_maildir(tmp_path, [("cur", "msg1:2,")])
+ (inbox / "cur" / "stray-dir").mkdir()
+ changed, skipped, errors = mfm.process_maildir(str(inbox), "S")
+ assert (changed, skipped, errors) == (1, 0, 0)
+
+ def test_boundary_only_new_subdir_present(self, mfm, tmp_path):
+ # If cur/ doesn't exist, the loop just skips it instead of erroring.
+ inbox = tmp_path / "inbox"
+ (inbox / "new").mkdir(parents=True)
+ (inbox / "new" / "msg1:2,").write_text("body")
+ changed, skipped, errors = mfm.process_maildir(str(inbox), "F")
+ assert (changed, skipped, errors) == (1, 0, 0)
+
+
+# ---------------------------------------------------------------------------
+# process_specific_files — tmp_path
+# ---------------------------------------------------------------------------
+
+class TestProcessSpecificFiles:
+
+ def test_normal_paths_in_cur_and_new(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [
+ ("cur", "msg1:2,"),
+ ("new", "msg2:2,"),
+ ])
+ paths = [
+ str(inbox / "cur" / "msg1:2,"),
+ str(inbox / "new" / "msg2:2,"),
+ ]
+ changed, skipped, errors = mfm.process_specific_files(paths, "F")
+ assert (changed, skipped, errors) == (2, 0, 0)
+
+ def test_error_file_not_found(self, mfm, tmp_path, capsys):
+ inbox = _make_maildir(tmp_path)
+ ghost = str(inbox / "cur" / "ghost:2,")
+ changed, skipped, errors = mfm.process_specific_files([ghost], "F")
+ assert errors == 1
+ assert "File not found" in capsys.readouterr().err
+
+ def test_error_file_outside_cur_or_new(self, mfm, tmp_path, capsys):
+ # Path validation: only files whose parent dir is named "cur" or "new"
+ # are accepted. Defends against pointing at the wrong file.
+ bogus = tmp_path / "elsewhere" / "msg1:2,"
+ bogus.parent.mkdir()
+ bogus.write_text("body")
+ changed, skipped, errors = mfm.process_specific_files([str(bogus)], "F")
+ assert errors == 1
+ assert "Not in a maildir" in capsys.readouterr().err
+ # File untouched.
+ assert bogus.exists()
+
+ def test_error_already_set_counted_as_skipped(self, mfm, tmp_path):
+ inbox = _make_maildir(tmp_path, [("cur", "msg1:2,F")])
+ path = str(inbox / "cur" / "msg1:2,F")
+ changed, skipped, errors = mfm.process_specific_files([path], "F")
+ assert (changed, skipped, errors) == (0, 1, 0)
+
+
+# ---------------------------------------------------------------------------
+# reindex_mu — mocked subprocess
+# ---------------------------------------------------------------------------
+
+class TestReindexMu:
+
+ def test_normal_mu_present_returns_true(self, mfm, monkeypatch):
+ monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu")
+ result_obj = MagicMock(returncode=0, stderr="")
+ monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj)
+ assert mfm.reindex_mu() is True
+
+ def test_error_mu_not_in_path_returns_false(self, mfm, monkeypatch, capsys):
+ monkeypatch.setattr(mfm.shutil, "which", lambda _name: None)
+ assert mfm.reindex_mu() is False
+ assert "mu not found" in capsys.readouterr().err
+
+ def test_error_mu_index_returns_nonzero(self, mfm, monkeypatch, capsys):
+ monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu")
+ result_obj = MagicMock(returncode=1, stderr="db locked")
+ monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj)
+ assert mfm.reindex_mu() is False
+ assert "mu index failed" in capsys.readouterr().err
+
+ def test_error_mu_index_times_out(self, mfm, monkeypatch, capsys):
+ monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu")
+
+ def raise_timeout(*_a, **_kw):
+ raise subprocess.TimeoutExpired(cmd="mu index", timeout=120)
+
+ monkeypatch.setattr(mfm.subprocess, "run", raise_timeout)
+ assert mfm.reindex_mu() is False
+ assert "timed out" in capsys.readouterr().err
+
+
+# ---------------------------------------------------------------------------
+# Argparse — black-box subprocess sanity check
+# ---------------------------------------------------------------------------
+
+class TestArgparseShape:
+
+ def test_normal_help_lists_subcommands(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH), "--help"],
+ capture_output=True, text=True,
+ )
+ assert result.returncode == 0
+ assert "mark-read" in result.stdout
+ assert "star" in result.stdout
+
+ def test_error_no_subcommand_exits_nonzero(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH)],
+ capture_output=True, text=True,
+ )
+ assert result.returncode != 0