diff options
| author | Craig Jennings <c@cjennings.net> | 2026-05-08 23:23:31 -0500 |
|---|---|---|
| committer | Craig Jennings <c@cjennings.net> | 2026-05-08 23:23:31 -0500 |
| commit | 65c962e01f0ec7cefe70157f4f11f54638cdc995 (patch) | |
| tree | a64d7a8bd466cb08e77829ca86e25a221aa64710 /.ai/scripts/cmail-action.py | |
| parent | 1abae87acaba85453ef9b7e1eafe0d6e8e22c4e5 (diff) | |
| download | rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.tar.gz rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.zip | |
feat: Add cmail IMAP action script and test suite
Add cmail-action.py for IMAP triage operations against Proton Mail
Bridge (list-unread, read, mark-read, star, unstar, trash, send,
folders) mirroring the Gmail MCP workflow. Also add comprehensive
tests for cmail-action, gmail-fetch-attachments, and
maildir-flag-manager scripts.
Diffstat (limited to '.ai/scripts/cmail-action.py')
| -rwxr-xr-x | .ai/scripts/cmail-action.py | 387 |
1 files changed, 387 insertions, 0 deletions
diff --git a/.ai/scripts/cmail-action.py b/.ai/scripts/cmail-action.py new file mode 100755 index 0000000..10eb215 --- /dev/null +++ b/.ai/scripts/cmail-action.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python3 +""" +cmail-action — IMAP triage operations against Proton Mail Bridge. + +Mirrors the operations the Gmail MCP server provides for gmail/dmail +(list-unread, read, mark-read, star, unstar, trash) so the +process-unread-emails workflow can drive cmail end-to-end the same way. + +Connects to local Proton Bridge IMAP at 127.0.0.1:1143 with STARTTLS, +using the Bridge-generated app password at ~/.config/.cmailpass and the +Bridge self-signed certificate at ~/.config/protonbridge.pem. Cert CN +is 127.0.0.1 but lacks a SubjectAltName, so hostname verification is +disabled (connection is to localhost — verifying via the pinned cert +is sufficient). + +IMAP -> Proton mapping: +- \\Seen flag -> Read state +- \\Flagged flag -> Starred label +- MOVE to Trash -> Trash folder +- COPY to label -> applies the label (Starred etc.) +""" + +import argparse +import email +import imaplib +import json +import mimetypes +import smtplib +import ssl +import sys +from email.message import EmailMessage +from email.policy import default as default_policy +from pathlib import Path + +HOST = "127.0.0.1" +PORT = 1143 +SMTP_PORT = 1025 +USER = "c@cjennings.net" +PASS_FILE = Path.home() / ".config" / ".cmailpass" +CERT_FILE = Path.home() / ".config" / "protonbridge.pem" + +INBOX = "INBOX" +TRASH = "Trash" + + +def connect(): + if not PASS_FILE.is_file(): + sys.exit(f"error: missing password file {PASS_FILE}") + if not CERT_FILE.is_file(): + sys.exit(f"error: missing bridge cert {CERT_FILE}") + ctx = ssl.create_default_context(cafile=str(CERT_FILE)) + ctx.check_hostname = False + try: + M = imaplib.IMAP4(HOST, PORT) + except OSError as e: + sys.exit(f"error: cannot reach Bridge at {HOST}:{PORT} ({e}). " + f"Is protonmail-bridge running? " + f"(systemctl --user status protonmail-bridge)") + M.starttls(ssl_context=ctx) + password = PASS_FILE.read_text().strip() + try: + M.login(USER, password) + except imaplib.IMAP4.error as e: + sys.exit(f"error: IMAP login failed for {USER}: {e}") + return M + + +def _select(M, mailbox=INBOX, readonly=False): + typ, data = M.select(mailbox, readonly=readonly) + if typ != "OK": + sys.exit(f"error: cannot select {mailbox}: {data}") + + +def _decode_header(value): + if value is None: + return "" + return str(value) + + +def parse_fetch_metadata(meta): + """Parse FLAGS and RFC822.SIZE out of an IMAP FETCH metadata string. + + Returns {"flags": str, "size": int | None}. Tolerates malformed input + (returns the defaults rather than raising). + """ + result = {"flags": "", "size": None} + flags_idx = meta.find("FLAGS (") + if flags_idx != -1: + end = meta.find(")", flags_idx) + if end != -1: + result["flags"] = meta[flags_idx + 7:end] + # Tokenize with parens stripped so RFC822.SIZE matches whether or not + # it abuts an opening paren in the raw response (e.g. "(RFC822.SIZE 500)" + # would otherwise tokenize as "(RFC822.SIZE" and miss the equality check). + tokens = meta.replace("(", " ").replace(")", " ").split() + for i, p in enumerate(tokens): + if p == "RFC822.SIZE" and i + 1 < len(tokens): + try: + result["size"] = int(tokens[i + 1]) + except ValueError: + pass + break + return result + + +def extract_body(msg): + """Pick a printable body out of an email.message.EmailMessage. + + Multipart: text/plain preferred, text/html fallback. Single-part: + returns content directly. Returns None if no body found. + """ + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + return part.get_content() + for part in msg.walk(): + if part.get_content_type() == "text/html": + return part.get_content() + return None + return msg.get_content() + + +def build_message(from_addr, to_addr, subject, body, attachments=None): + """Construct an EmailMessage from the given fields and attachments. + + attachments is a list of (filename, maintype, subtype, content_bytes) + tuples — typically the return value of load_attachment per file. Pure + function: no I/O, no SMTP. + """ + msg = EmailMessage() + msg["From"] = from_addr + msg["To"] = to_addr + msg["Subject"] = subject + msg.set_content(body) + for filename, maintype, subtype, content in (attachments or []): + msg.add_attachment(content, maintype=maintype, subtype=subtype, + filename=filename) + return msg + + +def load_attachment(path): + """Read a file and return (filename, maintype, subtype, content_bytes). + + MIME type comes from mimetypes.guess_type; falls back to + application/octet-stream when guess returns None. Raises FileNotFoundError + for missing paths and IsADirectoryError for directories. + """ + if not path.exists(): + raise FileNotFoundError(f"attachment not found: {path}") + if path.is_dir(): + raise IsADirectoryError(f"attachment path is a directory: {path}") + mime, _ = mimetypes.guess_type(path.name) + if mime is None: + maintype, subtype = "application", "octet-stream" + else: + maintype, subtype = mime.split("/", 1) + return (path.name, maintype, subtype, path.read_bytes()) + + +def smtp_connect(): + """Connect to Proton Bridge's local SMTP submission endpoint. + + Mirrors connect()'s pattern: STARTTLS against the pinned cert, + plaintext password from PASS_FILE. Skipped from unit tests for + the same reason connect() is — network + SSL + file I/O. + """ + if not PASS_FILE.is_file(): + sys.exit(f"error: missing password file {PASS_FILE}") + if not CERT_FILE.is_file(): + sys.exit(f"error: missing bridge cert {CERT_FILE}") + ctx = ssl.create_default_context(cafile=str(CERT_FILE)) + ctx.check_hostname = False + try: + smtp = smtplib.SMTP(HOST, SMTP_PORT) + except OSError as e: + sys.exit(f"error: cannot reach Bridge SMTP at {HOST}:{SMTP_PORT} ({e}). " + f"Is protonmail-bridge running?") + smtp.starttls(context=ctx) + password = PASS_FILE.read_text().strip() + try: + smtp.login(USER, password) + except smtplib.SMTPException as e: + sys.exit(f"error: SMTP login failed for {USER}: {e}") + return smtp + + +def cmd_list_unread(args): + M = connect() + try: + _select(M, INBOX, readonly=True) + typ, data = M.uid("SEARCH", None, "UNSEEN") + if typ != "OK": + sys.exit(f"error: search failed: {data}") + uids = data[0].split() if data and data[0] else [] + if args.limit and len(uids) > args.limit: + uids = uids[-args.limit:] + out = [] + for uid in uids: + uid_s = uid.decode() + typ, data = M.uid( + "FETCH", uid, + "(BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)] " + "FLAGS RFC822.SIZE)" + ) + if typ != "OK" or not data or not data[0]: + continue + # FLAGS / RFC822.SIZE may arrive in a non-tuple chunk after + # the BODY literal closes. Concatenate all chunks before + # parsing so the parser sees the full metadata. + hdr_raw = b"" + meta_str = "" + for chunk in data: + if isinstance(chunk, tuple): + hdr_raw = chunk[1] + meta_str += chunk[0].decode("utf-8", errors="replace") + " " + elif isinstance(chunk, (bytes, bytearray)): + meta_str += chunk.decode("utf-8", errors="replace") + " " + parsed = parse_fetch_metadata(meta_str) + msg = email.message_from_bytes(hdr_raw, policy=default_policy) + out.append({ + "uid": uid_s, + "from": _decode_header(msg.get("From")), + "to": _decode_header(msg.get("To")), + "subject": _decode_header(msg.get("Subject")), + "date": _decode_header(msg.get("Date")), + "flags": parsed["flags"], + "size": parsed["size"], + }) + print(json.dumps(out, indent=2, ensure_ascii=False)) + finally: + M.logout() + + +def cmd_read(args): + M = connect() + try: + _select(M, INBOX, readonly=True) + typ, data = M.uid("FETCH", str(args.uid).encode(), "(RFC822)") + if typ != "OK" or not data or not data[0]: + sys.exit(f"error: uid {args.uid} not found in {INBOX}") + raw = data[0][1] + msg = email.message_from_bytes(raw, policy=default_policy) + for h in ("From", "To", "Cc", "Date", "Subject"): + v = msg.get(h) + if v: + print(f"{h}: {v}") + print() + body = extract_body(msg) + print(body if body is not None else "<no body>") + finally: + M.logout() + + +def _store(uids, op, flags): + M = connect() + try: + _select(M, INBOX, readonly=False) + for uid in uids: + typ, data = M.uid("STORE", str(uid).encode(), op, flags) + if typ != "OK": + sys.exit(f"error: STORE {op} {flags} on uid {uid} failed: {data}") + print(f"ok: STORE {op} {flags} on {len(uids)} uid(s)") + finally: + M.logout() + + +def cmd_mark_read(args): + _store(args.uids, "+FLAGS", r"(\Seen)") + + +def cmd_mark_unread(args): + _store(args.uids, "-FLAGS", r"(\Seen)") + + +def cmd_star(args): + # Workflow convention: starring also marks read (matches the Gmail flow). + _store(args.uids, "+FLAGS", r"(\Flagged \Seen)") + + +def cmd_unstar(args): + _store(args.uids, "-FLAGS", r"(\Flagged)") + + +def cmd_trash(args): + M = connect() + try: + _select(M, INBOX, readonly=False) + moved = 0 + for uid in args.uids: + typ, data = M.uid("MOVE", str(uid).encode(), TRASH) + if typ != "OK": + # Fallback for servers without RFC 6851 MOVE. + typ2, data2 = M.uid("COPY", str(uid).encode(), TRASH) + if typ2 != "OK": + sys.exit(f"error: COPY uid {uid} -> {TRASH} failed: {data2}") + M.uid("STORE", str(uid).encode(), "+FLAGS", r"(\Deleted)") + moved += 1 + M.expunge() + print(f"ok: moved {moved} uid(s) to {TRASH}") + finally: + M.logout() + + +def cmd_send(args): + # Resolve attachments first so a missing file fails before SMTP opens. + attachments = [load_attachment(Path(p)) for p in (args.attach or [])] + if args.body is not None: + body = args.body + elif args.body_file is not None: + body = Path(args.body_file).read_text() + else: + body = sys.stdin.read() + msg = build_message(USER, args.to, args.subject, body, attachments) + smtp = smtp_connect() + try: + smtp.send_message(msg) + print(f"ok: sent to {args.to}") + finally: + smtp.quit() + + +def cmd_folders(_args): + M = connect() + try: + typ, data = M.list() + if typ != "OK": + sys.exit(f"error: LIST failed: {data}") + for line in data: + print(line.decode("utf-8", errors="replace")) + finally: + M.logout() + + +def main(): + p = argparse.ArgumentParser(prog="cmail-action", + description="IMAP triage against Proton Bridge") + sp = p.add_subparsers(dest="cmd", required=True) + + p_list = sp.add_parser("list-unread", help="list unread INBOX messages as JSON") + p_list.add_argument("--limit", type=int, default=50, + help="cap to N most recent (default 50)") + p_list.set_defaults(func=cmd_list_unread) + + p_read = sp.add_parser("read", help="print headers + body of a UID") + p_read.add_argument("uid", type=int) + p_read.set_defaults(func=cmd_read) + + p_mr = sp.add_parser("mark-read") + p_mr.add_argument("uids", nargs="+", type=int) + p_mr.set_defaults(func=cmd_mark_read) + + p_mu = sp.add_parser("mark-unread") + p_mu.add_argument("uids", nargs="+", type=int) + p_mu.set_defaults(func=cmd_mark_unread) + + p_s = sp.add_parser("star", help="star (sets \\Flagged + \\Seen)") + p_s.add_argument("uids", nargs="+", type=int) + p_s.set_defaults(func=cmd_star) + + p_us = sp.add_parser("unstar") + p_us.add_argument("uids", nargs="+", type=int) + p_us.set_defaults(func=cmd_unstar) + + p_t = sp.add_parser("trash", help="MOVE uid(s) to Trash") + p_t.add_argument("uids", nargs="+", type=int) + p_t.set_defaults(func=cmd_trash) + + p_f = sp.add_parser("folders", help="list IMAP folders (debug)") + p_f.set_defaults(func=cmd_folders) + + p_send = sp.add_parser("send", help="send an email via Bridge SMTP") + p_send.add_argument("--to", required=True, help="recipient address") + p_send.add_argument("--subject", required=True) + body_group = p_send.add_mutually_exclusive_group() + body_group.add_argument("--body", help="body text inline") + body_group.add_argument("--body-file", help="path to a file whose " + "contents become the body") + p_send.add_argument("--attach", action="append", default=[], + help="path to attach (repeatable)") + p_send.set_defaults(func=cmd_send) + + args = p.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() |
