diff options
| author | Craig Jennings <c@cjennings.net> | 2026-05-15 16:56:39 -0500 |
|---|---|---|
| committer | Craig Jennings <c@cjennings.net> | 2026-05-15 16:56:39 -0500 |
| commit | c1d4e3c4a42abd01bc7ef83b1d6ae036ee32ef1d (patch) | |
| tree | 3e6dcc682cbf2311409e7f71d83a7d4088392068 /claude-templates/.ai/scripts/gmail-fetch-attachments.py | |
| parent | 2b471da4bab014a2e096f63edc7aac235fc40fdd (diff) | |
| parent | 69c5e4ace81586c05dea6a9a3afd54dafa61a73b (diff) | |
| download | rulesets-c1d4e3c4a42abd01bc7ef83b1d6ae036ee32ef1d.tar.gz rulesets-c1d4e3c4a42abd01bc7ef83b1d6ae036ee32ef1d.zip | |
Merge commit '69c5e4ace81586c05dea6a9a3afd54dafa61a73b' as 'claude-templates'
Diffstat (limited to 'claude-templates/.ai/scripts/gmail-fetch-attachments.py')
| -rwxr-xr-x | claude-templates/.ai/scripts/gmail-fetch-attachments.py | 180 |
1 files changed, 180 insertions, 0 deletions
diff --git a/claude-templates/.ai/scripts/gmail-fetch-attachments.py b/claude-templates/.ai/scripts/gmail-fetch-attachments.py new file mode 100755 index 0000000..b42101c --- /dev/null +++ b/claude-templates/.ai/scripts/gmail-fetch-attachments.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +"""Fetch Gmail message attachments via the same OAuth identity the +google-docs-mcp servers use. + +Usage: + gmail-fetch-attachments.py --profile {personal,work} \ + --message-id <ID> --output-dir <PATH> + +Reuses: + - Refresh token at ~/.config/google-docs-mcp/[<GOOGLE_MCP_PROFILE>/]token.json + (the subdir is only present when GOOGLE_MCP_PROFILE is set on the + mcpServers entry; otherwise the cache lives at the directory root) + - Client ID + secret from ~/.claude.json's + mcpServers["google-docs-<profile>"].env + +Stdlib only. Saves each non-inline attachment using its original filename. +Skips attachments that already exist in --output-dir (size-matched). +""" +from __future__ import annotations + +import argparse +import base64 +import json +import sys +import urllib.parse +import urllib.request +from pathlib import Path + +OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token" +GMAIL_API = "https://gmail.googleapis.com/gmail/v1/users/me" +TOKEN_DIR = Path.home() / ".config" / "google-docs-mcp" +CLAUDE_CONFIG = Path.home() / ".claude.json" + + +def load_mcp_env(profile: str) -> dict: + if not CLAUDE_CONFIG.exists(): + sys.exit(f"claude config missing: {CLAUDE_CONFIG}") + config = json.loads(CLAUDE_CONFIG.read_text()) + server_name = f"google-docs-{profile}" + servers = config.get("mcpServers", {}) + if server_name not in servers: + sys.exit(f"mcpServers.{server_name} not found in {CLAUDE_CONFIG}") + return servers[server_name].get("env", {}) or {} + + +def load_refresh_token(env: dict) -> str: + # The MCP server keys its token cache by GOOGLE_MCP_PROFILE. When the + # var is unset on the mcpServers entry, the cache lives at the root + # (TOKEN_DIR/token.json), not under a <profile>/ subdirectory. + mcp_profile = env.get("GOOGLE_MCP_PROFILE") or "" + path = TOKEN_DIR / mcp_profile / "token.json" if mcp_profile else TOKEN_DIR / "token.json" + if not path.exists(): + sys.exit(f"token cache missing: {path}") + data = json.loads(path.read_text()) + if "refresh_token" not in data: + sys.exit(f"no refresh_token in {path}") + return data["refresh_token"] + + +def load_client_creds(env: dict) -> tuple[str, str]: + cid = env.get("GOOGLE_CLIENT_ID") + secret = env.get("GOOGLE_CLIENT_SECRET") + if not cid or not secret: + sys.exit("GOOGLE_CLIENT_ID/SECRET missing in MCP env") + return cid, secret + + +def refresh_access_token(refresh_token: str, client_id: str, client_secret: str) -> str: + body = urllib.parse.urlencode( + { + "client_id": client_id, + "client_secret": client_secret, + "refresh_token": refresh_token, + "grant_type": "refresh_token", + } + ).encode() + req = urllib.request.Request( + OAUTH_TOKEN_URL, + data=body, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + with urllib.request.urlopen(req, timeout=30) as resp: + payload = json.loads(resp.read()) + if "access_token" not in payload: + sys.exit(f"refresh failed: {payload}") + return payload["access_token"] + + +def gmail_get(path: str, access_token: str) -> dict: + req = urllib.request.Request( + f"{GMAIL_API}{path}", + headers={"Authorization": f"Bearer {access_token}"}, + ) + with urllib.request.urlopen(req, timeout=60) as resp: + return json.loads(resp.read()) + + +def collect_attachments(payload: dict) -> list[dict]: + """Walk the MIME tree and collect parts that have an attachmentId. + + Returns list of {filename, attachmentId, size, mimeType}. + Skips parts without a filename (inline images, etc.). + """ + results: list[dict] = [] + + def walk(part: dict) -> None: + body = part.get("body", {}) or {} + filename = part.get("filename") or "" + if filename and "attachmentId" in body: + results.append( + { + "filename": filename, + "attachmentId": body["attachmentId"], + "size": body.get("size", 0), + "mimeType": part.get("mimeType", "application/octet-stream"), + } + ) + for sub in part.get("parts", []) or []: + walk(sub) + + walk(payload) + return results + + +def safe_filename(name: str) -> str: + """Strip path separators and leading parent-dir markers (..). + + Path separators become underscores so the filename can't escape the + output directory. Leading ".." sequences are stripped so an attachment + named "../foo" lands as "_foo" rather than ".._foo". Single leading + dots are preserved so dotfiles like ".gitignore" survive intact. + """ + cleaned = name.replace("/", "_").replace("\\", "_") + while cleaned.startswith(".."): + cleaned = cleaned[2:] + return cleaned + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--profile", choices=["personal", "work"], required=True) + ap.add_argument("--message-id", required=True) + ap.add_argument("--output-dir", required=True, type=Path) + args = ap.parse_args() + + args.output_dir.mkdir(parents=True, exist_ok=True) + + env = load_mcp_env(args.profile) + refresh_token = load_refresh_token(env) + client_id, client_secret = load_client_creds(env) + access_token = refresh_access_token(refresh_token, client_id, client_secret) + + msg = gmail_get( + f"/messages/{args.message_id}?format=full", access_token + ) + attachments = collect_attachments(msg.get("payload", {})) + + if not attachments: + print("no attachments on this message") + return 0 + + print(f"found {len(attachments)} attachment(s):") + for att in attachments: + target = args.output_dir / safe_filename(att["filename"]) + if target.exists() and target.stat().st_size == att["size"]: + print(f" skip (already present): {target}") + continue + data_resp = gmail_get( + f"/messages/{args.message_id}/attachments/{att['attachmentId']}", + access_token, + ) + raw = base64.urlsafe_b64decode(data_resp["data"]) + target.write_bytes(raw) + print(f" saved: {target} ({len(raw):,} bytes)") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) |
