aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/cmail-action.py
blob: 0acd82d624b869a54c457b71bbae09ac07f0a80a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
#!/usr/bin/env python3
"""
cmail-action — IMAP triage operations (plus SMTP send) against Proton Mail Bridge.

Mirrors the operations the Gmail MCP server provides for gmail/dmail
(list-unread, read, mark-read, star, unstar, trash) so the
process-unread-emails workflow can drive cmail end-to-end the same way.

Connects to local Proton Bridge IMAP at 127.0.0.1:1143 with STARTTLS,
using the Bridge-generated app password at ~/.config/.cmailpass and the
Bridge self-signed certificate at ~/.config/protonbridge.pem. Cert CN
is 127.0.0.1 but lacks a SubjectAltName, so hostname verification is
disabled (connection is to localhost — verifying via the pinned cert
is sufficient).

IMAP -> Proton mapping:
- \\Seen flag    -> Read state
- \\Flagged flag -> Starred label
- MOVE to Trash  -> Trash folder
- COPY to label  -> applies the label (Starred etc.)
"""

import argparse
import email
import imaplib
import json
import mimetypes
import smtplib
import ssl
import sys
from email.message import EmailMessage
from email.policy import default as default_policy
from pathlib import Path

# Local Proton Mail Bridge endpoint; IMAP and SMTP share the same host.
HOST = "127.0.0.1"
PORT = 1143  # Bridge IMAP port (upgraded with STARTTLS in connect())
SMTP_PORT = 1025  # Bridge SMTP port (upgraded with STARTTLS in smtp_connect())
USER = "c@cjennings.net"  # login name used for both IMAP and SMTP
PASS_FILE = Path.home() / ".config" / ".cmailpass"  # Bridge app password
CERT_FILE = Path.home() / ".config" / "protonbridge.pem"  # pinned Bridge cert

# Mailbox names the triage commands operate on.
INBOX = "INBOX"
TRASH = "Trash"


def connect():
    """Open an authenticated IMAP session to the local Proton Bridge.

    Checks that PASS_FILE and CERT_FILE exist, connects to HOST:PORT,
    upgrades the connection with STARTTLS using CERT_FILE as the only
    trusted CA (hostname checking is turned off), then logs in as USER
    with the password read from PASS_FILE.

    Returns the logged-in imaplib.IMAP4 object. Missing files, an
    unreachable Bridge, a failed STARTTLS handshake and a rejected login
    all terminate the process through sys.exit with an "error: ..." message.
    """
    if not PASS_FILE.is_file():
        sys.exit(f"error: missing password file {PASS_FILE}")
    if not CERT_FILE.is_file():
        sys.exit(f"error: missing bridge cert {CERT_FILE}")
    ctx = ssl.create_default_context(cafile=str(CERT_FILE))
    ctx.check_hostname = False
    try:
        M = imaplib.IMAP4(HOST, PORT)
    except OSError as e:
        sys.exit(f"error: cannot reach Bridge at {HOST}:{PORT} ({e}). "
                 f"Is protonmail-bridge running? "
                 f"(systemctl --user status protonmail-bridge)")
    try:
        M.starttls(ssl_context=ctx)
    except (imaplib.IMAP4.error, OSError) as e:
        # ssl.SSLError is an OSError subclass, so a certificate that no
        # longer matches the pinned file lands here instead of surfacing
        # as a raw traceback.
        sys.exit(f"error: STARTTLS with Bridge at {HOST}:{PORT} failed ({e}). "
                 f"Does {CERT_FILE} still match the Bridge certificate?")
    password = PASS_FILE.read_text().strip()
    try:
        M.login(USER, password)
    except imaplib.IMAP4.error as e:
        sys.exit(f"error: IMAP login failed for {USER}: {e}")
    return M


def _select(M, mailbox=INBOX, readonly=False):
    """Select *mailbox* on connection M, exiting the process if it fails."""
    status, payload = M.select(mailbox, readonly=readonly)
    if status == "OK":
        return
    sys.exit(f"error: cannot select {mailbox}: {payload}")


def _decode_header(value):
    """Return a header value as text; a missing (None) header becomes ""."""
    return "" if value is None else str(value)


def parse_fetch_metadata(meta):
    """Extract FLAGS and RFC822.SIZE from IMAP FETCH metadata text.

    Returns a dict with "flags" (the text between "FLAGS (" and the next
    ")", or "" when absent or unterminated) and "size" (an int, or None
    when RFC822.SIZE is missing or not followed by a valid integer).
    """
    flags = ""
    _, opener, remainder = meta.partition("FLAGS (")
    if opener:
        inner, closer, _ = remainder.partition(")")
        if closer:
            flags = inner

    size = None
    # Parentheses become whitespace before splitting so that a size item
    # written as "(RFC822.SIZE 500)" still yields a bare "RFC822.SIZE" word.
    words = meta.replace("(", " ").replace(")", " ").split()
    if "RFC822.SIZE" in words:
        value_pos = words.index("RFC822.SIZE") + 1
        if value_pos < len(words):
            try:
                size = int(words[value_pos])
            except ValueError:
                size = None

    return {"flags": flags, "size": size}


def extract_body(msg):
    """Pick a printable body out of an email.message.EmailMessage.

    Multipart: the first text/plain part is preferred, with the first
    text/html part as fallback. Parts whose Content-Disposition marks them
    as attachments are skipped, so an attached .txt or .html file is never
    mistaken for the message body. Single-part: the content is returned
    directly. Returns None when a multipart message has no usable text part.
    """
    if not msg.is_multipart():
        return msg.get_content()

    html_fallback = None
    for part in msg.walk():
        if part.is_attachment():
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            return part.get_content()
        # Remember only the first HTML part; a later text/plain still wins.
        if content_type == "text/html" and html_fallback is None:
            html_fallback = part
    if html_fallback is None:
        return None
    return html_fallback.get_content()


def build_message(from_addr, to_addr, subject, body, attachments=None,
                  cc=None, bcc=None, in_reply_to=None, references=None):
    """Assemble an EmailMessage from header fields, a body and attachments.

    attachments is an iterable of (filename, maintype, subtype,
    content_bytes) tuples, the shape load_attachment returns.

    cc and bcc may each be a list/tuple of addresses (joined with ", ")
    or a single string; empty values leave the header unset. in_reply_to
    and references, when given, become the In-Reply-To and References
    headers. No I/O is performed here.
    """
    message = EmailMessage()
    message["From"], message["To"] = from_addr, to_addr

    for header, addrs in (("Cc", cc), ("Bcc", bcc)):
        if not addrs:
            continue
        if isinstance(addrs, (list, tuple)):
            addrs = ", ".join(addrs)
        message[header] = addrs

    message["Subject"] = subject

    for header, value in (("In-Reply-To", in_reply_to),
                          ("References", references)):
        if value:
            message[header] = value

    message.set_content(body)
    for name, main, sub, payload in attachments or ():
        message.add_attachment(payload, maintype=main, subtype=sub,
                               filename=name)
    return message


def load_attachment(path):
    """Load one attachment as (filename, maintype, subtype, content_bytes).

    The MIME type is guessed from the file name via mimetypes.guess_type;
    an unknown type is reported as application/octet-stream. A missing
    path raises FileNotFoundError and a directory raises IsADirectoryError.
    """
    if not path.exists():
        raise FileNotFoundError(f"attachment not found: {path}")
    if path.is_dir():
        raise IsADirectoryError(f"attachment path is a directory: {path}")

    guessed, _encoding = mimetypes.guess_type(path.name)
    mime_type = "application/octet-stream" if guessed is None else guessed
    maintype, subtype = mime_type.split("/", 1)
    return (path.name, maintype, subtype, path.read_bytes())


def smtp_connect():
    """Connect to Proton Bridge's local SMTP submission endpoint.

    Mirrors connect()'s pattern: STARTTLS against the pinned cert
    (hostname checking off), plaintext password from PASS_FILE. Skipped
    from unit tests for the same reason connect() is — network + SSL +
    file I/O.

    Returns the logged-in smtplib.SMTP object. Missing files, an
    unreachable Bridge, a failed STARTTLS handshake and a rejected login
    all terminate the process through sys.exit with an "error: ..." message.
    """
    if not PASS_FILE.is_file():
        sys.exit(f"error: missing password file {PASS_FILE}")
    if not CERT_FILE.is_file():
        sys.exit(f"error: missing bridge cert {CERT_FILE}")
    ctx = ssl.create_default_context(cafile=str(CERT_FILE))
    ctx.check_hostname = False
    try:
        smtp = smtplib.SMTP(HOST, SMTP_PORT)
    except OSError as e:
        sys.exit(f"error: cannot reach Bridge SMTP at {HOST}:{SMTP_PORT} ({e}). "
                 f"Is protonmail-bridge running?")
    try:
        smtp.starttls(context=ctx)
    except (smtplib.SMTPException, OSError) as e:
        # Covers a server that refuses STARTTLS as well as ssl.SSLError
        # (an OSError subclass) from a certificate that no longer matches
        # the pinned file; previously these surfaced as raw tracebacks.
        sys.exit(f"error: STARTTLS with Bridge SMTP at {HOST}:{SMTP_PORT} "
                 f"failed ({e}). "
                 f"Does {CERT_FILE} still match the Bridge certificate?")
    password = PASS_FILE.read_text().strip()
    try:
        smtp.login(USER, password)
    except smtplib.SMTPException as e:
        sys.exit(f"error: SMTP login failed for {USER}: {e}")
    return smtp


def cmd_list_unread(args):
    """Print unread INBOX messages as a JSON array on stdout.

    Searches INBOX (opened read-only) for UNSEEN UIDs, keeps the last
    args.limit of them when a non-zero limit is set, and fetches the
    From/To/Subject/Date headers, flags and size of each. Messages whose
    FETCH fails or returns nothing are silently left out of the listing.
    """
    fetch_items = ("(BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)] "
                   "FLAGS RFC822.SIZE)")
    conn = connect()
    try:
        _select(conn, INBOX, readonly=True)
        status, found = conn.uid("SEARCH", None, "UNSEEN")
        if status != "OK":
            sys.exit(f"error: search failed: {found}")
        unseen = found[0].split() if found and found[0] else []
        if args.limit and len(unseen) > args.limit:
            unseen = unseen[-args.limit:]

        rows = []
        for raw_uid in unseen:
            uid_text = raw_uid.decode()
            status, chunks = conn.uid("FETCH", raw_uid, fetch_items)
            if status != "OK" or not chunks or not chunks[0]:
                continue
            # The response is a mix of (metadata, literal) tuples and bare
            # byte strings; FLAGS / RFC822.SIZE can sit in either, so all
            # metadata pieces are gathered before parsing.
            header_bytes = b""
            pieces = []
            for chunk in chunks:
                if isinstance(chunk, tuple):
                    header_bytes = chunk[1]
                    pieces.append(chunk[0].decode("utf-8", errors="replace"))
                elif isinstance(chunk, (bytes, bytearray)):
                    pieces.append(chunk.decode("utf-8", errors="replace"))
            meta = parse_fetch_metadata("".join(f"{piece} " for piece in pieces))
            headers = email.message_from_bytes(header_bytes,
                                               policy=default_policy)
            rows.append({
                "uid": uid_text,
                "from": _decode_header(headers.get("From")),
                "to": _decode_header(headers.get("To")),
                "subject": _decode_header(headers.get("Subject")),
                "date": _decode_header(headers.get("Date")),
                "flags": meta["flags"],
                "size": meta["size"],
            })
        print(json.dumps(rows, indent=2, ensure_ascii=False))
    finally:
        conn.logout()


def cmd_read(args):
    """Print the main headers and the body of INBOX message args.uid.

    INBOX is opened read-only. Exits with an error when the FETCH fails
    or returns no data for that UID.
    """
    conn = connect()
    try:
        _select(conn, INBOX, readonly=True)
        status, payload = conn.uid("FETCH", str(args.uid).encode(), "(RFC822)")
        found = status == "OK" and payload and payload[0]
        if not found:
            sys.exit(f"error: uid {args.uid} not found in {INBOX}")
        message = email.message_from_bytes(payload[0][1],
                                           policy=default_policy)
        for name in ("From", "To", "Cc", "Date", "Subject"):
            value = message.get(name)
            if value:
                print(f"{name}: {value}")
        print()
        text = extract_body(message)
        print("<no body>" if text is None else text)
    finally:
        conn.logout()


def _store(uids, op, flags):
    """Run UID STORE <op> <flags> on every uid in INBOX.

    Opens its own connection, exits on the first failing STORE, and
    prints a one-line confirmation once every uid has been processed.
    """
    conn = connect()
    try:
        _select(conn, INBOX, readonly=False)
        for uid in uids:
            status, payload = conn.uid("STORE", str(uid).encode(), op, flags)
            if status == "OK":
                continue
            sys.exit(f"error: STORE {op} {flags} on uid {uid} failed: {payload}")
        print(f"ok: STORE {op} {flags} on {len(uids)} uid(s)")
    finally:
        conn.logout()


def cmd_mark_read(args):
    """Mark the given INBOX uids as read by adding the \\Seen flag."""
    _store(args.uids, op="+FLAGS", flags=r"(\Seen)")


def cmd_mark_unread(args):
    """Mark the given INBOX uids as unread by removing the \\Seen flag."""
    _store(args.uids, op="-FLAGS", flags=r"(\Seen)")


def cmd_star(args):
    """Star the given INBOX uids by adding \\Flagged.

    \\Seen is added as well: by workflow convention starring also marks
    the message read, matching the Gmail flow.
    """
    _store(args.uids, op="+FLAGS", flags=r"(\Flagged \Seen)")


def cmd_unstar(args):
    """Unstar the given INBOX uids by removing the \\Flagged flag."""
    _store(args.uids, op="-FLAGS", flags=r"(\Flagged)")


def cmd_trash(args):
    """Move the given INBOX uids to the Trash mailbox.

    UID MOVE (RFC 6851) is tried first. When the server answers NO, or
    rejects the command outright (imaplib raises IMAP4.error for a BAD
    reply), the message is instead copied to Trash, flagged \\Deleted and
    expunged from INBOX. Exits with an error as soon as a copy or flag
    step fails; prints a one-line confirmation otherwise.
    """
    M = connect()
    try:
        _select(M, INBOX, readonly=False)
        moved = 0
        needs_expunge = False
        for uid in args.uids:
            uid_b = str(uid).encode()
            try:
                typ, data = M.uid("MOVE", uid_b, TRASH)
            except imaplib.IMAP4.abort:
                # Connection-level failure: a fallback cannot help.
                raise
            except imaplib.IMAP4.error:
                # A BAD reply (e.g. MOVE not implemented) arrives as an
                # exception rather than a status, so map it onto the
                # fallback path explicitly.
                typ = "BAD"
            if typ != "OK":
                # Fallback for servers without RFC 6851 MOVE.
                typ2, data2 = M.uid("COPY", uid_b, TRASH)
                if typ2 != "OK":
                    sys.exit(f"error: COPY uid {uid} -> {TRASH} failed: {data2}")
                typ3, data3 = M.uid("STORE", uid_b, "+FLAGS", r"(\Deleted)")
                if typ3 != "OK":
                    # Without the flag the original would stay in INBOX
                    # while a duplicate sits in Trash; report it.
                    sys.exit(f"error: copied uid {uid} to {TRASH} but could "
                             f"not flag the original \\Deleted: {data3}")
                needs_expunge = True
            moved += 1
        # MOVE already removes the source message. Expunging only after the
        # fallback avoids permanently deleting unrelated messages that
        # happen to carry \Deleted in INBOX.
        if needs_expunge:
            M.expunge()
        print(f"ok: moved {moved} uid(s) to {TRASH}")
    finally:
        M.logout()


def cmd_send(args):
    """Send one message through Bridge SMTP as USER.

    The body comes from --body, else --body-file, else stdin. Attachments
    and the body file are read before the SMTP connection is opened, and
    an unreadable file ends the run with an "error: ..." message like the
    other commands instead of a traceback. Prints a confirmation after
    the message is handed to the server.
    """
    # Resolve attachments first so a missing file fails before SMTP opens.
    try:
        attachments = [load_attachment(Path(p)) for p in (args.attach or [])]
    except OSError as e:
        # Covers FileNotFoundError / IsADirectoryError raised by
        # load_attachment as well as read failures such as PermissionError.
        sys.exit(f"error: {e}")
    if args.body is not None:
        body = args.body
    elif args.body_file is not None:
        try:
            body = Path(args.body_file).read_text()
        except OSError as e:
            sys.exit(f"error: cannot read body file {args.body_file}: {e}")
    else:
        body = sys.stdin.read()
    msg = build_message(USER, args.to, args.subject, body, attachments,
                        cc=args.cc, bcc=args.bcc,
                        in_reply_to=args.in_reply_to, references=args.references)
    smtp = smtp_connect()
    try:
        smtp.send_message(msg)
        print(f"ok: sent to {args.to}")
    finally:
        smtp.quit()


def cmd_folders(_args):
    """Print the server's raw LIST response, one mailbox entry per line."""
    conn = connect()
    try:
        status, entries = conn.list()
        if status != "OK":
            sys.exit(f"error: LIST failed: {entries}")
        for entry in entries:
            text = entry.decode("utf-8", errors="replace")
            print(text)
    finally:
        conn.logout()


def main():
    """Parse the command line and dispatch to the matching cmd_* handler.

    Each subcommand stores its handler as the "func" default; the parsed
    namespace is then passed to that handler.
    """
    parser = argparse.ArgumentParser(
        prog="cmail-action",
        description="IMAP triage against Proton Bridge",
    )
    commands = parser.add_subparsers(dest="cmd", required=True)

    lister = commands.add_parser(
        "list-unread", help="list unread INBOX messages as JSON")
    lister.add_argument("--limit", type=int, default=50,
                        help="cap to N most recent (default 50)")
    lister.set_defaults(func=cmd_list_unread)

    reader = commands.add_parser("read", help="print headers + body of a UID")
    reader.add_argument("uid", type=int)
    reader.set_defaults(func=cmd_read)

    # Subcommands that take one or more UIDs share the same argument shape.
    # The help keyword is only passed where a help text exists, because
    # argparse lists a subcommand differently once "help" is supplied.
    uid_commands = (
        ("mark-read", cmd_mark_read, {}),
        ("mark-unread", cmd_mark_unread, {}),
        ("star", cmd_star, {"help": "star (sets \\Flagged + \\Seen)"}),
        ("unstar", cmd_unstar, {}),
        ("trash", cmd_trash, {"help": "MOVE uid(s) to Trash"}),
    )
    for name, handler, extra in uid_commands:
        sub = commands.add_parser(name, **extra)
        sub.add_argument("uids", nargs="+", type=int)
        sub.set_defaults(func=handler)

    folders = commands.add_parser("folders", help="list IMAP folders (debug)")
    folders.set_defaults(func=cmd_folders)

    sender = commands.add_parser("send", help="send an email via Bridge SMTP")
    sender.add_argument("--to", required=True, help="recipient address")
    sender.add_argument("--subject", required=True)
    # --body and --body-file are alternatives; with neither, cmd_send
    # reads the body from stdin.
    body_source = sender.add_mutually_exclusive_group()
    body_source.add_argument("--body", help="body text inline")
    body_source.add_argument(
        "--body-file",
        help="path to a file whose contents become the body")
    for flag, blurb in (
        ("--attach", "path to attach (repeatable)"),
        ("--cc", "Cc address (repeatable)"),
        ("--bcc", "Bcc address (repeatable)"),
    ):
        sender.add_argument(flag, action="append", default=[], help=blurb)
    sender.add_argument(
        "--in-reply-to",
        help="Message-ID this replies to (threads on the recipient's end)")
    sender.add_argument(
        "--references",
        help="References header (space-separated Message-IDs)")
    sender.set_defaults(func=cmd_send)

    parsed = parser.parse_args()
    parsed.func(parsed)


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()