diff options
| author | Craig Jennings <c@cjennings.net> | 2026-05-06 21:59:52 -0500 |
|---|---|---|
| committer | Craig Jennings <c@cjennings.net> | 2026-05-06 21:59:52 -0500 |
| commit | d81b23ad6b6e437dfe3c338a00a4be39bc555146 (patch) | |
| tree | 2d4b0d7890fd1fc70d81282b81fed2808c28a106 /.ai/scripts/gmail-fetch-attachments.py | |
| parent | 201377f57430ef28d02e703a2191434bbee55c75 (diff) | |
| download | rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.tar.gz rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.zip | |
chore(ai): initialize project notes and Claude tooling surfaces
Replace the seed notes.org with project-specific context (layout, install modes, task tracker location, recent inflection point). Bring in the synced template surfaces (protocols, workflows, scripts, references, retrospectives, someday-maybe) as tracked content for this content/documentation project.
Diffstat (limited to '.ai/scripts/gmail-fetch-attachments.py')
| -rwxr-xr-x | .ai/scripts/gmail-fetch-attachments.py | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/.ai/scripts/gmail-fetch-attachments.py b/.ai/scripts/gmail-fetch-attachments.py new file mode 100755 index 0000000..8aa2789 --- /dev/null +++ b/.ai/scripts/gmail-fetch-attachments.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +"""Fetch Gmail message attachments via the same OAuth identity the +google-docs-mcp servers use. + +Usage: + gmail-fetch-attachments.py --profile {personal,work} \ + --message-id <ID> --output-dir <PATH> + +Reuses: + - Refresh token at ~/.config/google-docs-mcp/[<GOOGLE_MCP_PROFILE>/]token.json + (the subdir is only present when GOOGLE_MCP_PROFILE is set on the + mcpServers entry; otherwise the cache lives at the directory root) + - Client ID + secret from ~/.claude.json's + mcpServers["google-docs-<profile>"].env + +Stdlib only. Saves each non-inline attachment using its original filename. +Skips attachments that already exist in --output-dir (size-matched). +""" +from __future__ import annotations + +import argparse +import base64 +import json +import sys +import urllib.parse +import urllib.request +from pathlib import Path + +OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token" +GMAIL_API = "https://gmail.googleapis.com/gmail/v1/users/me" +TOKEN_DIR = Path.home() / ".config" / "google-docs-mcp" +CLAUDE_CONFIG = Path.home() / ".claude.json" + + +def load_mcp_env(profile: str) -> dict: + if not CLAUDE_CONFIG.exists(): + sys.exit(f"claude config missing: {CLAUDE_CONFIG}") + config = json.loads(CLAUDE_CONFIG.read_text()) + server_name = f"google-docs-{profile}" + servers = config.get("mcpServers", {}) + if server_name not in servers: + sys.exit(f"mcpServers.{server_name} not found in {CLAUDE_CONFIG}") + return servers[server_name].get("env", {}) or {} + + +def load_refresh_token(env: dict) -> str: + # The MCP server keys its token cache by GOOGLE_MCP_PROFILE. When the + # var is unset on the mcpServers entry, the cache lives at the root + # (TOKEN_DIR/token.json), not under a <profile>/ subdirectory. + mcp_profile = env.get("GOOGLE_MCP_PROFILE") or "" + path = TOKEN_DIR / mcp_profile / "token.json" if mcp_profile else TOKEN_DIR / "token.json" + if not path.exists(): + sys.exit(f"token cache missing: {path}") + data = json.loads(path.read_text()) + if "refresh_token" not in data: + sys.exit(f"no refresh_token in {path}") + return data["refresh_token"] + + +def load_client_creds(env: dict) -> tuple[str, str]: + cid = env.get("GOOGLE_CLIENT_ID") + secret = env.get("GOOGLE_CLIENT_SECRET") + if not cid or not secret: + sys.exit("GOOGLE_CLIENT_ID/SECRET missing in MCP env") + return cid, secret + + +def refresh_access_token(refresh_token: str, client_id: str, client_secret: str) -> str: + body = urllib.parse.urlencode( + { + "client_id": client_id, + "client_secret": client_secret, + "refresh_token": refresh_token, + "grant_type": "refresh_token", + } + ).encode() + req = urllib.request.Request( + OAUTH_TOKEN_URL, + data=body, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + with urllib.request.urlopen(req, timeout=30) as resp: + payload = json.loads(resp.read()) + if "access_token" not in payload: + sys.exit(f"refresh failed: {payload}") + return payload["access_token"] + + +def gmail_get(path: str, access_token: str) -> dict: + req = urllib.request.Request( + f"{GMAIL_API}{path}", + headers={"Authorization": f"Bearer {access_token}"}, + ) + with urllib.request.urlopen(req, timeout=60) as resp: + return json.loads(resp.read()) + + +def collect_attachments(payload: dict) -> list[dict]: + """Walk the MIME tree and collect parts that have an attachmentId. + + Returns list of {filename, attachmentId, size, mimeType}. + Skips parts without a filename (inline images, etc.). + """ + results: list[dict] = [] + + def walk(part: dict) -> None: + body = part.get("body", {}) or {} + filename = part.get("filename") or "" + if filename and "attachmentId" in body: + results.append( + { + "filename": filename, + "attachmentId": body["attachmentId"], + "size": body.get("size", 0), + "mimeType": part.get("mimeType", "application/octet-stream"), + } + ) + for sub in part.get("parts", []) or []: + walk(sub) + + walk(payload) + return results + + +def safe_filename(name: str) -> str: + """Strip path separators. Preserve everything else.""" + return name.replace("/", "_").replace("\\", "_").lstrip(".") + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--profile", choices=["personal", "work"], required=True) + ap.add_argument("--message-id", required=True) + ap.add_argument("--output-dir", required=True, type=Path) + args = ap.parse_args() + + args.output_dir.mkdir(parents=True, exist_ok=True) + + env = load_mcp_env(args.profile) + refresh_token = load_refresh_token(env) + client_id, client_secret = load_client_creds(env) + access_token = refresh_access_token(refresh_token, client_id, client_secret) + + msg = gmail_get( + f"/messages/{args.message_id}?format=full", access_token + ) + attachments = collect_attachments(msg.get("payload", {})) + + if not attachments: + print("no attachments on this message") + return 0 + + print(f"found {len(attachments)} attachment(s):") + for att in attachments: + target = args.output_dir / safe_filename(att["filename"]) + if target.exists() and target.stat().st_size == att["size"]: + print(f" skip (already present): {target}") + continue + data_resp = gmail_get( + f"/messages/{args.message_id}/attachments/{att['attachmentId']}", + access_token, + ) + raw = base64.urlsafe_b64decode(data_resp["data"]) + target.write_bytes(raw) + print(f" saved: {target} ({len(raw):,} bytes)") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) |
