aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/gmail-fetch-attachments.py
diff options
context:
space:
mode:
authorCraig Jennings <c@cjennings.net>2026-05-06 21:59:52 -0500
committerCraig Jennings <c@cjennings.net>2026-05-06 21:59:52 -0500
commitd81b23ad6b6e437dfe3c338a00a4be39bc555146 (patch)
tree2d4b0d7890fd1fc70d81282b81fed2808c28a106 /.ai/scripts/gmail-fetch-attachments.py
parent201377f57430ef28d02e703a2191434bbee55c75 (diff)
downloadrulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.tar.gz
rulesets-d81b23ad6b6e437dfe3c338a00a4be39bc555146.zip
chore(ai): initialize project notes and Claude tooling surfaces
Replace the seed notes.org with project-specific context (layout, install modes, task tracker location, recent inflection point). Bring in the synced template surfaces (protocols, workflows, scripts, references, retrospectives, someday-maybe) as tracked content for this content/documentation project.
Diffstat (limited to '.ai/scripts/gmail-fetch-attachments.py')
-rwxr-xr-x.ai/scripts/gmail-fetch-attachments.py171
1 files changed, 171 insertions, 0 deletions
diff --git a/.ai/scripts/gmail-fetch-attachments.py b/.ai/scripts/gmail-fetch-attachments.py
new file mode 100755
index 0000000..8aa2789
--- /dev/null
+++ b/.ai/scripts/gmail-fetch-attachments.py
@@ -0,0 +1,171 @@
+#!/usr/bin/env python3
+"""Fetch Gmail message attachments via the same OAuth identity the
+google-docs-mcp servers use.
+
+Usage:
+ gmail-fetch-attachments.py --profile {personal,work} \
+ --message-id <ID> --output-dir <PATH>
+
+Reuses:
+ - Refresh token at ~/.config/google-docs-mcp/[<GOOGLE_MCP_PROFILE>/]token.json
+ (the subdir is only present when GOOGLE_MCP_PROFILE is set on the
+ mcpServers entry; otherwise the cache lives at the directory root)
+ - Client ID + secret from ~/.claude.json's
+ mcpServers["google-docs-<profile>"].env
+
+Stdlib only. Saves each non-inline attachment using its original filename.
+Skips attachments that already exist in --output-dir (size-matched).
+"""
+from __future__ import annotations
+
+import argparse
+import base64
+import json
+import sys
+import urllib.parse
+import urllib.request
+from pathlib import Path
+
+# OAuth 2.0 token endpoint that refresh_access_token() POSTs to.
+OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token"
+# Base URL prepended to every path handed to gmail_get().
+GMAIL_API = "https://gmail.googleapis.com/gmail/v1/users/me"
+# Directory holding the cached token.json (directly, or under a profile subdir).
+TOKEN_DIR = Path.home() / ".config" / "google-docs-mcp"
+# Config file whose mcpServers entries supply the per-profile env mapping.
+CLAUDE_CONFIG = Path.home() / ".claude.json"
+
+
+def load_mcp_env(profile: str) -> dict:
+    """Return the ``env`` mapping of the ``google-docs-<profile>`` MCP server.
+
+    Reads CLAUDE_CONFIG and looks up ``mcpServers["google-docs-<profile>"]``.
+    A missing or null ``env`` yields an empty dict.
+
+    Exits the process via ``sys.exit`` with a message when the config file
+    is missing, cannot be read or parsed as a JSON object, or has no entry
+    for the requested server.
+    """
+    if not CLAUDE_CONFIG.exists():
+        sys.exit(f"claude config missing: {CLAUDE_CONFIG}")
+    try:
+        config = json.loads(CLAUDE_CONFIG.read_text(encoding="utf-8"))
+    except (OSError, ValueError) as err:
+        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
+        sys.exit(f"cannot read {CLAUDE_CONFIG}: {err}")
+    if not isinstance(config, dict):
+        sys.exit(f"unexpected top-level JSON in {CLAUDE_CONFIG}")
+    server_name = f"google-docs-{profile}"
+    # An explicit `"mcpServers": null` is treated like an absent key.
+    servers = config.get("mcpServers") or {}
+    if server_name not in servers:
+        sys.exit(f"mcpServers.{server_name} not found in {CLAUDE_CONFIG}")
+    # Tolerate a null server entry as well as a null/missing "env".
+    return (servers[server_name] or {}).get("env") or {}
+
+
+def load_refresh_token(env: dict) -> str:
+    """Read the cached OAuth refresh token for the given MCP env mapping.
+
+    Exits via ``sys.exit`` when the token file does not exist or holds no
+    ``refresh_token`` key.
+    """
+    # The token cache is keyed by GOOGLE_MCP_PROFILE: with the variable set
+    # the file sits in TOKEN_DIR/<profile>/, otherwise directly in TOKEN_DIR.
+    profile_dir = env.get("GOOGLE_MCP_PROFILE") or ""
+    cache_root = TOKEN_DIR / profile_dir if profile_dir else TOKEN_DIR
+    token_file = cache_root / "token.json"
+
+    if not token_file.exists():
+        sys.exit(f"token cache missing: {token_file}")
+
+    cached = json.loads(token_file.read_text())
+    if "refresh_token" in cached:
+        return cached["refresh_token"]
+    sys.exit(f"no refresh_token in {token_file}")
+
+
+def load_client_creds(env: dict) -> tuple[str, str]:
+    """Return ``(client_id, client_secret)`` taken from the MCP env mapping.
+
+    Exits via ``sys.exit`` when either value is absent or empty.
+    """
+    client_id, client_secret = (
+        env.get(key) for key in ("GOOGLE_CLIENT_ID", "GOOGLE_CLIENT_SECRET")
+    )
+    if client_id and client_secret:
+        return client_id, client_secret
+    sys.exit("GOOGLE_CLIENT_ID/SECRET missing in MCP env")
+
+
+def refresh_access_token(refresh_token: str, client_id: str, client_secret: str) -> str:
+    """Exchange a refresh token for an access token at OAUTH_TOKEN_URL.
+
+    Sends a form-encoded ``grant_type=refresh_token`` POST and returns the
+    ``access_token`` field of the JSON response.
+
+    Exits via ``sys.exit`` with a "refresh failed" message when the server
+    answers with an HTTP error status or the response lacks ``access_token``.
+    """
+    # Local import keeps this block self-contained; no other block needs it.
+    from urllib.error import HTTPError
+
+    body = urllib.parse.urlencode(
+        {
+            "client_id": client_id,
+            "client_secret": client_secret,
+            "refresh_token": refresh_token,
+            "grant_type": "refresh_token",
+        }
+    ).encode()
+    req = urllib.request.Request(
+        OAUTH_TOKEN_URL,
+        data=body,
+        headers={"Content-Type": "application/x-www-form-urlencoded"},
+    )
+    try:
+        with urllib.request.urlopen(req, timeout=30) as resp:
+            payload = json.loads(resp.read())
+    except HTTPError as err:
+        # urlopen raises for non-2xx statuses, so without this handler a
+        # rejected refresh would end in a traceback and the server's error
+        # body would never reach the user.
+        detail = err.read().decode("utf-8", errors="replace")
+        sys.exit(f"refresh failed: HTTP {err.code} {detail}")
+    if "access_token" not in payload:
+        sys.exit(f"refresh failed: {payload}")
+    return payload["access_token"]
+
+
+def gmail_get(path: str, access_token: str) -> dict:
+    """GET ``GMAIL_API + path`` with a bearer token and return the parsed JSON."""
+    url = f"{GMAIL_API}{path}"
+    auth_header = {"Authorization": f"Bearer {access_token}"}
+    request = urllib.request.Request(url, headers=auth_header)
+    with urllib.request.urlopen(request, timeout=60) as response:
+        raw_body = response.read()
+    return json.loads(raw_body)
+
+
+def collect_attachments(payload: dict) -> list[dict]:
+    """Gather every named MIME part that carries an ``attachmentId``.
+
+    The part tree is traversed depth-first, parents before children, so the
+    result follows the order in which parts appear in the message. Each entry
+    is a dict with the keys ``filename``, ``attachmentId``, ``size`` and
+    ``mimeType``. Parts whose filename is empty or absent are ignored.
+    """
+    found: list[dict] = []
+    pending = [payload]
+    while pending:
+        node = pending.pop()
+        node_body = node.get("body", {}) or {}
+        name = node.get("filename") or ""
+        if name and "attachmentId" in node_body:
+            entry = {
+                "filename": name,
+                "attachmentId": node_body["attachmentId"],
+                "size": node_body.get("size", 0),
+                "mimeType": node.get("mimeType", "application/octet-stream"),
+            }
+            found.append(entry)
+        children = node.get("parts", []) or []
+        # Pushed in reverse so the first child is popped, and visited, first.
+        pending.extend(reversed(children))
+    return found
+
+
+def safe_filename(name: str) -> str:
+    """Reduce an attachment filename to a single usable path component.
+
+    Forward slashes and backslashes are replaced with ``_`` and leading dots
+    are stripped; every other character is kept. If nothing remains (for
+    example the input was ``""``, ``"."`` or ``".."``), ``"attachment"`` is
+    returned so the caller never joins an empty name onto the output
+    directory.
+    """
+    cleaned = name.replace("/", "_").replace("\\", "_").lstrip(".")
+    return cleaned or "attachment"
+
+
+def main() -> int:
+    """Download the attachments of one Gmail message into ``--output-dir``.
+
+    Parses ``--profile``, ``--message-id`` and ``--output-dir``, creates the
+    output directory if needed, obtains an access token from the profile's
+    cached credentials, then fetches the message and saves each attachment
+    reported by ``collect_attachments``. A file that already exists with the
+    size Gmail reports is skipped.
+
+    Returns 0 on completion; the helper functions terminate the process via
+    ``sys.exit`` on configuration or authentication failures.
+    """
+    ap = argparse.ArgumentParser(description=__doc__)
+    ap.add_argument("--profile", choices=["personal", "work"], required=True)
+    ap.add_argument("--message-id", required=True)
+    ap.add_argument("--output-dir", required=True, type=Path)
+    args = ap.parse_args()
+
+    args.output_dir.mkdir(parents=True, exist_ok=True)
+
+    env = load_mcp_env(args.profile)
+    refresh_token = load_refresh_token(env)
+    client_id, client_secret = load_client_creds(env)
+    access_token = refresh_access_token(refresh_token, client_id, client_secret)
+
+    # The ids become URL path segments; percent-encode them so characters
+    # such as "/", "?" or "#" cannot alter the request target.
+    message_id = urllib.parse.quote(args.message_id, safe="")
+
+    msg = gmail_get(f"/messages/{message_id}?format=full", access_token)
+    attachments = collect_attachments(msg.get("payload", {}))
+
+    if not attachments:
+        print("no attachments on this message")
+        return 0
+
+    print(f"found {len(attachments)} attachment(s):")
+    # NOTE: attachments whose sanitized filenames coincide share one target
+    # path, so a later one replaces an earlier one.
+    for att in attachments:
+        target = args.output_dir / safe_filename(att["filename"])
+        if target.exists() and target.stat().st_size == att["size"]:
+            print(f"  skip (already present): {target}")
+            continue
+        attachment_id = urllib.parse.quote(att["attachmentId"], safe="")
+        data_resp = gmail_get(
+            f"/messages/{message_id}/attachments/{attachment_id}",
+            access_token,
+        )
+        encoded = data_resp["data"]
+        # base64url text may legally omit "=" padding, which the decoder
+        # rejects; top it up to a multiple of four characters first.
+        raw = base64.urlsafe_b64decode(encoded + "=" * (-len(encoded) % 4))
+        target.write_bytes(raw)
+        print(f"  saved: {target} ({len(raw):,} bytes)")
+
+    return 0
+
+
+# Run main() only when executed as a script; its return value becomes the
+# process exit status.
+if __name__ == "__main__":
+ sys.exit(main())