From 65c962e01f0ec7cefe70157f4f11f54638cdc995 Mon Sep 17 00:00:00 2001 From: Craig Jennings Date: Fri, 8 May 2026 23:23:31 -0500 Subject: feat: Add cmail IMAP action script and test suite Add cmail-action.py for IMAP triage operations against Proton Mail Bridge (list-unread, read, mark-read, star, unstar, trash, send, folders) mirroring the Gmail MCP workflow. Also add comprehensive tests for cmail-action, gmail-fetch-attachments, and maildir-flag-manager scripts. --- .ai/scripts/cmail-action.py | 387 +++++++++++++ .ai/scripts/tests/test_cmail_action.py | 669 ++++++++++++++++++++++ .ai/scripts/tests/test_gmail_fetch_attachments.py | 420 ++++++++++++++ .ai/scripts/tests/test_maildir_flag_manager.py | 310 ++++++++++ 4 files changed, 1786 insertions(+) create mode 100755 .ai/scripts/cmail-action.py create mode 100644 .ai/scripts/tests/test_cmail_action.py create mode 100644 .ai/scripts/tests/test_gmail_fetch_attachments.py create mode 100644 .ai/scripts/tests/test_maildir_flag_manager.py diff --git a/.ai/scripts/cmail-action.py b/.ai/scripts/cmail-action.py new file mode 100755 index 0000000..10eb215 --- /dev/null +++ b/.ai/scripts/cmail-action.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python3 +""" +cmail-action — IMAP triage operations against Proton Mail Bridge. + +Mirrors the operations the Gmail MCP server provides for gmail/dmail +(list-unread, read, mark-read, star, unstar, trash) so the +process-unread-emails workflow can drive cmail end-to-end the same way. + +Connects to local Proton Bridge IMAP at 127.0.0.1:1143 with STARTTLS, +using the Bridge-generated app password at ~/.config/.cmailpass and the +Bridge self-signed certificate at ~/.config/protonbridge.pem. Cert CN +is 127.0.0.1 but lacks a SubjectAltName, so hostname verification is +disabled (connection is to localhost — verifying via the pinned cert +is sufficient). + +IMAP -> Proton mapping: +- \\Seen flag -> Read state +- \\Flagged flag -> Starred label +- MOVE to Trash -> Trash folder +- COPY to label -> applies the label (Starred etc.) +""" + +import argparse +import email +import imaplib +import json +import mimetypes +import smtplib +import ssl +import sys +from email.message import EmailMessage +from email.policy import default as default_policy +from pathlib import Path + +HOST = "127.0.0.1" +PORT = 1143 +SMTP_PORT = 1025 +USER = "c@cjennings.net" +PASS_FILE = Path.home() / ".config" / ".cmailpass" +CERT_FILE = Path.home() / ".config" / "protonbridge.pem" + +INBOX = "INBOX" +TRASH = "Trash" + + +def connect(): + if not PASS_FILE.is_file(): + sys.exit(f"error: missing password file {PASS_FILE}") + if not CERT_FILE.is_file(): + sys.exit(f"error: missing bridge cert {CERT_FILE}") + ctx = ssl.create_default_context(cafile=str(CERT_FILE)) + ctx.check_hostname = False + try: + M = imaplib.IMAP4(HOST, PORT) + except OSError as e: + sys.exit(f"error: cannot reach Bridge at {HOST}:{PORT} ({e}). " + f"Is protonmail-bridge running? " + f"(systemctl --user status protonmail-bridge)") + M.starttls(ssl_context=ctx) + password = PASS_FILE.read_text().strip() + try: + M.login(USER, password) + except imaplib.IMAP4.error as e: + sys.exit(f"error: IMAP login failed for {USER}: {e}") + return M + + +def _select(M, mailbox=INBOX, readonly=False): + typ, data = M.select(mailbox, readonly=readonly) + if typ != "OK": + sys.exit(f"error: cannot select {mailbox}: {data}") + + +def _decode_header(value): + if value is None: + return "" + return str(value) + + +def parse_fetch_metadata(meta): + """Parse FLAGS and RFC822.SIZE out of an IMAP FETCH metadata string. + + Returns {"flags": str, "size": int | None}. Tolerates malformed input + (returns the defaults rather than raising). + """ + result = {"flags": "", "size": None} + flags_idx = meta.find("FLAGS (") + if flags_idx != -1: + end = meta.find(")", flags_idx) + if end != -1: + result["flags"] = meta[flags_idx + 7:end] + # Tokenize with parens stripped so RFC822.SIZE matches whether or not + # it abuts an opening paren in the raw response (e.g. "(RFC822.SIZE 500)" + # would otherwise tokenize as "(RFC822.SIZE" and miss the equality check). + tokens = meta.replace("(", " ").replace(")", " ").split() + for i, p in enumerate(tokens): + if p == "RFC822.SIZE" and i + 1 < len(tokens): + try: + result["size"] = int(tokens[i + 1]) + except ValueError: + pass + break + return result + + +def extract_body(msg): + """Pick a printable body out of an email.message.EmailMessage. + + Multipart: text/plain preferred, text/html fallback. Single-part: + returns content directly. Returns None if no body found. + """ + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + return part.get_content() + for part in msg.walk(): + if part.get_content_type() == "text/html": + return part.get_content() + return None + return msg.get_content() + + +def build_message(from_addr, to_addr, subject, body, attachments=None): + """Construct an EmailMessage from the given fields and attachments. + + attachments is a list of (filename, maintype, subtype, content_bytes) + tuples — typically the return value of load_attachment per file. Pure + function: no I/O, no SMTP. + """ + msg = EmailMessage() + msg["From"] = from_addr + msg["To"] = to_addr + msg["Subject"] = subject + msg.set_content(body) + for filename, maintype, subtype, content in (attachments or []): + msg.add_attachment(content, maintype=maintype, subtype=subtype, + filename=filename) + return msg + + +def load_attachment(path): + """Read a file and return (filename, maintype, subtype, content_bytes). + + MIME type comes from mimetypes.guess_type; falls back to + application/octet-stream when guess returns None. Raises FileNotFoundError + for missing paths and IsADirectoryError for directories. + """ + if not path.exists(): + raise FileNotFoundError(f"attachment not found: {path}") + if path.is_dir(): + raise IsADirectoryError(f"attachment path is a directory: {path}") + mime, _ = mimetypes.guess_type(path.name) + if mime is None: + maintype, subtype = "application", "octet-stream" + else: + maintype, subtype = mime.split("/", 1) + return (path.name, maintype, subtype, path.read_bytes()) + + +def smtp_connect(): + """Connect to Proton Bridge's local SMTP submission endpoint. + + Mirrors connect()'s pattern: STARTTLS against the pinned cert, + plaintext password from PASS_FILE. Skipped from unit tests for + the same reason connect() is — network + SSL + file I/O. + """ + if not PASS_FILE.is_file(): + sys.exit(f"error: missing password file {PASS_FILE}") + if not CERT_FILE.is_file(): + sys.exit(f"error: missing bridge cert {CERT_FILE}") + ctx = ssl.create_default_context(cafile=str(CERT_FILE)) + ctx.check_hostname = False + try: + smtp = smtplib.SMTP(HOST, SMTP_PORT) + except OSError as e: + sys.exit(f"error: cannot reach Bridge SMTP at {HOST}:{SMTP_PORT} ({e}). " + f"Is protonmail-bridge running?") + smtp.starttls(context=ctx) + password = PASS_FILE.read_text().strip() + try: + smtp.login(USER, password) + except smtplib.SMTPException as e: + sys.exit(f"error: SMTP login failed for {USER}: {e}") + return smtp + + +def cmd_list_unread(args): + M = connect() + try: + _select(M, INBOX, readonly=True) + typ, data = M.uid("SEARCH", None, "UNSEEN") + if typ != "OK": + sys.exit(f"error: search failed: {data}") + uids = data[0].split() if data and data[0] else [] + if args.limit and len(uids) > args.limit: + uids = uids[-args.limit:] + out = [] + for uid in uids: + uid_s = uid.decode() + typ, data = M.uid( + "FETCH", uid, + "(BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)] " + "FLAGS RFC822.SIZE)" + ) + if typ != "OK" or not data or not data[0]: + continue + # FLAGS / RFC822.SIZE may arrive in a non-tuple chunk after + # the BODY literal closes. Concatenate all chunks before + # parsing so the parser sees the full metadata. + hdr_raw = b"" + meta_str = "" + for chunk in data: + if isinstance(chunk, tuple): + hdr_raw = chunk[1] + meta_str += chunk[0].decode("utf-8", errors="replace") + " " + elif isinstance(chunk, (bytes, bytearray)): + meta_str += chunk.decode("utf-8", errors="replace") + " " + parsed = parse_fetch_metadata(meta_str) + msg = email.message_from_bytes(hdr_raw, policy=default_policy) + out.append({ + "uid": uid_s, + "from": _decode_header(msg.get("From")), + "to": _decode_header(msg.get("To")), + "subject": _decode_header(msg.get("Subject")), + "date": _decode_header(msg.get("Date")), + "flags": parsed["flags"], + "size": parsed["size"], + }) + print(json.dumps(out, indent=2, ensure_ascii=False)) + finally: + M.logout() + + +def cmd_read(args): + M = connect() + try: + _select(M, INBOX, readonly=True) + typ, data = M.uid("FETCH", str(args.uid).encode(), "(RFC822)") + if typ != "OK" or not data or not data[0]: + sys.exit(f"error: uid {args.uid} not found in {INBOX}") + raw = data[0][1] + msg = email.message_from_bytes(raw, policy=default_policy) + for h in ("From", "To", "Cc", "Date", "Subject"): + v = msg.get(h) + if v: + print(f"{h}: {v}") + print() + body = extract_body(msg) + print(body if body is not None else "") + finally: + M.logout() + + +def _store(uids, op, flags): + M = connect() + try: + _select(M, INBOX, readonly=False) + for uid in uids: + typ, data = M.uid("STORE", str(uid).encode(), op, flags) + if typ != "OK": + sys.exit(f"error: STORE {op} {flags} on uid {uid} failed: {data}") + print(f"ok: STORE {op} {flags} on {len(uids)} uid(s)") + finally: + M.logout() + + +def cmd_mark_read(args): + _store(args.uids, "+FLAGS", r"(\Seen)") + + +def cmd_mark_unread(args): + _store(args.uids, "-FLAGS", r"(\Seen)") + + +def cmd_star(args): + # Workflow convention: starring also marks read (matches the Gmail flow). + _store(args.uids, "+FLAGS", r"(\Flagged \Seen)") + + +def cmd_unstar(args): + _store(args.uids, "-FLAGS", r"(\Flagged)") + + +def cmd_trash(args): + M = connect() + try: + _select(M, INBOX, readonly=False) + moved = 0 + for uid in args.uids: + typ, data = M.uid("MOVE", str(uid).encode(), TRASH) + if typ != "OK": + # Fallback for servers without RFC 6851 MOVE. + typ2, data2 = M.uid("COPY", str(uid).encode(), TRASH) + if typ2 != "OK": + sys.exit(f"error: COPY uid {uid} -> {TRASH} failed: {data2}") + M.uid("STORE", str(uid).encode(), "+FLAGS", r"(\Deleted)") + moved += 1 + M.expunge() + print(f"ok: moved {moved} uid(s) to {TRASH}") + finally: + M.logout() + + +def cmd_send(args): + # Resolve attachments first so a missing file fails before SMTP opens. + attachments = [load_attachment(Path(p)) for p in (args.attach or [])] + if args.body is not None: + body = args.body + elif args.body_file is not None: + body = Path(args.body_file).read_text() + else: + body = sys.stdin.read() + msg = build_message(USER, args.to, args.subject, body, attachments) + smtp = smtp_connect() + try: + smtp.send_message(msg) + print(f"ok: sent to {args.to}") + finally: + smtp.quit() + + +def cmd_folders(_args): + M = connect() + try: + typ, data = M.list() + if typ != "OK": + sys.exit(f"error: LIST failed: {data}") + for line in data: + print(line.decode("utf-8", errors="replace")) + finally: + M.logout() + + +def main(): + p = argparse.ArgumentParser(prog="cmail-action", + description="IMAP triage against Proton Bridge") + sp = p.add_subparsers(dest="cmd", required=True) + + p_list = sp.add_parser("list-unread", help="list unread INBOX messages as JSON") + p_list.add_argument("--limit", type=int, default=50, + help="cap to N most recent (default 50)") + p_list.set_defaults(func=cmd_list_unread) + + p_read = sp.add_parser("read", help="print headers + body of a UID") + p_read.add_argument("uid", type=int) + p_read.set_defaults(func=cmd_read) + + p_mr = sp.add_parser("mark-read") + p_mr.add_argument("uids", nargs="+", type=int) + p_mr.set_defaults(func=cmd_mark_read) + + p_mu = sp.add_parser("mark-unread") + p_mu.add_argument("uids", nargs="+", type=int) + p_mu.set_defaults(func=cmd_mark_unread) + + p_s = sp.add_parser("star", help="star (sets \\Flagged + \\Seen)") + p_s.add_argument("uids", nargs="+", type=int) + p_s.set_defaults(func=cmd_star) + + p_us = sp.add_parser("unstar") + p_us.add_argument("uids", nargs="+", type=int) + p_us.set_defaults(func=cmd_unstar) + + p_t = sp.add_parser("trash", help="MOVE uid(s) to Trash") + p_t.add_argument("uids", nargs="+", type=int) + p_t.set_defaults(func=cmd_trash) + + p_f = sp.add_parser("folders", help="list IMAP folders (debug)") + p_f.set_defaults(func=cmd_folders) + + p_send = sp.add_parser("send", help="send an email via Bridge SMTP") + p_send.add_argument("--to", required=True, help="recipient address") + p_send.add_argument("--subject", required=True) + body_group = p_send.add_mutually_exclusive_group() + body_group.add_argument("--body", help="body text inline") + body_group.add_argument("--body-file", help="path to a file whose " + "contents become the body") + p_send.add_argument("--attach", action="append", default=[], + help="path to attach (repeatable)") + p_send.set_defaults(func=cmd_send) + + args = p.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/.ai/scripts/tests/test_cmail_action.py b/.ai/scripts/tests/test_cmail_action.py new file mode 100644 index 0000000..3f77ca3 --- /dev/null +++ b/.ai/scripts/tests/test_cmail_action.py @@ -0,0 +1,669 @@ +"""Tests for cmail-action.py. + +Covers: +- Pure helpers: parse_fetch_metadata, extract_body, _decode_header +- I/O commands: cmd_list_unread, cmd_read, cmd_trash, _store wrappers, + cmd_folders +- Argparse dispatch (subprocess --help) + +Strategy: import the script via importlib.util (filename has a hyphen, +so a regular `import cmail_action` won't work). Patch +cmail_action.connect to return a configured MagicMock IMAP4 instance +for the I/O tests. connect() itself is testability-blocked (network + +SSL + file I/O); manual smoke testing covers it. +""" + +from __future__ import annotations + +import email +import importlib.util +import json +import subprocess +import sys +from email.message import EmailMessage +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.policy import default as default_policy +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest + +SCRIPT_PATH = Path(__file__).resolve().parent.parent / "cmail-action.py" + + +def _load_module(): + spec = importlib.util.spec_from_file_location("cmail_action", str(SCRIPT_PATH)) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +@pytest.fixture(scope="module") +def cmail_action(): + return _load_module() + + +# --------------------------------------------------------------------------- +# parse_fetch_metadata — pure +# --------------------------------------------------------------------------- + +class TestParseFetchMetadata: + + def test_normal_flags_and_size(self, cmail_action): + meta = "1 (FLAGS (\\Seen) RFC822.SIZE 12345)" + assert cmail_action.parse_fetch_metadata(meta) == { + "flags": "\\Seen", + "size": 12345, + } + + def test_boundary_empty_flags_zero_size(self, cmail_action): + meta = "1 (FLAGS () RFC822.SIZE 0)" + assert cmail_action.parse_fetch_metadata(meta) == { + "flags": "", + "size": 0, + } + + def test_boundary_multiple_flags(self, cmail_action): + meta = "1 (FLAGS (\\Seen \\Flagged \\Recent) RFC822.SIZE 999)" + result = cmail_action.parse_fetch_metadata(meta) + assert result["flags"] == "\\Seen \\Flagged \\Recent" + assert result["size"] == 999 + + def test_boundary_no_size_key(self, cmail_action): + meta = "1 (FLAGS (\\Recent))" + result = cmail_action.parse_fetch_metadata(meta) + assert result["flags"] == "\\Recent" + assert result["size"] is None + + def test_boundary_no_flags_key(self, cmail_action): + meta = "1 (RFC822.SIZE 500)" + result = cmail_action.parse_fetch_metadata(meta) + assert result["flags"] == "" + assert result["size"] == 500 + + def test_boundary_metadata_split_across_chunks_concatenated(self, cmail_action): + # The bug fix that motivated extracting this helper: imaplib returns + # FLAGS / RFC822.SIZE in a non-tuple chunk after the BODY literal + # closes. cmd_list_unread now concatenates all chunks, then + # parse_fetch_metadata sees the combined string. Verify the parser + # handles the combined shape. + combined = ("3315 (BODY[HEADER.FIELDS (FROM TO)] {123}" + " FLAGS () RFC822.SIZE 65546)") + result = cmail_action.parse_fetch_metadata(combined) + assert result["flags"] == "" + assert result["size"] == 65546 + + def test_error_empty_input(self, cmail_action): + assert cmail_action.parse_fetch_metadata("") == {"flags": "", "size": None} + + def test_error_malformed_size_value_does_not_raise(self, cmail_action): + meta = "1 (RFC822.SIZE notanumber)" + result = cmail_action.parse_fetch_metadata(meta) + assert result["size"] is None + + def test_error_unclosed_flags_paren_returns_empty_flags(self, cmail_action): + # Defensive: parser doesn't find a closing paren after FLAGS (, so + # flags stays empty. Size still parses since RFC822.SIZE is found + # via the independent token-scan path. + meta = "1 (FLAGS (\\Seen RFC822.SIZE 100" + result = cmail_action.parse_fetch_metadata(meta) + assert result["flags"] == "" + assert result["size"] == 100 + + +# --------------------------------------------------------------------------- +# extract_body — pure +# --------------------------------------------------------------------------- + +class TestExtractBody: + + @staticmethod + def _multipart_alt(plain="plain text body", html="

html body

"): + # Build with the legacy MIME* constructors, then round-trip + # through email.message_from_bytes with the default policy so the + # parts are EmailMessage instances with .get_content() — matching + # what cmd_read sees when imaplib hands it RFC822 bytes. + msg = MIMEMultipart("alternative") + if plain is not None: + msg.attach(MIMEText(plain, "plain")) + if html is not None: + msg.attach(MIMEText(html, "html")) + return email.message_from_bytes(msg.as_bytes(), policy=default_policy) + + def test_normal_multipart_prefers_text_plain(self, cmail_action): + msg = self._multipart_alt(plain="plain wins", html="

html loses

") + assert cmail_action.extract_body(msg) == "plain wins" + + def test_boundary_html_only_multipart_falls_back_to_html(self, cmail_action): + msg = self._multipart_alt(plain=None, html="

only html

") + result = cmail_action.extract_body(msg) + assert result is not None + assert "only html" in result + + def test_boundary_singlepart_returns_content_directly(self, cmail_action): + msg = EmailMessage() + msg.set_content("single-part body") + # set_content adds Content-Type: text/plain by default; result has + # a trailing newline from the policy formatter. + assert cmail_action.extract_body(msg).strip() == "single-part body" + + def test_error_multipart_with_no_text_parts_returns_none(self, cmail_action): + msg = MIMEMultipart("alternative") + msg.attach(MIMEApplication(b"binary blob")) + # Round-trip for parity with the parser-based path real callers use. + parsed = email.message_from_bytes(msg.as_bytes(), policy=default_policy) + assert cmail_action.extract_body(parsed) is None + + +# --------------------------------------------------------------------------- +# _decode_header — pure +# --------------------------------------------------------------------------- + +class TestDecodeHeader: + + def test_normal_string(self, cmail_action): + assert cmail_action._decode_header("hello") == "hello" + + def test_boundary_empty_string(self, cmail_action): + assert cmail_action._decode_header("") == "" + + def test_boundary_none_returns_empty(self, cmail_action): + assert cmail_action._decode_header(None) == "" + + def test_boundary_non_string_coerced_via_str(self, cmail_action): + assert cmail_action._decode_header(42) == "42" + + +# --------------------------------------------------------------------------- +# Helpers for I/O command tests +# --------------------------------------------------------------------------- + +def _build_fetch_response(uid, from_addr="alice@example.com", subject="Hello", + size=1500): + """Mimic imaplib's FETCH response shape: BODY literal as a tuple, + trailing FLAGS/SIZE/close-paren as a separate bytes chunk. + """ + headers = ( + f"From: {from_addr}\r\n" + f"To: c@cjennings.net\r\n" + f"Subject: {subject}\r\n" + f"Date: Thu, 07 May 2026 12:00:00 -0500\r\n" + ).encode() + return ("OK", [ + (f"{uid} (BODY[HEADER.FIELDS (FROM TO SUBJECT DATE)] " + f"{{{len(headers)}}}".encode(), headers), + f" FLAGS () RFC822.SIZE {size})".encode(), + ]) + + +# --------------------------------------------------------------------------- +# cmd_list_unread — mocked imaplib +# --------------------------------------------------------------------------- + +class TestCmdListUnread: + + def test_normal_three_unread(self, cmail_action, capsys): + fetch_responses = { + b"100": _build_fetch_response("100", "alice@example.com", "Hello", 1500), + b"101": _build_fetch_response("101", "bob@example.com", "Howdy", 2000), + b"102": _build_fetch_response("102", "carol@example.com", "Hi", 500), + } + + def uid_side_effect(cmd, *args): + if cmd == "SEARCH": + return ("OK", [b"100 101 102"]) + if cmd == "FETCH": + return fetch_responses[args[0]] + return ("OK", [b""]) + + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + mock_imap.uid.side_effect = uid_side_effect + + with patch.object(cmail_action, "connect", return_value=mock_imap): + cmail_action.cmd_list_unread(SimpleNamespace(limit=50)) + + parsed = json.loads(capsys.readouterr().out) + assert len(parsed) == 3 + assert parsed[0]["uid"] == "100" + assert parsed[0]["from"] == "alice@example.com" + assert parsed[0]["subject"] == "Hello" + assert parsed[0]["size"] == 1500 + assert parsed[2]["uid"] == "102" + + def test_boundary_zero_unread(self, cmail_action, capsys): + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + mock_imap.uid.side_effect = lambda cmd, *a: ("OK", [b""]) + with patch.object(cmail_action, "connect", return_value=mock_imap): + cmail_action.cmd_list_unread(SimpleNamespace(limit=50)) + assert json.loads(capsys.readouterr().out) == [] + + def test_boundary_single_unread(self, cmail_action, capsys): + def uid_se(cmd, *args): + if cmd == "SEARCH": + return ("OK", [b"42"]) + if cmd == "FETCH": + return _build_fetch_response("42", "x@y", "Solo", 100) + return ("OK", [b""]) + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + mock_imap.uid.side_effect = uid_se + with patch.object(cmail_action, "connect", return_value=mock_imap): + cmail_action.cmd_list_unread(SimpleNamespace(limit=50)) + parsed = json.loads(capsys.readouterr().out) + assert len(parsed) == 1 + assert parsed[0]["uid"] == "42" + + def test_boundary_limit_truncates_to_most_recent(self, cmail_action, capsys): + # 10 unread, limit=3 — keeps the last 3 (most recent). + all_uids = [str(i).encode() for i in range(100, 110)] + + def uid_se(cmd, *args): + if cmd == "SEARCH": + return ("OK", [b" ".join(all_uids)]) + if cmd == "FETCH": + return _build_fetch_response(args[0].decode(), "x@y", "S", 100) + return ("OK", [b""]) + + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + mock_imap.uid.side_effect = uid_se + with patch.object(cmail_action, "connect", return_value=mock_imap): + cmail_action.cmd_list_unread(SimpleNamespace(limit=3)) + parsed = json.loads(capsys.readouterr().out) + assert [p["uid"] for p in parsed] == ["107", "108", "109"] + + def test_error_search_returns_no(self, cmail_action): + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + mock_imap.uid.return_value = ("NO", [b"server error"]) + with patch.object(cmail_action, "connect", return_value=mock_imap): + with pytest.raises(SystemExit): + cmail_action.cmd_list_unread(SimpleNamespace(limit=50)) + + +# --------------------------------------------------------------------------- +# cmd_read — mocked imaplib +# --------------------------------------------------------------------------- + +class TestCmdRead: + + @staticmethod + def _rfc822(body="hello world", subject="Test"): + msg = EmailMessage() + msg["From"] = "alice@example.com" + msg["To"] = "c@cjennings.net" + msg["Subject"] = subject + msg["Date"] = "Thu, 07 May 2026 12:00:00 -0500" + msg.set_content(body) + return bytes(msg) + + def test_normal_prints_headers_and_body(self, cmail_action, capsys): + raw = self._rfc822(body="body content here", subject="subj") + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + mock_imap.uid.return_value = ("OK", [(b"1 (RFC822 {N}", raw)]) + with patch.object(cmail_action, "connect", return_value=mock_imap): + cmail_action.cmd_read(SimpleNamespace(uid=42)) + out = capsys.readouterr().out + assert "From: alice@example.com" in out + assert "Subject: subj" in out + assert "body content here" in out + + def test_error_uid_not_found(self, cmail_action): + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + # imaplib's shape when the UID has no match: ('OK', [None]) + mock_imap.uid.return_value = ("OK", [None]) + with patch.object(cmail_action, "connect", return_value=mock_imap): + with pytest.raises(SystemExit): + cmail_action.cmd_read(SimpleNamespace(uid=999999)) + + +# --------------------------------------------------------------------------- +# _store wrappers — STORE command shape verification +# --------------------------------------------------------------------------- + +class TestStoreCommands: + + @staticmethod + def _capture_calls(cmail_action, cmd_func, uids, store_typ="OK"): + calls = [] + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + + def uid_se(cmd, uid, op, flags): + calls.append((cmd, op, flags)) + return (store_typ, [b""]) + + mock_imap.uid.side_effect = uid_se + with patch.object(cmail_action, "connect", return_value=mock_imap): + cmd_func(SimpleNamespace(uids=uids)) + return calls + + def test_normal_mark_read_uses_plus_seen(self, cmail_action): + calls = self._capture_calls(cmail_action, cmail_action.cmd_mark_read, [42]) + assert calls == [("STORE", "+FLAGS", r"(\Seen)")] + + def test_normal_mark_unread_uses_minus_seen(self, cmail_action): + calls = self._capture_calls(cmail_action, cmail_action.cmd_mark_unread, [42]) + assert calls == [("STORE", "-FLAGS", r"(\Seen)")] + + def test_normal_star_uses_plus_flagged_and_seen(self, cmail_action): + calls = self._capture_calls(cmail_action, cmail_action.cmd_star, [42]) + assert calls == [("STORE", "+FLAGS", r"(\Flagged \Seen)")] + + def test_normal_unstar_uses_minus_flagged(self, cmail_action): + calls = self._capture_calls(cmail_action, cmail_action.cmd_unstar, [42]) + assert calls == [("STORE", "-FLAGS", r"(\Flagged)")] + + def test_boundary_multi_uid_calls_store_per_uid(self, cmail_action): + calls = self._capture_calls( + cmail_action, cmail_action.cmd_mark_read, [1, 2, 3] + ) + assert len(calls) == 3 + assert all(c == ("STORE", "+FLAGS", r"(\Seen)") for c in calls) + + def test_error_store_failure_raises_systemexit(self, cmail_action): + with pytest.raises(SystemExit): + self._capture_calls( + cmail_action, cmail_action.cmd_mark_read, [42], store_typ="NO" + ) + + +# --------------------------------------------------------------------------- +# cmd_trash — MOVE happy path + COPY+DELETE+EXPUNGE fallback +# --------------------------------------------------------------------------- + +class TestCmdTrash: + + def test_normal_move_succeeds_and_expunges(self, cmail_action): + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + mock_imap.uid.return_value = ("OK", [b""]) + mock_imap.expunge.return_value = ("OK", [b""]) + with patch.object(cmail_action, "connect", return_value=mock_imap): + cmail_action.cmd_trash(SimpleNamespace(uids=[100, 101])) + move_calls = [c for c in mock_imap.uid.call_args_list + if c[0][0] == "MOVE"] + assert len(move_calls) == 2 + assert mock_imap.expunge.called + + def test_boundary_move_fails_falls_back_to_copy_then_delete(self, cmail_action): + # MOVE returns NO -> fallback path: COPY, then STORE +FLAGS \Deleted, + # then EXPUNGE. Verify the sequence executes as documented. + seen_cmds = [] + + def uid_se(cmd, *args): + seen_cmds.append(cmd) + if cmd == "MOVE": + return ("NO", [b"not supported"]) + return ("OK", [b""]) + + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + mock_imap.uid.side_effect = uid_se + mock_imap.expunge.return_value = ("OK", [b""]) + with patch.object(cmail_action, "connect", return_value=mock_imap): + cmail_action.cmd_trash(SimpleNamespace(uids=[100])) + assert seen_cmds == ["MOVE", "COPY", "STORE"] + assert mock_imap.expunge.called + + def test_error_copy_also_fails(self, cmail_action): + mock_imap = MagicMock() + mock_imap.select.return_value = ("OK", [b""]) + mock_imap.uid.side_effect = lambda cmd, *a: ("NO", [b"both fail"]) + with patch.object(cmail_action, "connect", return_value=mock_imap): + with pytest.raises(SystemExit): + cmail_action.cmd_trash(SimpleNamespace(uids=[100])) + + +# --------------------------------------------------------------------------- +# cmd_folders +# --------------------------------------------------------------------------- + +class TestCmdFolders: + + def test_normal_lists_folders(self, cmail_action, capsys): + mock_imap = MagicMock() + mock_imap.list.return_value = ("OK", [ + b'(\\HasNoChildren) "/" "INBOX"', + b'(\\HasNoChildren \\Trash) "/" "Trash"', + ]) + with patch.object(cmail_action, "connect", return_value=mock_imap): + cmail_action.cmd_folders(SimpleNamespace()) + out = capsys.readouterr().out + assert "INBOX" in out + assert "Trash" in out + + def test_error_list_returns_no(self, cmail_action): + mock_imap = MagicMock() + mock_imap.list.return_value = ("NO", [b"server error"]) + with patch.object(cmail_action, "connect", return_value=mock_imap): + with pytest.raises(SystemExit): + cmail_action.cmd_folders(SimpleNamespace()) + + +# --------------------------------------------------------------------------- +# build_message — pure +# --------------------------------------------------------------------------- + +class TestBuildMessage: + + def test_normal_no_attachments_is_singlepart(self, cmail_action): + msg = cmail_action.build_message( + from_addr="c@cjennings.net", + to_addr="recipient@example.com", + subject="Hello", + body="hello world", + ) + assert msg["From"] == "c@cjennings.net" + assert msg["To"] == "recipient@example.com" + assert msg["Subject"] == "Hello" + assert not msg.is_multipart() + assert msg.get_content().strip() == "hello world" + assert msg.get_content_type() == "text/plain" + + def test_normal_one_attachment_makes_multipart(self, cmail_action): + attachment = ("report.txt", "text", "plain", b"line1\nline2\n") + msg = cmail_action.build_message( + from_addr="c@cjennings.net", + to_addr="recipient@example.com", + subject="With file", + body="see attached", + attachments=[attachment], + ) + assert msg.is_multipart() + # Find the attachment part by Content-Disposition. + attached_parts = [ + p for p in msg.iter_attachments() + if p.get_filename() == "report.txt" + ] + assert len(attached_parts) == 1 + att = attached_parts[0] + assert att.get_content_type() == "text/plain" + assert att.get_content().rstrip("\n") == "line1\nline2" + + def test_boundary_two_attachments(self, cmail_action): + atts = [ + ("a.txt", "text", "plain", b"alpha"), + ("b.bin", "application", "octet-stream", b"\x00\x01\x02"), + ] + msg = cmail_action.build_message( + from_addr="c@cjennings.net", + to_addr="recipient@example.com", + subject="Two files", + body="see attached", + attachments=atts, + ) + names = sorted(p.get_filename() for p in msg.iter_attachments()) + assert names == ["a.txt", "b.bin"] + + def test_boundary_empty_body(self, cmail_action): + msg = cmail_action.build_message( + from_addr="c@cjennings.net", + to_addr="recipient@example.com", + subject="Empty", + body="", + ) + # Body part exists, content is empty (modulo trailing newline). + assert msg.get_content().strip() == "" + + def test_boundary_unicode_preserved_through_serialization(self, cmail_action): + msg = cmail_action.build_message( + from_addr="c@cjennings.net", + to_addr="recipient@example.com", + subject="日本語 ñ ü", + body="café — naïve résumé", + ) + # Round-trip: serialize, parse, check both Subject and body survived. + raw = msg.as_bytes() + parsed = email.message_from_bytes(raw, policy=default_policy) + assert parsed["Subject"] == "日本語 ñ ü" + assert "café" in parsed.get_content() + + +# --------------------------------------------------------------------------- +# load_attachment — file I/O via tmp_path +# --------------------------------------------------------------------------- + +class TestLoadAttachment: + + def test_normal_text_file(self, cmail_action, tmp_path): + p = tmp_path / "notes.txt" + p.write_text("hello\n") + filename, maintype, subtype, content = cmail_action.load_attachment(p) + assert filename == "notes.txt" + assert maintype == "text" + assert subtype == "plain" + assert content == b"hello\n" + + def test_normal_pdf_mime_detected(self, cmail_action, tmp_path): + p = tmp_path / "doc.pdf" + p.write_bytes(b"%PDF-1.4 fake") + filename, maintype, subtype, _ = cmail_action.load_attachment(p) + assert filename == "doc.pdf" + assert (maintype, subtype) == ("application", "pdf") + + def test_boundary_no_extension_falls_back_to_octet_stream(self, cmail_action, tmp_path): + p = tmp_path / "README" + p.write_text("readme content") + filename, maintype, subtype, _ = cmail_action.load_attachment(p) + assert filename == "README" + assert (maintype, subtype) == ("application", "octet-stream") + + def test_boundary_empty_file(self, cmail_action, tmp_path): + p = tmp_path / "empty.txt" + p.write_text("") + _, _, _, content = cmail_action.load_attachment(p) + assert content == b"" + + def test_error_missing_file_raises(self, cmail_action, tmp_path): + p = tmp_path / "does-not-exist.txt" + with pytest.raises(FileNotFoundError): + cmail_action.load_attachment(p) + + def test_error_directory_raises(self, cmail_action, tmp_path): + with pytest.raises(IsADirectoryError): + cmail_action.load_attachment(tmp_path) + + +# --------------------------------------------------------------------------- +# cmd_send — mocked smtp_connect +# --------------------------------------------------------------------------- + +class TestCmdSend: + + @staticmethod + def _args(to="r@example.com", subject="s", body="b", body_file=None, + attach=None, stdin=False): + return SimpleNamespace( + to=to, subject=subject, + body=None if stdin else body, + body_file=body_file, + attach=attach or [], + ) + + def test_normal_inline_body_calls_send_message(self, cmail_action): + mock_smtp = MagicMock() + with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp): + cmail_action.cmd_send(self._args( + to="recipient@example.com", + subject="testing cmail action script", + body="lorem ipsum dolor sit amet", + )) + mock_smtp.send_message.assert_called_once() + sent = mock_smtp.send_message.call_args[0][0] + assert sent["To"] == "recipient@example.com" + assert sent["Subject"] == "testing cmail action script" + assert sent["From"] == cmail_action.USER + assert "lorem ipsum dolor sit amet" in sent.get_content() + mock_smtp.quit.assert_called_once() + + def test_boundary_body_from_file(self, cmail_action, tmp_path): + body_file = tmp_path / "body.txt" + body_file.write_text("body from file") + mock_smtp = MagicMock() + with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp): + cmail_action.cmd_send(self._args(body=None, body_file=str(body_file))) + sent = mock_smtp.send_message.call_args[0][0] + assert "body from file" in sent.get_content() + + def test_boundary_with_attachment(self, cmail_action, tmp_path): + att = tmp_path / "report.txt" + att.write_text("attachment content") + mock_smtp = MagicMock() + with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp): + cmail_action.cmd_send(self._args(attach=[str(att)])) + sent = mock_smtp.send_message.call_args[0][0] + assert sent.is_multipart() + atts = list(sent.iter_attachments()) + assert len(atts) == 1 + assert atts[0].get_filename() == "report.txt" + assert atts[0].get_content().rstrip("\n") == "attachment content" + + def test_error_missing_attachment_exits_before_smtp(self, cmail_action, tmp_path): + # Attachment files are validated first; SMTP is never opened on failure. + mock_smtp = MagicMock() + with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp): + with pytest.raises((SystemExit, FileNotFoundError)): + cmail_action.cmd_send(self._args( + attach=[str(tmp_path / "does-not-exist.txt")] + )) + mock_smtp.send_message.assert_not_called() + + def test_error_smtp_send_failure_raises(self, cmail_action): + import smtplib + mock_smtp = MagicMock() + mock_smtp.send_message.side_effect = smtplib.SMTPException("boom") + with patch.object(cmail_action, "smtp_connect", return_value=mock_smtp): + with pytest.raises((SystemExit, smtplib.SMTPException)): + cmail_action.cmd_send(self._args()) + + +# --------------------------------------------------------------------------- +# Argparse — black-box subprocess sanity check +# --------------------------------------------------------------------------- + +class TestArgparseShape: + + def test_normal_help_lists_all_subcommands(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH), "--help"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + for sub in ("list-unread", "read", "mark-read", "mark-unread", + "star", "unstar", "trash", "folders", "send"): + assert sub in result.stdout + + def test_error_no_subcommand_exits_nonzero(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH)], + capture_output=True, text=True, + ) + assert result.returncode != 0 diff --git a/.ai/scripts/tests/test_gmail_fetch_attachments.py b/.ai/scripts/tests/test_gmail_fetch_attachments.py new file mode 100644 index 0000000..b4fba41 --- /dev/null +++ b/.ai/scripts/tests/test_gmail_fetch_attachments.py @@ -0,0 +1,420 @@ +"""Tests for gmail-fetch-attachments.py. + +Covers: +- Pure helpers: safe_filename, collect_attachments, load_client_creds +- File I/O: load_mcp_env, load_refresh_token (tmp_path + monkeypatch on + module-level constants CLAUDE_CONFIG and TOKEN_DIR) +- HTTP wrappers: refresh_access_token, gmail_get (monkeypatch on + urllib.request.urlopen) +- Argparse: --help / missing-args via subprocess + +Strategy mirrors test_cmail_action.py: import the script via importlib +(filename has hyphens), mock at external boundaries, no integration +test for main() — the components are tested individually. +""" + +from __future__ import annotations + +import importlib.util +import json +import subprocess +import sys +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +SCRIPT_PATH = Path(__file__).resolve().parent.parent / "gmail-fetch-attachments.py" + + +def _load_module(): + spec = importlib.util.spec_from_file_location( + "gmail_fetch_attachments", str(SCRIPT_PATH) + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +@pytest.fixture(scope="module") +def gfa(): + return _load_module() + + +def _mock_urlopen_response(payload): + """Build a MagicMock mimicking urllib.request.urlopen()'s context-manager response.""" + mock_resp = MagicMock() + mock_resp.read.return_value = json.dumps(payload).encode() + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + return mock_resp + + +# --------------------------------------------------------------------------- +# safe_filename — pure +# --------------------------------------------------------------------------- + +class TestSafeFilename: + + def test_normal_clean_filename(self, gfa): + assert gfa.safe_filename("report.pdf") == "report.pdf" + + def test_boundary_forward_slash_replaced_with_underscore(self, gfa): + assert gfa.safe_filename("foo/bar.txt") == "foo_bar.txt" + + def test_boundary_backslash_replaced_with_underscore(self, gfa): + assert gfa.safe_filename("foo\\bar.txt") == "foo_bar.txt" + + def test_boundary_path_traversal_stripped(self, gfa): + # "../etc/passwd" -> after slash replace: ".._etc_passwd" + # While loop strips leading "..": "_etc_passwd" + assert gfa.safe_filename("../etc/passwd") == "_etc_passwd" + + def test_boundary_dotfile_preserved(self, gfa): + # The fix Craig requested: single-dot prefixes survive so dotfiles + # like .gitignore aren't silently renamed. + assert gfa.safe_filename(".gitignore") == ".gitignore" + assert gfa.safe_filename(".env.local") == ".env.local" + + def test_boundary_empty_string(self, gfa): + assert gfa.safe_filename("") == "" + + @pytest.mark.parametrize("input_name,expected", [ + ("..", ""), # single ".." stripped, leaves empty + ("...", "."), # one strip leaves a single dot + ("....", ""), # two strips leave empty + (".....", "."), # two strips leave one dot + ]) + def test_boundary_only_dots(self, gfa, input_name, expected): + assert gfa.safe_filename(input_name) == expected + + def test_boundary_double_dot_followed_by_name_stripped(self, gfa): + assert gfa.safe_filename("..foo") == "foo" + + def test_boundary_middle_dotdot_preserved(self, gfa): + # Only LEADING ".." gets stripped. Mid-string ".." stays. + # "foo..bar" has no leading dots, so it's preserved as-is. + assert gfa.safe_filename("foo..bar") == "foo..bar" + + +# --------------------------------------------------------------------------- +# collect_attachments — pure +# --------------------------------------------------------------------------- + +class TestCollectAttachments: + + def test_normal_single_attachment(self, gfa): + payload = { + "parts": [ + {"mimeType": "text/plain", "body": {"size": 100}}, + {"filename": "doc.pdf", "mimeType": "application/pdf", + "body": {"attachmentId": "abc123", "size": 5000}}, + ] + } + result = gfa.collect_attachments(payload) + assert result == [{ + "filename": "doc.pdf", + "attachmentId": "abc123", + "size": 5000, + "mimeType": "application/pdf", + }] + + def test_boundary_nested_multipart_recursion(self, gfa): + payload = { + "parts": [ + {"mimeType": "multipart/mixed", "parts": [ + {"mimeType": "multipart/alternative", "parts": [ + {"filename": "deep.pdf", "mimeType": "application/pdf", + "body": {"attachmentId": "deep1", "size": 100}}, + ]}, + ]}, + ] + } + result = gfa.collect_attachments(payload) + assert len(result) == 1 + assert result[0]["filename"] == "deep.pdf" + assert result[0]["attachmentId"] == "deep1" + + def test_boundary_no_attachments_returns_empty(self, gfa): + payload = { + "parts": [ + {"mimeType": "text/plain", "body": {"size": 100}}, + {"mimeType": "text/html", "body": {"size": 200}}, + ] + } + assert gfa.collect_attachments(payload) == [] + + def test_boundary_inline_image_no_filename_skipped(self, gfa): + # Inline images embedded via cid: typically have an attachmentId + # but no filename. The "user-visible attachments" heuristic skips + # them so they don't litter the output dir as image001.png. + payload = { + "parts": [ + {"mimeType": "image/png", + "body": {"attachmentId": "inline1", "size": 500}}, + ] + } + assert gfa.collect_attachments(payload) == [] + + def test_boundary_empty_filename_skipped(self, gfa): + # Empty-string filename also skipped (truthy check). + payload = { + "parts": [ + {"filename": "", "mimeType": "image/png", + "body": {"attachmentId": "empty1", "size": 500}}, + ] + } + assert gfa.collect_attachments(payload) == [] + + def test_boundary_filename_without_attachment_id_skipped(self, gfa): + # A part with a filename but no attachmentId isn't a separately + # downloadable attachment — it's inline content with a name. + payload = { + "parts": [ + {"filename": "fake.txt", "mimeType": "text/plain", + "body": {"size": 100}}, + ] + } + assert gfa.collect_attachments(payload) == [] + + def test_boundary_multiple_attachments_at_different_depths(self, gfa): + payload = { + "parts": [ + {"filename": "top.pdf", "mimeType": "application/pdf", + "body": {"attachmentId": "top1", "size": 100}}, + {"mimeType": "multipart/mixed", "parts": [ + {"filename": "nested.txt", "mimeType": "text/plain", + "body": {"attachmentId": "nested1", "size": 50}}, + ]}, + ] + } + result = gfa.collect_attachments(payload) + names = sorted(r["filename"] for r in result) + assert names == ["nested.txt", "top.pdf"] + + def test_boundary_default_mimetype_when_missing(self, gfa): + payload = { + "parts": [ + {"filename": "x.bin", + "body": {"attachmentId": "x1", "size": 10}}, + ] + } + result = gfa.collect_attachments(payload) + assert result[0]["mimeType"] == "application/octet-stream" + + def test_error_empty_payload(self, gfa): + assert gfa.collect_attachments({}) == [] + + def test_error_payload_with_null_parts(self, gfa): + # Defensive: parts = None falls through to empty list via `or []`. + payload = {"parts": None} + assert gfa.collect_attachments(payload) == [] + + +# --------------------------------------------------------------------------- +# load_client_creds — pure +# --------------------------------------------------------------------------- + +class TestLoadClientCreds: + + def test_normal_both_credentials_present(self, gfa): + env = {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": "secret456"} + assert gfa.load_client_creds(env) == ("cid123", "secret456") + + def test_error_missing_client_id(self, gfa): + env = {"GOOGLE_CLIENT_SECRET": "secret456"} + with pytest.raises(SystemExit): + gfa.load_client_creds(env) + + def test_error_missing_client_secret(self, gfa): + env = {"GOOGLE_CLIENT_ID": "cid123"} + with pytest.raises(SystemExit): + gfa.load_client_creds(env) + + def test_error_empty_client_id(self, gfa): + env = {"GOOGLE_CLIENT_ID": "", "GOOGLE_CLIENT_SECRET": "secret456"} + with pytest.raises(SystemExit): + gfa.load_client_creds(env) + + def test_error_empty_client_secret(self, gfa): + env = {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": ""} + with pytest.raises(SystemExit): + gfa.load_client_creds(env) + + +# --------------------------------------------------------------------------- +# load_mcp_env — file I/O via tmp_path + monkeypatch CLAUDE_CONFIG +# --------------------------------------------------------------------------- + +class TestLoadMcpEnv: + + @staticmethod + def _write_config(tmp_path, monkeypatch, gfa, content): + config_path = tmp_path / ".claude.json" + config_path.write_text(json.dumps(content)) + monkeypatch.setattr(gfa, "CLAUDE_CONFIG", config_path) + return config_path + + def test_normal_personal_profile_with_env(self, monkeypatch, gfa, tmp_path): + self._write_config(tmp_path, monkeypatch, gfa, { + "mcpServers": { + "google-docs-personal": { + "env": {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"} + } + } + }) + env = gfa.load_mcp_env("personal") + assert env == {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"} + + def test_boundary_server_present_no_env_key(self, monkeypatch, gfa, tmp_path): + self._write_config(tmp_path, monkeypatch, gfa, { + "mcpServers": {"google-docs-work": {}} + }) + assert gfa.load_mcp_env("work") == {} + + def test_boundary_env_explicitly_null(self, monkeypatch, gfa, tmp_path): + # The `or {}` defends against null env. Returns empty dict, not None. + self._write_config(tmp_path, monkeypatch, gfa, { + "mcpServers": {"google-docs-personal": {"env": None}} + }) + assert gfa.load_mcp_env("personal") == {} + + def test_error_config_file_missing(self, monkeypatch, gfa, tmp_path): + monkeypatch.setattr(gfa, "CLAUDE_CONFIG", tmp_path / "nope.json") + with pytest.raises(SystemExit): + gfa.load_mcp_env("personal") + + def test_error_server_not_in_config(self, monkeypatch, gfa, tmp_path): + self._write_config(tmp_path, monkeypatch, gfa, { + "mcpServers": {"google-docs-personal": {"env": {}}} + }) + with pytest.raises(SystemExit): + gfa.load_mcp_env("work") + + +# --------------------------------------------------------------------------- +# load_refresh_token — file I/O via tmp_path + monkeypatch TOKEN_DIR +# --------------------------------------------------------------------------- + +class TestLoadRefreshToken: + + @staticmethod + def _setup_token(tmp_path, monkeypatch, gfa, profile=None, content=None): + token_dir = tmp_path / "google-docs-mcp" + token_dir.mkdir() + if profile: + (token_dir / profile).mkdir() + token_path = token_dir / profile / "token.json" + else: + token_path = token_dir / "token.json" + if content is not None: + token_path.write_text(json.dumps(content)) + monkeypatch.setattr(gfa, "TOKEN_DIR", token_dir) + return token_path + + def test_normal_no_profile_token_at_root(self, monkeypatch, gfa, tmp_path): + self._setup_token(tmp_path, monkeypatch, gfa, + content={"refresh_token": "rt-root"}) + assert gfa.load_refresh_token({}) == "rt-root" + + def test_boundary_with_profile_subdir(self, monkeypatch, gfa, tmp_path): + self._setup_token(tmp_path, monkeypatch, gfa, profile="personal", + content={"refresh_token": "rt-personal"}) + assert gfa.load_refresh_token( + {"GOOGLE_MCP_PROFILE": "personal"} + ) == "rt-personal" + + def test_boundary_explicit_empty_profile_falls_back_to_root( + self, monkeypatch, gfa, tmp_path): + # GOOGLE_MCP_PROFILE="" is treated the same as the key being missing — + # both fall back to TOKEN_DIR/token.json. Pinning both shapes so a + # future refactor that drops `or ""` doesn't silently break this. + self._setup_token(tmp_path, monkeypatch, gfa, + content={"refresh_token": "rt-root"}) + assert gfa.load_refresh_token({"GOOGLE_MCP_PROFILE": ""}) == "rt-root" + + def test_error_token_file_missing(self, monkeypatch, gfa, tmp_path): + token_dir = tmp_path / "google-docs-mcp" + token_dir.mkdir() + monkeypatch.setattr(gfa, "TOKEN_DIR", token_dir) + with pytest.raises(SystemExit): + gfa.load_refresh_token({}) + + def test_error_no_refresh_token_field_in_file(self, monkeypatch, gfa, tmp_path): + self._setup_token(tmp_path, monkeypatch, gfa, + content={"access_token": "at-only"}) + with pytest.raises(SystemExit): + gfa.load_refresh_token({}) + + +# --------------------------------------------------------------------------- +# refresh_access_token — mocked urllib +# --------------------------------------------------------------------------- + +class TestRefreshAccessToken: + + def test_normal_returns_access_token(self, monkeypatch, gfa): + mock_urlopen = MagicMock( + return_value=_mock_urlopen_response({"access_token": "at-new"}) + ) + monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen) + result = gfa.refresh_access_token("rt-val", "cid-val", "sec-val") + assert result == "at-new" + # Verify the request shape: URL, body grant_type and refresh_token. + req = mock_urlopen.call_args[0][0] + assert req.full_url == gfa.OAUTH_TOKEN_URL + body = req.data.decode() + assert "grant_type=refresh_token" in body + assert "refresh_token=rt-val" in body + assert "client_id=cid-val" in body + + def test_error_response_missing_access_token(self, monkeypatch, gfa): + mock_urlopen = MagicMock( + return_value=_mock_urlopen_response({"error": "invalid_grant"}) + ) + monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen) + with pytest.raises(SystemExit): + gfa.refresh_access_token("rt", "cid", "sec") + + +# --------------------------------------------------------------------------- +# gmail_get — mocked urllib +# --------------------------------------------------------------------------- + +class TestGmailGet: + + def test_normal_returns_parsed_json_with_bearer_header(self, monkeypatch, gfa): + mock_urlopen = MagicMock( + return_value=_mock_urlopen_response({"id": "msg123", "snippet": "hi"}) + ) + monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen) + result = gfa.gmail_get("/messages/msg123", "at-token") + assert result == {"id": "msg123", "snippet": "hi"} + req = mock_urlopen.call_args[0][0] + assert req.full_url == f"{gfa.GMAIL_API}/messages/msg123" + # urllib.request.Request lowercases header names except the first + # char via .capitalize() → "Authorization" stays as "Authorization". + assert req.headers["Authorization"] == "Bearer at-token" + + +# --------------------------------------------------------------------------- +# Argparse — black-box subprocess sanity check +# --------------------------------------------------------------------------- + +class TestArgparseShape: + + def test_normal_help_lists_all_required_args(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH), "--help"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + for flag in ("--profile", "--message-id", "--output-dir"): + assert flag in result.stdout + + def test_error_no_args_exits_nonzero(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH)], + capture_output=True, text=True, + ) + assert result.returncode != 0 diff --git a/.ai/scripts/tests/test_maildir_flag_manager.py b/.ai/scripts/tests/test_maildir_flag_manager.py new file mode 100644 index 0000000..268af5b --- /dev/null +++ b/.ai/scripts/tests/test_maildir_flag_manager.py @@ -0,0 +1,310 @@ +"""Tests for maildir-flag-manager.py. + +Covers: +- Pure parsers: parse_maildir_flags, build_flagged_filename +- File-I/O ops: rename_with_flag, process_maildir, process_specific_files + (tmp_path with real maildir directory structures) +- Subprocess wrapper: reindex_mu (monkeypatch on shutil.which + subprocess.run) +- Argparse: --help / missing-subcommand via subprocess + +The cmd_mark_read / cmd_star orchestrators are intentionally skipped — +they call the helpers and print summaries; the helpers are tested +directly so testing the orchestrators would mostly assert call counts. +""" + +from __future__ import annotations + +import importlib.util +import subprocess +import sys +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +SCRIPT_PATH = Path(__file__).resolve().parent.parent / "maildir-flag-manager.py" + + +def _load_module(): + spec = importlib.util.spec_from_file_location( + "maildir_flag_manager", str(SCRIPT_PATH) + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +@pytest.fixture(scope="module") +def mfm(): + return _load_module() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_maildir(tmp_path: Path, files=None) -> Path: + """Construct a maildir at tmp_path/inbox with new/ and cur/ subdirs. + + files is a list of (subdir, filename) tuples. Each becomes an empty + file at tmp_path/inbox//. + """ + inbox = tmp_path / "inbox" + (inbox / "new").mkdir(parents=True) + (inbox / "cur").mkdir() + for subdir, fname in (files or []): + (inbox / subdir / fname).write_text("body") + return inbox + + +# --------------------------------------------------------------------------- +# parse_maildir_flags — pure +# --------------------------------------------------------------------------- + +class TestParseMaildirFlags: + + def test_normal_typical_filename(self, mfm): + assert mfm.parse_maildir_flags("12345.host:2,FS") == ("12345.host", "FS") + + def test_boundary_no_flag_suffix(self, mfm): + # No ":2," in filename — return whole name as base, empty flags. + assert mfm.parse_maildir_flags("12345.host") == ("12345.host", "") + + def test_boundary_empty_flags_section(self, mfm): + # ":2," with nothing after — base is parsed, flags are empty. + assert mfm.parse_maildir_flags("12345.host:2,") == ("12345.host", "") + + def test_boundary_multiple_colons_in_base(self, mfm): + # rsplit on the LAST ":2," — base may contain colons or even ":2,"-like + # substrings. Real maildir names sometimes have these from migrations. + assert mfm.parse_maildir_flags("weird:thing:2,FS") == ("weird:thing", "FS") + + def test_boundary_empty_string(self, mfm): + assert mfm.parse_maildir_flags("") == ("", "") + + +# --------------------------------------------------------------------------- +# build_flagged_filename — pure +# --------------------------------------------------------------------------- + +class TestBuildFlaggedFilename: + + def test_normal_base_plus_flags(self, mfm): + assert mfm.build_flagged_filename("12345.host", "FS") == "12345.host:2,FS" + + def test_boundary_replaces_existing_flags(self, mfm): + # Existing flags get parsed away — the new_flags arg is the source of truth. + assert mfm.build_flagged_filename("12345.host:2,F", "FS") == "12345.host:2,FS" + + def test_boundary_flags_sorted_alphabetically(self, mfm): + # Maildir spec requires alphabetical sort. SFR -> FRS. + assert mfm.build_flagged_filename("12345.host", "SFR") == "12345.host:2,FRS" + + def test_boundary_duplicate_flags_dedup(self, mfm): + # set() dedups before sort. FFS -> FS. + assert mfm.build_flagged_filename("12345.host", "FFS") == "12345.host:2,FS" + + def test_boundary_empty_flags(self, mfm): + assert mfm.build_flagged_filename("12345.host", "") == "12345.host:2," + + +# --------------------------------------------------------------------------- +# rename_with_flag — file I/O via tmp_path +# --------------------------------------------------------------------------- + +class TestRenameWithFlag: + + def test_normal_add_F_to_cur_file_renamed_in_place(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")]) + original = inbox / "cur" / "12345.host:2," + assert mfm.rename_with_flag(str(original), "F") is True + assert not original.exists() + assert (inbox / "cur" / "12345.host:2,F").exists() + + def test_boundary_add_S_to_new_file_moves_to_cur(self, mfm, tmp_path): + # Maildir spec: messages with Seen flag belong in cur/, not new/. + inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")]) + original = inbox / "new" / "12345.host:2," + assert mfm.rename_with_flag(str(original), "S") is True + assert not original.exists() + # Should land in cur/, not new/. + assert (inbox / "cur" / "12345.host:2,S").exists() + assert not (inbox / "new" / "12345.host:2,S").exists() + + def test_boundary_add_F_to_new_file_stays_in_new(self, mfm, tmp_path): + # F (Flagged) doesn't trigger the new/ -> cur/ migration; only S does. + inbox = _make_maildir(tmp_path, [("new", "12345.host:2,")]) + original = inbox / "new" / "12345.host:2," + assert mfm.rename_with_flag(str(original), "F") is True + assert (inbox / "new" / "12345.host:2,F").exists() + assert not (inbox / "cur" / "12345.host:2,F").exists() + + def test_boundary_flag_already_present_returns_false(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,FS")]) + original = inbox / "cur" / "12345.host:2,FS" + assert mfm.rename_with_flag(str(original), "F") is False + # Original file unchanged. + assert original.exists() + + def test_boundary_dry_run_does_not_modify_filesystem(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [("cur", "12345.host:2,")]) + original = inbox / "cur" / "12345.host:2," + assert mfm.rename_with_flag(str(original), "F", dry_run=True) is True + # Original still exists, no new file. + assert original.exists() + assert not (inbox / "cur" / "12345.host:2,F").exists() + + def test_error_file_path_does_not_exist(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path) + with pytest.raises(FileNotFoundError): + mfm.rename_with_flag(str(inbox / "cur" / "ghost:2,"), "F") + + +# --------------------------------------------------------------------------- +# process_maildir — tmp_path +# --------------------------------------------------------------------------- + +class TestProcessMaildir: + + def test_normal_mixed_flagged_and_unflagged(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [ + ("new", "msg1:2,"), + ("new", "msg2:2,"), + ("cur", "msg3:2,S"), # already has S, will skip + ("cur", "msg4:2,"), + ]) + changed, skipped, errors = mfm.process_maildir(str(inbox), "S") + # 3 didn't have S yet, 1 already did. + assert (changed, skipped, errors) == (3, 1, 0) + # The two from new/ have moved to cur/ (S triggers the migration). + assert (inbox / "cur" / "msg1:2,S").exists() + assert (inbox / "cur" / "msg2:2,S").exists() + assert (inbox / "cur" / "msg4:2,S").exists() + + def test_boundary_empty_maildir(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path) + assert mfm.process_maildir(str(inbox), "S") == (0, 0, 0) + + def test_boundary_maildir_does_not_exist(self, mfm, tmp_path, capsys): + # Returns (0, 0, 0) and logs a friendly message to stderr. + result = mfm.process_maildir(str(tmp_path / "nope"), "S") + assert result == (0, 0, 0) + err = capsys.readouterr().err + assert "Skipping" in err + + def test_boundary_non_file_entries_skipped(self, mfm, tmp_path): + # A stray subdirectory in cur/ shouldn't crash the scan. + inbox = _make_maildir(tmp_path, [("cur", "msg1:2,")]) + (inbox / "cur" / "stray-dir").mkdir() + changed, skipped, errors = mfm.process_maildir(str(inbox), "S") + assert (changed, skipped, errors) == (1, 0, 0) + + def test_boundary_only_new_subdir_present(self, mfm, tmp_path): + # If cur/ doesn't exist, the loop just skips it instead of erroring. + inbox = tmp_path / "inbox" + (inbox / "new").mkdir(parents=True) + (inbox / "new" / "msg1:2,").write_text("body") + changed, skipped, errors = mfm.process_maildir(str(inbox), "F") + assert (changed, skipped, errors) == (1, 0, 0) + + +# --------------------------------------------------------------------------- +# process_specific_files — tmp_path +# --------------------------------------------------------------------------- + +class TestProcessSpecificFiles: + + def test_normal_paths_in_cur_and_new(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [ + ("cur", "msg1:2,"), + ("new", "msg2:2,"), + ]) + paths = [ + str(inbox / "cur" / "msg1:2,"), + str(inbox / "new" / "msg2:2,"), + ] + changed, skipped, errors = mfm.process_specific_files(paths, "F") + assert (changed, skipped, errors) == (2, 0, 0) + + def test_error_file_not_found(self, mfm, tmp_path, capsys): + inbox = _make_maildir(tmp_path) + ghost = str(inbox / "cur" / "ghost:2,") + changed, skipped, errors = mfm.process_specific_files([ghost], "F") + assert errors == 1 + assert "File not found" in capsys.readouterr().err + + def test_error_file_outside_cur_or_new(self, mfm, tmp_path, capsys): + # Path validation: only files whose parent dir is named "cur" or "new" + # are accepted. Defends against pointing at the wrong file. + bogus = tmp_path / "elsewhere" / "msg1:2," + bogus.parent.mkdir() + bogus.write_text("body") + changed, skipped, errors = mfm.process_specific_files([str(bogus)], "F") + assert errors == 1 + assert "Not in a maildir" in capsys.readouterr().err + # File untouched. + assert bogus.exists() + + def test_error_already_set_counted_as_skipped(self, mfm, tmp_path): + inbox = _make_maildir(tmp_path, [("cur", "msg1:2,F")]) + path = str(inbox / "cur" / "msg1:2,F") + changed, skipped, errors = mfm.process_specific_files([path], "F") + assert (changed, skipped, errors) == (0, 1, 0) + + +# --------------------------------------------------------------------------- +# reindex_mu — mocked subprocess +# --------------------------------------------------------------------------- + +class TestReindexMu: + + def test_normal_mu_present_returns_true(self, mfm, monkeypatch): + monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu") + result_obj = MagicMock(returncode=0, stderr="") + monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj) + assert mfm.reindex_mu() is True + + def test_error_mu_not_in_path_returns_false(self, mfm, monkeypatch, capsys): + monkeypatch.setattr(mfm.shutil, "which", lambda _name: None) + assert mfm.reindex_mu() is False + assert "mu not found" in capsys.readouterr().err + + def test_error_mu_index_returns_nonzero(self, mfm, monkeypatch, capsys): + monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu") + result_obj = MagicMock(returncode=1, stderr="db locked") + monkeypatch.setattr(mfm.subprocess, "run", lambda *a, **kw: result_obj) + assert mfm.reindex_mu() is False + assert "mu index failed" in capsys.readouterr().err + + def test_error_mu_index_times_out(self, mfm, monkeypatch, capsys): + monkeypatch.setattr(mfm.shutil, "which", lambda _name: "/usr/bin/mu") + + def raise_timeout(*_a, **_kw): + raise subprocess.TimeoutExpired(cmd="mu index", timeout=120) + + monkeypatch.setattr(mfm.subprocess, "run", raise_timeout) + assert mfm.reindex_mu() is False + assert "timed out" in capsys.readouterr().err + + +# --------------------------------------------------------------------------- +# Argparse — black-box subprocess sanity check +# --------------------------------------------------------------------------- + +class TestArgparseShape: + + def test_normal_help_lists_subcommands(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH), "--help"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + assert "mark-read" in result.stdout + assert "star" in result.stdout + + def test_error_no_subcommand_exits_nonzero(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH)], + capture_output=True, text=True, + ) + assert result.returncode != 0 -- cgit v1.2.3