aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/tests/test_gmail_fetch_attachments.py
diff options
context:
space:
mode:
authorCraig Jennings <c@cjennings.net>2026-05-08 23:23:31 -0500
committerCraig Jennings <c@cjennings.net>2026-05-08 23:23:31 -0500
commit65c962e01f0ec7cefe70157f4f11f54638cdc995 (patch)
treea64d7a8bd466cb08e77829ca86e25a221aa64710 /.ai/scripts/tests/test_gmail_fetch_attachments.py
parent1abae87acaba85453ef9b7e1eafe0d6e8e22c4e5 (diff)
downloadrulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.tar.gz
rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.zip
feat: Add cmail IMAP action script and test suite
Add cmail-action.py for IMAP triage operations against Proton Mail Bridge (list-unread, read, mark-read, star, unstar, trash, send, folders) mirroring the Gmail MCP workflow. Also add comprehensive tests for cmail-action, gmail-fetch-attachments, and maildir-flag-manager scripts.
Diffstat (limited to '.ai/scripts/tests/test_gmail_fetch_attachments.py')
-rw-r--r--.ai/scripts/tests/test_gmail_fetch_attachments.py420
1 files changed, 420 insertions, 0 deletions
diff --git a/.ai/scripts/tests/test_gmail_fetch_attachments.py b/.ai/scripts/tests/test_gmail_fetch_attachments.py
new file mode 100644
index 0000000..b4fba41
--- /dev/null
+++ b/.ai/scripts/tests/test_gmail_fetch_attachments.py
@@ -0,0 +1,420 @@
+"""Tests for gmail-fetch-attachments.py.
+
+Covers:
+- Pure helpers: safe_filename, collect_attachments, load_client_creds
+- File I/O: load_mcp_env, load_refresh_token (tmp_path + monkeypatch on
+ module-level constants CLAUDE_CONFIG and TOKEN_DIR)
+- HTTP wrappers: refresh_access_token, gmail_get (monkeypatch on
+ urllib.request.urlopen)
+- Argparse: --help / missing-args via subprocess
+
+Strategy mirrors test_cmail_action.py: import the script via importlib
+(filename has hyphens), mock at external boundaries, no integration
+test for main() — the components are tested individually.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import json
+import subprocess
+import sys
+from pathlib import Path
+from unittest.mock import MagicMock
+
+import pytest
+
+SCRIPT_PATH = Path(__file__).resolve().parent.parent / "gmail-fetch-attachments.py"
+
+
+def _load_module():
+ spec = importlib.util.spec_from_file_location(
+ "gmail_fetch_attachments", str(SCRIPT_PATH)
+ )
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ return mod
+
+
+@pytest.fixture(scope="module")
+def gfa():
+ return _load_module()
+
+
+def _mock_urlopen_response(payload):
+ """Build a MagicMock mimicking urllib.request.urlopen()'s context-manager response."""
+ mock_resp = MagicMock()
+ mock_resp.read.return_value = json.dumps(payload).encode()
+ mock_resp.__enter__ = MagicMock(return_value=mock_resp)
+ mock_resp.__exit__ = MagicMock(return_value=False)
+ return mock_resp
+
+
+# ---------------------------------------------------------------------------
+# safe_filename — pure
+# ---------------------------------------------------------------------------
+
+class TestSafeFilename:
+
+ def test_normal_clean_filename(self, gfa):
+ assert gfa.safe_filename("report.pdf") == "report.pdf"
+
+ def test_boundary_forward_slash_replaced_with_underscore(self, gfa):
+ assert gfa.safe_filename("foo/bar.txt") == "foo_bar.txt"
+
+ def test_boundary_backslash_replaced_with_underscore(self, gfa):
+ assert gfa.safe_filename("foo\\bar.txt") == "foo_bar.txt"
+
+ def test_boundary_path_traversal_stripped(self, gfa):
+ # "../etc/passwd" -> after slash replace: ".._etc_passwd"
+ # While loop strips leading "..": "_etc_passwd"
+ assert gfa.safe_filename("../etc/passwd") == "_etc_passwd"
+
+ def test_boundary_dotfile_preserved(self, gfa):
+ # The fix Craig requested: single-dot prefixes survive so dotfiles
+ # like .gitignore aren't silently renamed.
+ assert gfa.safe_filename(".gitignore") == ".gitignore"
+ assert gfa.safe_filename(".env.local") == ".env.local"
+
+ def test_boundary_empty_string(self, gfa):
+ assert gfa.safe_filename("") == ""
+
+ @pytest.mark.parametrize("input_name,expected", [
+ ("..", ""), # single ".." stripped, leaves empty
+ ("...", "."), # one strip leaves a single dot
+ ("....", ""), # two strips leave empty
+ (".....", "."), # two strips leave one dot
+ ])
+ def test_boundary_only_dots(self, gfa, input_name, expected):
+ assert gfa.safe_filename(input_name) == expected
+
+ def test_boundary_double_dot_followed_by_name_stripped(self, gfa):
+ assert gfa.safe_filename("..foo") == "foo"
+
+ def test_boundary_middle_dotdot_preserved(self, gfa):
+ # Only LEADING ".." gets stripped. Mid-string ".." stays.
+ # "foo..bar" has no leading dots, so it's preserved as-is.
+ assert gfa.safe_filename("foo..bar") == "foo..bar"
+
+
+# ---------------------------------------------------------------------------
+# collect_attachments — pure
+# ---------------------------------------------------------------------------
+
+class TestCollectAttachments:
+
+ def test_normal_single_attachment(self, gfa):
+ payload = {
+ "parts": [
+ {"mimeType": "text/plain", "body": {"size": 100}},
+ {"filename": "doc.pdf", "mimeType": "application/pdf",
+ "body": {"attachmentId": "abc123", "size": 5000}},
+ ]
+ }
+ result = gfa.collect_attachments(payload)
+ assert result == [{
+ "filename": "doc.pdf",
+ "attachmentId": "abc123",
+ "size": 5000,
+ "mimeType": "application/pdf",
+ }]
+
+ def test_boundary_nested_multipart_recursion(self, gfa):
+ payload = {
+ "parts": [
+ {"mimeType": "multipart/mixed", "parts": [
+ {"mimeType": "multipart/alternative", "parts": [
+ {"filename": "deep.pdf", "mimeType": "application/pdf",
+ "body": {"attachmentId": "deep1", "size": 100}},
+ ]},
+ ]},
+ ]
+ }
+ result = gfa.collect_attachments(payload)
+ assert len(result) == 1
+ assert result[0]["filename"] == "deep.pdf"
+ assert result[0]["attachmentId"] == "deep1"
+
+ def test_boundary_no_attachments_returns_empty(self, gfa):
+ payload = {
+ "parts": [
+ {"mimeType": "text/plain", "body": {"size": 100}},
+ {"mimeType": "text/html", "body": {"size": 200}},
+ ]
+ }
+ assert gfa.collect_attachments(payload) == []
+
+ def test_boundary_inline_image_no_filename_skipped(self, gfa):
+ # Inline images embedded via cid: typically have an attachmentId
+ # but no filename. The "user-visible attachments" heuristic skips
+ # them so they don't litter the output dir as image001.png.
+ payload = {
+ "parts": [
+ {"mimeType": "image/png",
+ "body": {"attachmentId": "inline1", "size": 500}},
+ ]
+ }
+ assert gfa.collect_attachments(payload) == []
+
+ def test_boundary_empty_filename_skipped(self, gfa):
+ # Empty-string filename also skipped (truthy check).
+ payload = {
+ "parts": [
+ {"filename": "", "mimeType": "image/png",
+ "body": {"attachmentId": "empty1", "size": 500}},
+ ]
+ }
+ assert gfa.collect_attachments(payload) == []
+
+ def test_boundary_filename_without_attachment_id_skipped(self, gfa):
+ # A part with a filename but no attachmentId isn't a separately
+ # downloadable attachment — it's inline content with a name.
+ payload = {
+ "parts": [
+ {"filename": "fake.txt", "mimeType": "text/plain",
+ "body": {"size": 100}},
+ ]
+ }
+ assert gfa.collect_attachments(payload) == []
+
+ def test_boundary_multiple_attachments_at_different_depths(self, gfa):
+ payload = {
+ "parts": [
+ {"filename": "top.pdf", "mimeType": "application/pdf",
+ "body": {"attachmentId": "top1", "size": 100}},
+ {"mimeType": "multipart/mixed", "parts": [
+ {"filename": "nested.txt", "mimeType": "text/plain",
+ "body": {"attachmentId": "nested1", "size": 50}},
+ ]},
+ ]
+ }
+ result = gfa.collect_attachments(payload)
+ names = sorted(r["filename"] for r in result)
+ assert names == ["nested.txt", "top.pdf"]
+
+ def test_boundary_default_mimetype_when_missing(self, gfa):
+ payload = {
+ "parts": [
+ {"filename": "x.bin",
+ "body": {"attachmentId": "x1", "size": 10}},
+ ]
+ }
+ result = gfa.collect_attachments(payload)
+ assert result[0]["mimeType"] == "application/octet-stream"
+
+ def test_error_empty_payload(self, gfa):
+ assert gfa.collect_attachments({}) == []
+
+ def test_error_payload_with_null_parts(self, gfa):
+ # Defensive: parts = None falls through to empty list via `or []`.
+ payload = {"parts": None}
+ assert gfa.collect_attachments(payload) == []
+
+
+# ---------------------------------------------------------------------------
+# load_client_creds — pure
+# ---------------------------------------------------------------------------
+
+class TestLoadClientCreds:
+
+ def test_normal_both_credentials_present(self, gfa):
+ env = {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": "secret456"}
+ assert gfa.load_client_creds(env) == ("cid123", "secret456")
+
+ def test_error_missing_client_id(self, gfa):
+ env = {"GOOGLE_CLIENT_SECRET": "secret456"}
+ with pytest.raises(SystemExit):
+ gfa.load_client_creds(env)
+
+ def test_error_missing_client_secret(self, gfa):
+ env = {"GOOGLE_CLIENT_ID": "cid123"}
+ with pytest.raises(SystemExit):
+ gfa.load_client_creds(env)
+
+ def test_error_empty_client_id(self, gfa):
+ env = {"GOOGLE_CLIENT_ID": "", "GOOGLE_CLIENT_SECRET": "secret456"}
+ with pytest.raises(SystemExit):
+ gfa.load_client_creds(env)
+
+ def test_error_empty_client_secret(self, gfa):
+ env = {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": ""}
+ with pytest.raises(SystemExit):
+ gfa.load_client_creds(env)
+
+
+# ---------------------------------------------------------------------------
+# load_mcp_env — file I/O via tmp_path + monkeypatch CLAUDE_CONFIG
+# ---------------------------------------------------------------------------
+
+class TestLoadMcpEnv:
+
+ @staticmethod
+ def _write_config(tmp_path, monkeypatch, gfa, content):
+ config_path = tmp_path / ".claude.json"
+ config_path.write_text(json.dumps(content))
+ monkeypatch.setattr(gfa, "CLAUDE_CONFIG", config_path)
+ return config_path
+
+ def test_normal_personal_profile_with_env(self, monkeypatch, gfa, tmp_path):
+ self._write_config(tmp_path, monkeypatch, gfa, {
+ "mcpServers": {
+ "google-docs-personal": {
+ "env": {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"}
+ }
+ }
+ })
+ env = gfa.load_mcp_env("personal")
+ assert env == {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"}
+
+ def test_boundary_server_present_no_env_key(self, monkeypatch, gfa, tmp_path):
+ self._write_config(tmp_path, monkeypatch, gfa, {
+ "mcpServers": {"google-docs-work": {}}
+ })
+ assert gfa.load_mcp_env("work") == {}
+
+ def test_boundary_env_explicitly_null(self, monkeypatch, gfa, tmp_path):
+ # The `or {}` defends against null env. Returns empty dict, not None.
+ self._write_config(tmp_path, monkeypatch, gfa, {
+ "mcpServers": {"google-docs-personal": {"env": None}}
+ })
+ assert gfa.load_mcp_env("personal") == {}
+
+ def test_error_config_file_missing(self, monkeypatch, gfa, tmp_path):
+ monkeypatch.setattr(gfa, "CLAUDE_CONFIG", tmp_path / "nope.json")
+ with pytest.raises(SystemExit):
+ gfa.load_mcp_env("personal")
+
+ def test_error_server_not_in_config(self, monkeypatch, gfa, tmp_path):
+ self._write_config(tmp_path, monkeypatch, gfa, {
+ "mcpServers": {"google-docs-personal": {"env": {}}}
+ })
+ with pytest.raises(SystemExit):
+ gfa.load_mcp_env("work")
+
+
+# ---------------------------------------------------------------------------
+# load_refresh_token — file I/O via tmp_path + monkeypatch TOKEN_DIR
+# ---------------------------------------------------------------------------
+
+class TestLoadRefreshToken:
+
+ @staticmethod
+ def _setup_token(tmp_path, monkeypatch, gfa, profile=None, content=None):
+ token_dir = tmp_path / "google-docs-mcp"
+ token_dir.mkdir()
+ if profile:
+ (token_dir / profile).mkdir()
+ token_path = token_dir / profile / "token.json"
+ else:
+ token_path = token_dir / "token.json"
+ if content is not None:
+ token_path.write_text(json.dumps(content))
+ monkeypatch.setattr(gfa, "TOKEN_DIR", token_dir)
+ return token_path
+
+ def test_normal_no_profile_token_at_root(self, monkeypatch, gfa, tmp_path):
+ self._setup_token(tmp_path, monkeypatch, gfa,
+ content={"refresh_token": "rt-root"})
+ assert gfa.load_refresh_token({}) == "rt-root"
+
+ def test_boundary_with_profile_subdir(self, monkeypatch, gfa, tmp_path):
+ self._setup_token(tmp_path, monkeypatch, gfa, profile="personal",
+ content={"refresh_token": "rt-personal"})
+ assert gfa.load_refresh_token(
+ {"GOOGLE_MCP_PROFILE": "personal"}
+ ) == "rt-personal"
+
+ def test_boundary_explicit_empty_profile_falls_back_to_root(
+ self, monkeypatch, gfa, tmp_path):
+ # GOOGLE_MCP_PROFILE="" is treated the same as the key being missing —
+ # both fall back to TOKEN_DIR/token.json. Pinning both shapes so a
+ # future refactor that drops `or ""` doesn't silently break this.
+ self._setup_token(tmp_path, monkeypatch, gfa,
+ content={"refresh_token": "rt-root"})
+ assert gfa.load_refresh_token({"GOOGLE_MCP_PROFILE": ""}) == "rt-root"
+
+ def test_error_token_file_missing(self, monkeypatch, gfa, tmp_path):
+ token_dir = tmp_path / "google-docs-mcp"
+ token_dir.mkdir()
+ monkeypatch.setattr(gfa, "TOKEN_DIR", token_dir)
+ with pytest.raises(SystemExit):
+ gfa.load_refresh_token({})
+
+ def test_error_no_refresh_token_field_in_file(self, monkeypatch, gfa, tmp_path):
+ self._setup_token(tmp_path, monkeypatch, gfa,
+ content={"access_token": "at-only"})
+ with pytest.raises(SystemExit):
+ gfa.load_refresh_token({})
+
+
+# ---------------------------------------------------------------------------
+# refresh_access_token — mocked urllib
+# ---------------------------------------------------------------------------
+
+class TestRefreshAccessToken:
+
+ def test_normal_returns_access_token(self, monkeypatch, gfa):
+ mock_urlopen = MagicMock(
+ return_value=_mock_urlopen_response({"access_token": "at-new"})
+ )
+ monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen)
+ result = gfa.refresh_access_token("rt-val", "cid-val", "sec-val")
+ assert result == "at-new"
+ # Verify the request shape: URL, body grant_type and refresh_token.
+ req = mock_urlopen.call_args[0][0]
+ assert req.full_url == gfa.OAUTH_TOKEN_URL
+ body = req.data.decode()
+ assert "grant_type=refresh_token" in body
+ assert "refresh_token=rt-val" in body
+ assert "client_id=cid-val" in body
+
+ def test_error_response_missing_access_token(self, monkeypatch, gfa):
+ mock_urlopen = MagicMock(
+ return_value=_mock_urlopen_response({"error": "invalid_grant"})
+ )
+ monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen)
+ with pytest.raises(SystemExit):
+ gfa.refresh_access_token("rt", "cid", "sec")
+
+
+# ---------------------------------------------------------------------------
+# gmail_get — mocked urllib
+# ---------------------------------------------------------------------------
+
+class TestGmailGet:
+
+ def test_normal_returns_parsed_json_with_bearer_header(self, monkeypatch, gfa):
+ mock_urlopen = MagicMock(
+ return_value=_mock_urlopen_response({"id": "msg123", "snippet": "hi"})
+ )
+ monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen)
+ result = gfa.gmail_get("/messages/msg123", "at-token")
+ assert result == {"id": "msg123", "snippet": "hi"}
+ req = mock_urlopen.call_args[0][0]
+ assert req.full_url == f"{gfa.GMAIL_API}/messages/msg123"
+ # urllib.request.Request lowercases header names except the first
+ # char via .capitalize() → "Authorization" stays as "Authorization".
+ assert req.headers["Authorization"] == "Bearer at-token"
+
+
+# ---------------------------------------------------------------------------
+# Argparse — black-box subprocess sanity check
+# ---------------------------------------------------------------------------
+
+class TestArgparseShape:
+
+ def test_normal_help_lists_all_required_args(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH), "--help"],
+ capture_output=True, text=True,
+ )
+ assert result.returncode == 0
+ for flag in ("--profile", "--message-id", "--output-dir"):
+ assert flag in result.stdout
+
+ def test_error_no_args_exits_nonzero(self):
+ result = subprocess.run(
+ [sys.executable, str(SCRIPT_PATH)],
+ capture_output=True, text=True,
+ )
+ assert result.returncode != 0