#!/usr/bin/env python3 """Fetch Gmail message attachments via the same OAuth identity the google-docs-mcp servers use. Usage: gmail-fetch-attachments.py --profile {personal,work} \ --message-id --output-dir Reuses: - Refresh token at ~/.config/google-docs-mcp/[/]token.json (the subdir is only present when GOOGLE_MCP_PROFILE is set on the mcpServers entry; otherwise the cache lives at the directory root) - Client ID + secret from ~/.claude.json's mcpServers["google-docs-"].env Stdlib only. Saves each non-inline attachment using its original filename. Skips attachments that already exist in --output-dir (size-matched). """ from __future__ import annotations import argparse import base64 import json import sys import urllib.parse import urllib.request from pathlib import Path OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token" GMAIL_API = "https://gmail.googleapis.com/gmail/v1/users/me" TOKEN_DIR = Path.home() / ".config" / "google-docs-mcp" CLAUDE_CONFIG = Path.home() / ".claude.json" def load_mcp_env(profile: str) -> dict: if not CLAUDE_CONFIG.exists(): sys.exit(f"claude config missing: {CLAUDE_CONFIG}") config = json.loads(CLAUDE_CONFIG.read_text()) server_name = f"google-docs-{profile}" servers = config.get("mcpServers", {}) if server_name not in servers: sys.exit(f"mcpServers.{server_name} not found in {CLAUDE_CONFIG}") return servers[server_name].get("env", {}) or {} def load_refresh_token(env: dict) -> str: # The MCP server keys its token cache by GOOGLE_MCP_PROFILE. When the # var is unset on the mcpServers entry, the cache lives at the root # (TOKEN_DIR/token.json), not under a / subdirectory. mcp_profile = env.get("GOOGLE_MCP_PROFILE") or "" path = TOKEN_DIR / mcp_profile / "token.json" if mcp_profile else TOKEN_DIR / "token.json" if not path.exists(): sys.exit(f"token cache missing: {path}") data = json.loads(path.read_text()) if "refresh_token" not in data: sys.exit(f"no refresh_token in {path}") return data["refresh_token"] def load_client_creds(env: dict) -> tuple[str, str]: cid = env.get("GOOGLE_CLIENT_ID") secret = env.get("GOOGLE_CLIENT_SECRET") if not cid or not secret: sys.exit("GOOGLE_CLIENT_ID/SECRET missing in MCP env") return cid, secret def refresh_access_token(refresh_token: str, client_id: str, client_secret: str) -> str: body = urllib.parse.urlencode( { "client_id": client_id, "client_secret": client_secret, "refresh_token": refresh_token, "grant_type": "refresh_token", } ).encode() req = urllib.request.Request( OAUTH_TOKEN_URL, data=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) with urllib.request.urlopen(req, timeout=30) as resp: payload = json.loads(resp.read()) if "access_token" not in payload: sys.exit(f"refresh failed: {payload}") return payload["access_token"] def gmail_get(path: str, access_token: str) -> dict: req = urllib.request.Request( f"{GMAIL_API}{path}", headers={"Authorization": f"Bearer {access_token}"}, ) with urllib.request.urlopen(req, timeout=60) as resp: return json.loads(resp.read()) def collect_attachments(payload: dict) -> list[dict]: """Walk the MIME tree and collect parts that have an attachmentId. Returns list of {filename, attachmentId, size, mimeType}. Skips parts without a filename (inline images, etc.). """ results: list[dict] = [] def walk(part: dict) -> None: body = part.get("body", {}) or {} filename = part.get("filename") or "" if filename and "attachmentId" in body: results.append( { "filename": filename, "attachmentId": body["attachmentId"], "size": body.get("size", 0), "mimeType": part.get("mimeType", "application/octet-stream"), } ) for sub in part.get("parts", []) or []: walk(sub) walk(payload) return results def safe_filename(name: str) -> str: """Strip path separators and leading parent-dir markers (..). Path separators become underscores so the filename can't escape the output directory. Leading ".." sequences are stripped so an attachment named "../foo" lands as "_foo" rather than ".._foo". Single leading dots are preserved so dotfiles like ".gitignore" survive intact. """ cleaned = name.replace("/", "_").replace("\\", "_") while cleaned.startswith(".."): cleaned = cleaned[2:] return cleaned def main() -> int: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument("--profile", choices=["personal", "work"], required=True) ap.add_argument("--message-id", required=True) ap.add_argument("--output-dir", required=True, type=Path) args = ap.parse_args() args.output_dir.mkdir(parents=True, exist_ok=True) env = load_mcp_env(args.profile) refresh_token = load_refresh_token(env) client_id, client_secret = load_client_creds(env) access_token = refresh_access_token(refresh_token, client_id, client_secret) msg = gmail_get( f"/messages/{args.message_id}?format=full", access_token ) attachments = collect_attachments(msg.get("payload", {})) if not attachments: print("no attachments on this message") return 0 print(f"found {len(attachments)} attachment(s):") for att in attachments: target = args.output_dir / safe_filename(att["filename"]) if target.exists() and target.stat().st_size == att["size"]: print(f" skip (already present): {target}") continue data_resp = gmail_get( f"/messages/{args.message_id}/attachments/{att['attachmentId']}", access_token, ) raw = base64.urlsafe_b64decode(data_resp["data"]) target.write_bytes(raw) print(f" saved: {target} ({len(raw):,} bytes)") return 0 if __name__ == "__main__": sys.exit(main())