#!/usr/bin/env python3
"""
cmail-action — IMAP triage operations against Proton Mail Bridge.

Mirrors the operations the Gmail MCP server provides for gmail/dmail
(list-unread, read, mark-read, star, unstar, trash) so the
process-unread-emails workflow can drive cmail end-to-end the same way.
Also provides `send` (via the Bridge's local SMTP endpoint) and `folders`
(debug listing of IMAP mailboxes).

Connects to local Proton Bridge IMAP at 127.0.0.1:1143 with STARTTLS, using
the Bridge-generated app password at ~/.config/.cmailpass and the Bridge
self-signed certificate at ~/.config/protonbridge.pem. Cert CN is 127.0.0.1
but lacks a SubjectAltName, so hostname verification is disabled (connection
is to localhost — verifying via the pinned cert is sufficient).

IMAP -> Proton mapping:
  - \\Seen flag       -> Read state
  - \\Flagged flag    -> Starred label
  - MOVE to Trash    -> Trash folder
  - COPY to label    -> applies the label (Starred etc.)
"""

import argparse
import email
import imaplib
import json
import mimetypes
import smtplib
import ssl
import sys
from email.message import EmailMessage
from email.policy import default as default_policy
from pathlib import Path

HOST = "127.0.0.1"
PORT = 1143
SMTP_PORT = 1025
USER = "c@cjennings.net"
PASS_FILE = Path.home() / ".config" / ".cmailpass"
CERT_FILE = Path.home() / ".config" / "protonbridge.pem"
INBOX = "INBOX"
TRASH = "Trash"


def _tls_context():
    """Build the TLS context shared by the IMAP and SMTP connections.

    Exits the process (via sys.exit) when the password file or the pinned
    Bridge certificate is missing, so both connection helpers fail the same
    way before any socket is opened.

    Returns an ssl.SSLContext that trusts only CERT_FILE. Hostname checking
    is disabled (see the module docstring); certificate verification itself
    stays enabled.
    """
    if not PASS_FILE.is_file():
        sys.exit(f"error: missing password file {PASS_FILE}")
    if not CERT_FILE.is_file():
        sys.exit(f"error: missing bridge cert {CERT_FILE}")
    ctx = ssl.create_default_context(cafile=str(CERT_FILE))
    ctx.check_hostname = False
    return ctx


def connect():
    """Open an authenticated IMAP session to the local Bridge.

    Connects in plaintext to HOST:PORT, upgrades with STARTTLS against the
    pinned certificate, then logs in as USER with the password stored in
    PASS_FILE. Any failure exits the process with an "error: ..." message.

    Returns the logged-in imaplib.IMAP4 object; the caller is responsible
    for calling logout().
    """
    ctx = _tls_context()
    try:
        M = imaplib.IMAP4(HOST, PORT)
    except OSError as e:
        sys.exit(f"error: cannot reach Bridge at {HOST}:{PORT} ({e}). "
                 f"Is protonmail-bridge running? "
                 f"(systemctl --user status protonmail-bridge)")
    try:
        M.starttls(ssl_context=ctx)
    except (imaplib.IMAP4.error, OSError) as e:
        # ssl.SSLError is an OSError subclass, so handshake and certificate
        # failures land here instead of surfacing as a traceback.
        sys.exit(f"error: STARTTLS with Bridge at {HOST}:{PORT} failed: {e}")
    password = PASS_FILE.read_text().strip()
    try:
        M.login(USER, password)
    except imaplib.IMAP4.error as e:
        sys.exit(f"error: IMAP login failed for {USER}: {e}")
    return M


def _select(M, mailbox=INBOX, readonly=False):
    """SELECT a mailbox on M, exiting the process if the server refuses."""
    typ, data = M.select(mailbox, readonly=readonly)
    if typ != "OK":
        sys.exit(f"error: cannot select {mailbox}: {data}")


def _decode_header(value):
    """Return a header value as str, mapping a missing header to ""."""
    if value is None:
        return ""
    return str(value)


def parse_fetch_metadata(meta):
    """Parse FLAGS and RFC822.SIZE out of an IMAP FETCH metadata string.

    meta is normally a str; bytes are decoded as UTF-8 (with replacement)
    and None / empty input yields the defaults.

    Returns {"flags": str, "size": int | None}. Tolerates malformed input
    (returns the defaults rather than raising).
    """
    result = {"flags": "", "size": None}
    if not meta:
        return result
    if isinstance(meta, (bytes, bytearray)):
        meta = bytes(meta).decode("utf-8", errors="replace")

    flags_idx = meta.find("FLAGS (")
    if flags_idx != -1:
        end = meta.find(")", flags_idx)
        if end != -1:
            # 7 == len("FLAGS ("): keep only the text between the parens.
            result["flags"] = meta[flags_idx + 7:end]

    # Tokenize with parens stripped so RFC822.SIZE matches whether or not
    # it abuts an opening paren in the raw response (e.g. "(RFC822.SIZE 500)"
    # would otherwise tokenize as "(RFC822.SIZE" and miss the equality check).
    tokens = meta.replace("(", " ").replace(")", " ").split()
    for i, token in enumerate(tokens):
        if token == "RFC822.SIZE" and i + 1 < len(tokens):
            try:
                result["size"] = int(tokens[i + 1])
            except ValueError:
                # Non-numeric size: keep the None default.
                pass
            break
    return result


def extract_body(msg):
    """Pick a printable body out of an email.message.EmailMessage.

    Multipart: text/plain preferred, text/html fallback. Parts marked
    "Content-Disposition: attachment" are skipped, so an attached .txt file
    is never mistaken for the body of an HTML-only message.
    Single-part: returns content directly.
    Returns None if no body found.
    """
    if not msg.is_multipart():
        return msg.get_content()
    for wanted in ("text/plain", "text/html"):
        for part in msg.walk():
            if part.get_content_type() == wanted and not part.is_attachment():
                return part.get_content()
    return None


def _join_addresses(value):
    """Return value as one header string; lists/tuples are comma-joined."""
    if isinstance(value, (list, tuple)):
        return ", ".join(value)
    return value


def build_message(from_addr, to_addr, subject, body, attachments=None,
                  cc=None, bcc=None, in_reply_to=None, references=None):
    """Construct an EmailMessage from the given fields and attachments.

    attachments is a list of (filename, maintype, subtype, content_bytes)
    tuples — typically the return value of load_attachment per file.

    to_addr, cc and bcc accept either a list of addresses or a single
    string; the Cc/Bcc headers are set when present (smtplib.send_message
    reads them for delivery and strips Bcc before sending).

    in_reply_to and references set the In-Reply-To and References headers
    so a reply threads on the recipient's end.

    Pure function: no I/O, no SMTP.
    """
    msg = EmailMessage()
    msg["From"] = from_addr
    msg["To"] = _join_addresses(to_addr)
    if cc:
        msg["Cc"] = _join_addresses(cc)
    if bcc:
        msg["Bcc"] = _join_addresses(bcc)
    msg["Subject"] = subject
    if in_reply_to:
        msg["In-Reply-To"] = in_reply_to
    if references:
        msg["References"] = references
    msg.set_content(body)
    for filename, maintype, subtype, content in (attachments or []):
        msg.add_attachment(content, maintype=maintype, subtype=subtype,
                           filename=filename)
    return msg


def load_attachment(path):
    """Read a file and return (filename, maintype, subtype, content_bytes).

    path may be a pathlib.Path or a plain string.

    MIME type comes from mimetypes.guess_type; falls back to
    application/octet-stream when guess returns None.

    Raises FileNotFoundError for missing paths and IsADirectoryError
    for directories.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"attachment not found: {path}")
    if path.is_dir():
        raise IsADirectoryError(f"attachment path is a directory: {path}")
    mime, _ = mimetypes.guess_type(path.name)
    if mime is None:
        maintype, subtype = "application", "octet-stream"
    else:
        maintype, subtype = mime.split("/", 1)
    return (path.name, maintype, subtype, path.read_bytes())


def smtp_connect():
    """Connect to Proton Bridge's local SMTP submission endpoint.

    Mirrors connect()'s pattern: STARTTLS against the pinned cert,
    plaintext password from PASS_FILE. Skipped from unit tests for the
    same reason connect() is — network + SSL + file I/O.

    Returns the logged-in smtplib.SMTP object; the caller is responsible
    for calling quit(). Any failure exits the process.
    """
    ctx = _tls_context()
    try:
        smtp = smtplib.SMTP(HOST, SMTP_PORT)
    except OSError as e:
        sys.exit(f"error: cannot reach Bridge SMTP at {HOST}:{SMTP_PORT} ({e}). "
                 f"Is protonmail-bridge running?")
    try:
        smtp.starttls(context=ctx)
    except (smtplib.SMTPException, OSError) as e:
        sys.exit(f"error: STARTTLS with Bridge SMTP at {HOST}:{SMTP_PORT} "
                 f"failed: {e}")
    password = PASS_FILE.read_text().strip()
    try:
        smtp.login(USER, password)
    except smtplib.SMTPException as e:
        sys.exit(f"error: SMTP login failed for {USER}: {e}")
    return smtp


def cmd_list_unread(args):
    """Print unread INBOX messages as a JSON array on stdout.

    Each entry carries uid, from, to, subject, date, flags and size.
    args.limit caps the output to the N highest UIDs; zero, negative or
    missing limits disable the cap. The mailbox is opened read-only and
    headers are fetched with BODY.PEEK, so listing does not mark anything
    as read.
    """
    M = connect()
    try:
        _select(M, INBOX, readonly=True)
        typ, data = M.uid("SEARCH", None, "UNSEEN")
        if typ != "OK":
            sys.exit(f"error: search failed: {data}")
        uids = data[0].split() if data and data[0] else []
        limit = args.limit
        # Only a positive limit is meaningful; a negative one would make the
        # slice below drop the *newest* messages instead of capping.
        if limit and limit > 0 and len(uids) > limit:
            uids = uids[-limit:]
        out = []
        for uid in uids:
            uid_s = uid.decode()
            typ, data = M.uid(
                "FETCH", uid,
                "(BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)] "
                "FLAGS RFC822.SIZE)"
            )
            if typ != "OK" or not data or not data[0]:
                continue
            # FLAGS / RFC822.SIZE may arrive in a non-tuple chunk after
            # the BODY literal closes. Concatenate all chunks before
            # parsing so the parser sees the full metadata.
            hdr_raw = b""
            meta_parts = []
            for chunk in data:
                if isinstance(chunk, tuple):
                    hdr_raw = chunk[1]
                    meta_parts.append(
                        chunk[0].decode("utf-8", errors="replace"))
                elif isinstance(chunk, (bytes, bytearray)):
                    meta_parts.append(chunk.decode("utf-8", errors="replace"))
            parsed = parse_fetch_metadata(" ".join(meta_parts))
            msg = email.message_from_bytes(hdr_raw, policy=default_policy)
            out.append({
                "uid": uid_s,
                "from": _decode_header(msg.get("From")),
                "to": _decode_header(msg.get("To")),
                "subject": _decode_header(msg.get("Subject")),
                "date": _decode_header(msg.get("Date")),
                "flags": parsed["flags"],
                "size": parsed["size"],
            })
        print(json.dumps(out, indent=2, ensure_ascii=False))
    finally:
        M.logout()


def cmd_read(args):
    """Print selected headers and the body of INBOX message args.uid.

    Exits with an error when the UID does not resolve to a message.
    """
    M = connect()
    try:
        _select(M, INBOX, readonly=True)
        typ, data = M.uid("FETCH", str(args.uid).encode(), "(RFC822)")
        # A successful FETCH yields a (metadata, literal) tuple first; None
        # or a bare bytes chunk means there is no message to read.
        if typ != "OK" or not data or not isinstance(data[0], tuple):
            sys.exit(f"error: uid {args.uid} not found in {INBOX}")
        raw = data[0][1]
        msg = email.message_from_bytes(raw, policy=default_policy)
        for h in ("From", "To", "Cc", "Date", "Subject"):
            v = msg.get(h)
            if v:
                print(f"{h}: {v}")
        print()
        body = extract_body(msg)
        print(body if body is not None else "")
    finally:
        M.logout()


def _store(uids, op, flags):
    """Apply one UID STORE operation (op, flags) to each UID in INBOX.

    Exits on the first failing UID; prints a one-line confirmation when
    every STORE succeeded.
    """
    M = connect()
    try:
        _select(M, INBOX, readonly=False)
        for uid in uids:
            typ, data = M.uid("STORE", str(uid).encode(), op, flags)
            if typ != "OK":
                sys.exit(f"error: STORE {op} {flags} on uid {uid} failed: {data}")
        print(f"ok: STORE {op} {flags} on {len(uids)} uid(s)")
    finally:
        M.logout()


def cmd_mark_read(args):
    """Set \\Seen on args.uids."""
    _store(args.uids, "+FLAGS", r"(\Seen)")


def cmd_mark_unread(args):
    """Clear \\Seen on args.uids."""
    _store(args.uids, "-FLAGS", r"(\Seen)")


def cmd_star(args):
    """Set \\Flagged (and \\Seen) on args.uids."""
    # Workflow convention: starring also marks read (matches the Gmail flow).
    _store(args.uids, "+FLAGS", r"(\Flagged \Seen)")


def cmd_unstar(args):
    """Clear \\Flagged on args.uids."""
    _store(args.uids, "-FLAGS", r"(\Flagged)")


def cmd_trash(args):
    """Move args.uids from INBOX to the Trash mailbox.

    Uses UID MOVE (RFC 6851). When the server rejects MOVE, falls back to
    COPY + \\Deleted + EXPUNGE. EXPUNGE is issued only if the fallback was
    actually used: after a successful MOVE it is unnecessary, and a blanket
    EXPUNGE would also purge unrelated messages already flagged \\Deleted.
    """
    M = connect()
    try:
        _select(M, INBOX, readonly=False)
        moved = 0
        used_fallback = False
        for uid in args.uids:
            uid_b = str(uid).encode()
            try:
                typ, _data = M.uid("MOVE", uid_b, TRASH)
            except imaplib.IMAP4.error:
                # imaplib raises (rather than returning a status) on a tagged
                # BAD reply and on UID commands it does not know, which is
                # how a missing MOVE extension shows up.
                typ = "BAD"
            if typ != "OK":
                # Fallback for servers without RFC 6851 MOVE.
                typ2, data2 = M.uid("COPY", uid_b, TRASH)
                if typ2 != "OK":
                    sys.exit(f"error: COPY uid {uid} -> {TRASH} failed: {data2}")
                typ3, data3 = M.uid("STORE", uid_b, "+FLAGS", r"(\Deleted)")
                if typ3 != "OK":
                    sys.exit(f"error: STORE +FLAGS (\\Deleted) on uid {uid} "
                             f"failed: {data3}")
                used_fallback = True
            moved += 1
        if used_fallback:
            M.expunge()
        print(f"ok: moved {moved} uid(s) to {TRASH}")
    finally:
        M.logout()


def cmd_send(args):
    """Send one message through the Bridge SMTP endpoint.

    The body comes from --body, else --body-file, else stdin. Unreadable
    attachments or body files exit with an error before SMTP is opened.
    """
    # Resolve attachments first so a missing file fails before SMTP opens.
    try:
        attachments = [load_attachment(Path(p)) for p in (args.attach or [])]
    except OSError as e:
        sys.exit(f"error: {e}")
    if args.body is not None:
        body = args.body
    elif args.body_file is not None:
        try:
            body = Path(args.body_file).read_text()
        except OSError as e:
            sys.exit(f"error: cannot read body file {args.body_file}: {e}")
    else:
        body = sys.stdin.read()
    msg = build_message(USER, args.to, args.subject, body, attachments,
                        cc=args.cc, bcc=args.bcc,
                        in_reply_to=args.in_reply_to,
                        references=args.references)
    smtp = smtp_connect()
    try:
        smtp.send_message(msg)
        print(f"ok: sent to {args.to}")
    finally:
        smtp.quit()


def cmd_folders(_args):
    """Print the raw IMAP LIST response, one mailbox per line (debug aid)."""
    M = connect()
    try:
        typ, data = M.list()
        if typ != "OK":
            sys.exit(f"error: LIST failed: {data}")
        for line in data:
            print(line.decode("utf-8", errors="replace"))
    finally:
        M.logout()


def main(argv=None):
    """Parse the command line and dispatch to the matching cmd_* handler.

    argv defaults to sys.argv[1:]; passing an explicit list lets callers
    drive the CLI programmatically.
    """
    p = argparse.ArgumentParser(prog="cmail-action",
                                description="IMAP triage against Proton Bridge")
    sp = p.add_subparsers(dest="cmd", required=True)

    p_list = sp.add_parser("list-unread",
                           help="list unread INBOX messages as JSON")
    p_list.add_argument("--limit", type=int, default=50,
                        help="cap to N most recent (default 50)")
    p_list.set_defaults(func=cmd_list_unread)

    p_read = sp.add_parser("read", help="print headers + body of a UID")
    p_read.add_argument("uid", type=int)
    p_read.set_defaults(func=cmd_read)

    p_mr = sp.add_parser("mark-read")
    p_mr.add_argument("uids", nargs="+", type=int)
    p_mr.set_defaults(func=cmd_mark_read)

    p_mu = sp.add_parser("mark-unread")
    p_mu.add_argument("uids", nargs="+", type=int)
    p_mu.set_defaults(func=cmd_mark_unread)

    p_s = sp.add_parser("star", help="star (sets \\Flagged + \\Seen)")
    p_s.add_argument("uids", nargs="+", type=int)
    p_s.set_defaults(func=cmd_star)

    p_us = sp.add_parser("unstar")
    p_us.add_argument("uids", nargs="+", type=int)
    p_us.set_defaults(func=cmd_unstar)

    p_t = sp.add_parser("trash", help="MOVE uid(s) to Trash")
    p_t.add_argument("uids", nargs="+", type=int)
    p_t.set_defaults(func=cmd_trash)

    p_f = sp.add_parser("folders", help="list IMAP folders (debug)")
    p_f.set_defaults(func=cmd_folders)

    p_send = sp.add_parser("send", help="send an email via Bridge SMTP")
    p_send.add_argument("--to", required=True, help="recipient address")
    p_send.add_argument("--subject", required=True)
    body_group = p_send.add_mutually_exclusive_group()
    body_group.add_argument("--body", help="body text inline")
    body_group.add_argument("--body-file", help="path to a file whose "
                            "contents become the body")
    p_send.add_argument("--attach", action="append", default=[],
                        help="path to attach (repeatable)")
    p_send.add_argument("--cc", action="append", default=[],
                        help="Cc address (repeatable)")
    p_send.add_argument("--bcc", action="append", default=[],
                        help="Bcc address (repeatable)")
    p_send.add_argument("--in-reply-to",
                        help="Message-ID this replies to (threads on the recipient's end)")
    p_send.add_argument("--references",
                        help="References header (space-separated Message-IDs)")
    p_send.set_defaults(func=cmd_send)

    args = p.parse_args(argv)
    args.func(args)


if __name__ == "__main__":
    main()