aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/cmail-action.py
diff options
context:
space:
mode:
authorCraig Jennings <c@cjennings.net>2026-05-08 23:23:31 -0500
committerCraig Jennings <c@cjennings.net>2026-05-08 23:23:31 -0500
commit65c962e01f0ec7cefe70157f4f11f54638cdc995 (patch)
treea64d7a8bd466cb08e77829ca86e25a221aa64710 /.ai/scripts/cmail-action.py
parent1abae87acaba85453ef9b7e1eafe0d6e8e22c4e5 (diff)
downloadrulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.tar.gz
rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.zip
feat: Add cmail IMAP action script and test suite
Add cmail-action.py for IMAP triage operations against Proton Mail Bridge (list-unread, read, mark-read, star, unstar, trash, send, folders) mirroring the Gmail MCP workflow. Also add comprehensive tests for cmail-action, gmail-fetch-attachments, and maildir-flag-manager scripts.
Diffstat (limited to '.ai/scripts/cmail-action.py')
-rwxr-xr-x.ai/scripts/cmail-action.py387
1 files changed, 387 insertions, 0 deletions
diff --git a/.ai/scripts/cmail-action.py b/.ai/scripts/cmail-action.py
new file mode 100755
index 0000000..10eb215
--- /dev/null
+++ b/.ai/scripts/cmail-action.py
@@ -0,0 +1,387 @@
+#!/usr/bin/env python3
+"""
+cmail-action — IMAP triage operations against Proton Mail Bridge.
+
+Mirrors the operations the Gmail MCP server provides for gmail/dmail
+(list-unread, read, mark-read, star, unstar, trash) so the
+process-unread-emails workflow can drive cmail end-to-end the same way.
+
+Connects to local Proton Bridge IMAP at 127.0.0.1:1143 with STARTTLS,
+using the Bridge-generated app password at ~/.config/.cmailpass and the
+Bridge self-signed certificate at ~/.config/protonbridge.pem. Cert CN
+is 127.0.0.1 but lacks a SubjectAltName, so hostname verification is
+disabled (connection is to localhost — verifying via the pinned cert
+is sufficient).
+
+IMAP -> Proton mapping:
+- \\Seen flag -> Read state
+- \\Flagged flag -> Starred label
+- MOVE to Trash -> Trash folder
+- COPY to label -> applies the label (Starred etc.)
+"""
+
+import argparse
+import email
+import imaplib
+import json
+import mimetypes
+import smtplib
+import ssl
+import sys
+from email.message import EmailMessage
+from email.policy import default as default_policy
+from pathlib import Path
+
+HOST = "127.0.0.1"
+PORT = 1143
+SMTP_PORT = 1025
+USER = "c@cjennings.net"
+PASS_FILE = Path.home() / ".config" / ".cmailpass"
+CERT_FILE = Path.home() / ".config" / "protonbridge.pem"
+
+INBOX = "INBOX"
+TRASH = "Trash"
+
+
+def connect():
+ if not PASS_FILE.is_file():
+ sys.exit(f"error: missing password file {PASS_FILE}")
+ if not CERT_FILE.is_file():
+ sys.exit(f"error: missing bridge cert {CERT_FILE}")
+ ctx = ssl.create_default_context(cafile=str(CERT_FILE))
+ ctx.check_hostname = False
+ try:
+ M = imaplib.IMAP4(HOST, PORT)
+ except OSError as e:
+ sys.exit(f"error: cannot reach Bridge at {HOST}:{PORT} ({e}). "
+ f"Is protonmail-bridge running? "
+ f"(systemctl --user status protonmail-bridge)")
+ M.starttls(ssl_context=ctx)
+ password = PASS_FILE.read_text().strip()
+ try:
+ M.login(USER, password)
+ except imaplib.IMAP4.error as e:
+ sys.exit(f"error: IMAP login failed for {USER}: {e}")
+ return M
+
+
+def _select(M, mailbox=INBOX, readonly=False):
+ typ, data = M.select(mailbox, readonly=readonly)
+ if typ != "OK":
+ sys.exit(f"error: cannot select {mailbox}: {data}")
+
+
+def _decode_header(value):
+ if value is None:
+ return ""
+ return str(value)
+
+
+def parse_fetch_metadata(meta):
+ """Parse FLAGS and RFC822.SIZE out of an IMAP FETCH metadata string.
+
+ Returns {"flags": str, "size": int | None}. Tolerates malformed input
+ (returns the defaults rather than raising).
+ """
+ result = {"flags": "", "size": None}
+ flags_idx = meta.find("FLAGS (")
+ if flags_idx != -1:
+ end = meta.find(")", flags_idx)
+ if end != -1:
+ result["flags"] = meta[flags_idx + 7:end]
+ # Tokenize with parens stripped so RFC822.SIZE matches whether or not
+ # it abuts an opening paren in the raw response (e.g. "(RFC822.SIZE 500)"
+ # would otherwise tokenize as "(RFC822.SIZE" and miss the equality check).
+ tokens = meta.replace("(", " ").replace(")", " ").split()
+ for i, p in enumerate(tokens):
+ if p == "RFC822.SIZE" and i + 1 < len(tokens):
+ try:
+ result["size"] = int(tokens[i + 1])
+ except ValueError:
+ pass
+ break
+ return result
+
+
+def extract_body(msg):
+ """Pick a printable body out of an email.message.EmailMessage.
+
+ Multipart: text/plain preferred, text/html fallback. Single-part:
+ returns content directly. Returns None if no body found.
+ """
+ if msg.is_multipart():
+ for part in msg.walk():
+ if part.get_content_type() == "text/plain":
+ return part.get_content()
+ for part in msg.walk():
+ if part.get_content_type() == "text/html":
+ return part.get_content()
+ return None
+ return msg.get_content()
+
+
+def build_message(from_addr, to_addr, subject, body, attachments=None):
+ """Construct an EmailMessage from the given fields and attachments.
+
+ attachments is a list of (filename, maintype, subtype, content_bytes)
+ tuples — typically the return value of load_attachment per file. Pure
+ function: no I/O, no SMTP.
+ """
+ msg = EmailMessage()
+ msg["From"] = from_addr
+ msg["To"] = to_addr
+ msg["Subject"] = subject
+ msg.set_content(body)
+ for filename, maintype, subtype, content in (attachments or []):
+ msg.add_attachment(content, maintype=maintype, subtype=subtype,
+ filename=filename)
+ return msg
+
+
+def load_attachment(path):
+ """Read a file and return (filename, maintype, subtype, content_bytes).
+
+ MIME type comes from mimetypes.guess_type; falls back to
+ application/octet-stream when guess returns None. Raises FileNotFoundError
+ for missing paths and IsADirectoryError for directories.
+ """
+ if not path.exists():
+ raise FileNotFoundError(f"attachment not found: {path}")
+ if path.is_dir():
+ raise IsADirectoryError(f"attachment path is a directory: {path}")
+ mime, _ = mimetypes.guess_type(path.name)
+ if mime is None:
+ maintype, subtype = "application", "octet-stream"
+ else:
+ maintype, subtype = mime.split("/", 1)
+ return (path.name, maintype, subtype, path.read_bytes())
+
+
+def smtp_connect():
+ """Connect to Proton Bridge's local SMTP submission endpoint.
+
+ Mirrors connect()'s pattern: STARTTLS against the pinned cert,
+ plaintext password from PASS_FILE. Skipped from unit tests for
+ the same reason connect() is — network + SSL + file I/O.
+ """
+ if not PASS_FILE.is_file():
+ sys.exit(f"error: missing password file {PASS_FILE}")
+ if not CERT_FILE.is_file():
+ sys.exit(f"error: missing bridge cert {CERT_FILE}")
+ ctx = ssl.create_default_context(cafile=str(CERT_FILE))
+ ctx.check_hostname = False
+ try:
+ smtp = smtplib.SMTP(HOST, SMTP_PORT)
+ except OSError as e:
+ sys.exit(f"error: cannot reach Bridge SMTP at {HOST}:{SMTP_PORT} ({e}). "
+ f"Is protonmail-bridge running?")
+ smtp.starttls(context=ctx)
+ password = PASS_FILE.read_text().strip()
+ try:
+ smtp.login(USER, password)
+ except smtplib.SMTPException as e:
+ sys.exit(f"error: SMTP login failed for {USER}: {e}")
+ return smtp
+
+
+def cmd_list_unread(args):
+ M = connect()
+ try:
+ _select(M, INBOX, readonly=True)
+ typ, data = M.uid("SEARCH", None, "UNSEEN")
+ if typ != "OK":
+ sys.exit(f"error: search failed: {data}")
+ uids = data[0].split() if data and data[0] else []
+ if args.limit and len(uids) > args.limit:
+ uids = uids[-args.limit:]
+ out = []
+ for uid in uids:
+ uid_s = uid.decode()
+ typ, data = M.uid(
+ "FETCH", uid,
+ "(BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)] "
+ "FLAGS RFC822.SIZE)"
+ )
+ if typ != "OK" or not data or not data[0]:
+ continue
+ # FLAGS / RFC822.SIZE may arrive in a non-tuple chunk after
+ # the BODY literal closes. Concatenate all chunks before
+ # parsing so the parser sees the full metadata.
+ hdr_raw = b""
+ meta_str = ""
+ for chunk in data:
+ if isinstance(chunk, tuple):
+ hdr_raw = chunk[1]
+ meta_str += chunk[0].decode("utf-8", errors="replace") + " "
+ elif isinstance(chunk, (bytes, bytearray)):
+ meta_str += chunk.decode("utf-8", errors="replace") + " "
+ parsed = parse_fetch_metadata(meta_str)
+ msg = email.message_from_bytes(hdr_raw, policy=default_policy)
+ out.append({
+ "uid": uid_s,
+ "from": _decode_header(msg.get("From")),
+ "to": _decode_header(msg.get("To")),
+ "subject": _decode_header(msg.get("Subject")),
+ "date": _decode_header(msg.get("Date")),
+ "flags": parsed["flags"],
+ "size": parsed["size"],
+ })
+ print(json.dumps(out, indent=2, ensure_ascii=False))
+ finally:
+ M.logout()
+
+
+def cmd_read(args):
+ M = connect()
+ try:
+ _select(M, INBOX, readonly=True)
+ typ, data = M.uid("FETCH", str(args.uid).encode(), "(RFC822)")
+ if typ != "OK" or not data or not data[0]:
+ sys.exit(f"error: uid {args.uid} not found in {INBOX}")
+ raw = data[0][1]
+ msg = email.message_from_bytes(raw, policy=default_policy)
+ for h in ("From", "To", "Cc", "Date", "Subject"):
+ v = msg.get(h)
+ if v:
+ print(f"{h}: {v}")
+ print()
+ body = extract_body(msg)
+ print(body if body is not None else "<no body>")
+ finally:
+ M.logout()
+
+
+def _store(uids, op, flags):
+ M = connect()
+ try:
+ _select(M, INBOX, readonly=False)
+ for uid in uids:
+ typ, data = M.uid("STORE", str(uid).encode(), op, flags)
+ if typ != "OK":
+ sys.exit(f"error: STORE {op} {flags} on uid {uid} failed: {data}")
+ print(f"ok: STORE {op} {flags} on {len(uids)} uid(s)")
+ finally:
+ M.logout()
+
+
+def cmd_mark_read(args):
+ _store(args.uids, "+FLAGS", r"(\Seen)")
+
+
+def cmd_mark_unread(args):
+ _store(args.uids, "-FLAGS", r"(\Seen)")
+
+
+def cmd_star(args):
+ # Workflow convention: starring also marks read (matches the Gmail flow).
+ _store(args.uids, "+FLAGS", r"(\Flagged \Seen)")
+
+
+def cmd_unstar(args):
+ _store(args.uids, "-FLAGS", r"(\Flagged)")
+
+
+def cmd_trash(args):
+ M = connect()
+ try:
+ _select(M, INBOX, readonly=False)
+ moved = 0
+ for uid in args.uids:
+ typ, data = M.uid("MOVE", str(uid).encode(), TRASH)
+ if typ != "OK":
+ # Fallback for servers without RFC 6851 MOVE.
+ typ2, data2 = M.uid("COPY", str(uid).encode(), TRASH)
+ if typ2 != "OK":
+ sys.exit(f"error: COPY uid {uid} -> {TRASH} failed: {data2}")
+ M.uid("STORE", str(uid).encode(), "+FLAGS", r"(\Deleted)")
+ moved += 1
+ M.expunge()
+ print(f"ok: moved {moved} uid(s) to {TRASH}")
+ finally:
+ M.logout()
+
+
+def cmd_send(args):
+ # Resolve attachments first so a missing file fails before SMTP opens.
+ attachments = [load_attachment(Path(p)) for p in (args.attach or [])]
+ if args.body is not None:
+ body = args.body
+ elif args.body_file is not None:
+ body = Path(args.body_file).read_text()
+ else:
+ body = sys.stdin.read()
+ msg = build_message(USER, args.to, args.subject, body, attachments)
+ smtp = smtp_connect()
+ try:
+ smtp.send_message(msg)
+ print(f"ok: sent to {args.to}")
+ finally:
+ smtp.quit()
+
+
+def cmd_folders(_args):
+ M = connect()
+ try:
+ typ, data = M.list()
+ if typ != "OK":
+ sys.exit(f"error: LIST failed: {data}")
+ for line in data:
+ print(line.decode("utf-8", errors="replace"))
+ finally:
+ M.logout()
+
+
+def main():
+ p = argparse.ArgumentParser(prog="cmail-action",
+ description="IMAP triage against Proton Bridge")
+ sp = p.add_subparsers(dest="cmd", required=True)
+
+ p_list = sp.add_parser("list-unread", help="list unread INBOX messages as JSON")
+ p_list.add_argument("--limit", type=int, default=50,
+ help="cap to N most recent (default 50)")
+ p_list.set_defaults(func=cmd_list_unread)
+
+ p_read = sp.add_parser("read", help="print headers + body of a UID")
+ p_read.add_argument("uid", type=int)
+ p_read.set_defaults(func=cmd_read)
+
+ p_mr = sp.add_parser("mark-read")
+ p_mr.add_argument("uids", nargs="+", type=int)
+ p_mr.set_defaults(func=cmd_mark_read)
+
+ p_mu = sp.add_parser("mark-unread")
+ p_mu.add_argument("uids", nargs="+", type=int)
+ p_mu.set_defaults(func=cmd_mark_unread)
+
+ p_s = sp.add_parser("star", help="star (sets \\Flagged + \\Seen)")
+ p_s.add_argument("uids", nargs="+", type=int)
+ p_s.set_defaults(func=cmd_star)
+
+ p_us = sp.add_parser("unstar")
+ p_us.add_argument("uids", nargs="+", type=int)
+ p_us.set_defaults(func=cmd_unstar)
+
+ p_t = sp.add_parser("trash", help="MOVE uid(s) to Trash")
+ p_t.add_argument("uids", nargs="+", type=int)
+ p_t.set_defaults(func=cmd_trash)
+
+ p_f = sp.add_parser("folders", help="list IMAP folders (debug)")
+ p_f.set_defaults(func=cmd_folders)
+
+ p_send = sp.add_parser("send", help="send an email via Bridge SMTP")
+ p_send.add_argument("--to", required=True, help="recipient address")
+ p_send.add_argument("--subject", required=True)
+ body_group = p_send.add_mutually_exclusive_group()
+ body_group.add_argument("--body", help="body text inline")
+ body_group.add_argument("--body-file", help="path to a file whose "
+ "contents become the body")
+ p_send.add_argument("--attach", action="append", default=[],
+ help="path to attach (repeatable)")
+ p_send.set_defaults(func=cmd_send)
+
+ args = p.parse_args()
+ args.func(args)
+
+
+if __name__ == "__main__":
+ main()