diff options
| author | Craig Jennings <c@cjennings.net> | 2026-05-08 23:23:31 -0500 |
|---|---|---|
| committer | Craig Jennings <c@cjennings.net> | 2026-05-08 23:23:31 -0500 |
| commit | 65c962e01f0ec7cefe70157f4f11f54638cdc995 (patch) | |
| tree | a64d7a8bd466cb08e77829ca86e25a221aa64710 /.ai/scripts/tests/test_gmail_fetch_attachments.py | |
| parent | 1abae87acaba85453ef9b7e1eafe0d6e8e22c4e5 (diff) | |
| download | rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.tar.gz rulesets-65c962e01f0ec7cefe70157f4f11f54638cdc995.zip | |
feat: Add cmail IMAP action script and test suite
Add cmail-action.py for IMAP triage operations against Proton Mail
Bridge (list-unread, read, mark-read, star, unstar, trash, send,
folders) mirroring the Gmail MCP workflow. Also add comprehensive
tests for cmail-action, gmail-fetch-attachments, and
maildir-flag-manager scripts.
Diffstat (limited to '.ai/scripts/tests/test_gmail_fetch_attachments.py')
| -rw-r--r-- | .ai/scripts/tests/test_gmail_fetch_attachments.py | 420 |
1 files changed, 420 insertions, 0 deletions
diff --git a/.ai/scripts/tests/test_gmail_fetch_attachments.py b/.ai/scripts/tests/test_gmail_fetch_attachments.py new file mode 100644 index 0000000..b4fba41 --- /dev/null +++ b/.ai/scripts/tests/test_gmail_fetch_attachments.py @@ -0,0 +1,420 @@ +"""Tests for gmail-fetch-attachments.py. + +Covers: +- Pure helpers: safe_filename, collect_attachments, load_client_creds +- File I/O: load_mcp_env, load_refresh_token (tmp_path + monkeypatch on + module-level constants CLAUDE_CONFIG and TOKEN_DIR) +- HTTP wrappers: refresh_access_token, gmail_get (monkeypatch on + urllib.request.urlopen) +- Argparse: --help / missing-args via subprocess + +Strategy mirrors test_cmail_action.py: import the script via importlib +(filename has hyphens), mock at external boundaries, no integration +test for main() — the components are tested individually. +""" + +from __future__ import annotations + +import importlib.util +import json +import subprocess +import sys +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +SCRIPT_PATH = Path(__file__).resolve().parent.parent / "gmail-fetch-attachments.py" + + +def _load_module(): + spec = importlib.util.spec_from_file_location( + "gmail_fetch_attachments", str(SCRIPT_PATH) + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +@pytest.fixture(scope="module") +def gfa(): + return _load_module() + + +def _mock_urlopen_response(payload): + """Build a MagicMock mimicking urllib.request.urlopen()'s context-manager response.""" + mock_resp = MagicMock() + mock_resp.read.return_value = json.dumps(payload).encode() + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + return mock_resp + + +# --------------------------------------------------------------------------- +# safe_filename — pure +# --------------------------------------------------------------------------- + +class TestSafeFilename: + + def test_normal_clean_filename(self, gfa): + assert gfa.safe_filename("report.pdf") == "report.pdf" + + def test_boundary_forward_slash_replaced_with_underscore(self, gfa): + assert gfa.safe_filename("foo/bar.txt") == "foo_bar.txt" + + def test_boundary_backslash_replaced_with_underscore(self, gfa): + assert gfa.safe_filename("foo\\bar.txt") == "foo_bar.txt" + + def test_boundary_path_traversal_stripped(self, gfa): + # "../etc/passwd" -> after slash replace: ".._etc_passwd" + # While loop strips leading "..": "_etc_passwd" + assert gfa.safe_filename("../etc/passwd") == "_etc_passwd" + + def test_boundary_dotfile_preserved(self, gfa): + # The fix Craig requested: single-dot prefixes survive so dotfiles + # like .gitignore aren't silently renamed. + assert gfa.safe_filename(".gitignore") == ".gitignore" + assert gfa.safe_filename(".env.local") == ".env.local" + + def test_boundary_empty_string(self, gfa): + assert gfa.safe_filename("") == "" + + @pytest.mark.parametrize("input_name,expected", [ + ("..", ""), # single ".." stripped, leaves empty + ("...", "."), # one strip leaves a single dot + ("....", ""), # two strips leave empty + (".....", "."), # two strips leave one dot + ]) + def test_boundary_only_dots(self, gfa, input_name, expected): + assert gfa.safe_filename(input_name) == expected + + def test_boundary_double_dot_followed_by_name_stripped(self, gfa): + assert gfa.safe_filename("..foo") == "foo" + + def test_boundary_middle_dotdot_preserved(self, gfa): + # Only LEADING ".." gets stripped. Mid-string ".." stays. + # "foo..bar" has no leading dots, so it's preserved as-is. + assert gfa.safe_filename("foo..bar") == "foo..bar" + + +# --------------------------------------------------------------------------- +# collect_attachments — pure +# --------------------------------------------------------------------------- + +class TestCollectAttachments: + + def test_normal_single_attachment(self, gfa): + payload = { + "parts": [ + {"mimeType": "text/plain", "body": {"size": 100}}, + {"filename": "doc.pdf", "mimeType": "application/pdf", + "body": {"attachmentId": "abc123", "size": 5000}}, + ] + } + result = gfa.collect_attachments(payload) + assert result == [{ + "filename": "doc.pdf", + "attachmentId": "abc123", + "size": 5000, + "mimeType": "application/pdf", + }] + + def test_boundary_nested_multipart_recursion(self, gfa): + payload = { + "parts": [ + {"mimeType": "multipart/mixed", "parts": [ + {"mimeType": "multipart/alternative", "parts": [ + {"filename": "deep.pdf", "mimeType": "application/pdf", + "body": {"attachmentId": "deep1", "size": 100}}, + ]}, + ]}, + ] + } + result = gfa.collect_attachments(payload) + assert len(result) == 1 + assert result[0]["filename"] == "deep.pdf" + assert result[0]["attachmentId"] == "deep1" + + def test_boundary_no_attachments_returns_empty(self, gfa): + payload = { + "parts": [ + {"mimeType": "text/plain", "body": {"size": 100}}, + {"mimeType": "text/html", "body": {"size": 200}}, + ] + } + assert gfa.collect_attachments(payload) == [] + + def test_boundary_inline_image_no_filename_skipped(self, gfa): + # Inline images embedded via cid: typically have an attachmentId + # but no filename. The "user-visible attachments" heuristic skips + # them so they don't litter the output dir as image001.png. + payload = { + "parts": [ + {"mimeType": "image/png", + "body": {"attachmentId": "inline1", "size": 500}}, + ] + } + assert gfa.collect_attachments(payload) == [] + + def test_boundary_empty_filename_skipped(self, gfa): + # Empty-string filename also skipped (truthy check). + payload = { + "parts": [ + {"filename": "", "mimeType": "image/png", + "body": {"attachmentId": "empty1", "size": 500}}, + ] + } + assert gfa.collect_attachments(payload) == [] + + def test_boundary_filename_without_attachment_id_skipped(self, gfa): + # A part with a filename but no attachmentId isn't a separately + # downloadable attachment — it's inline content with a name. + payload = { + "parts": [ + {"filename": "fake.txt", "mimeType": "text/plain", + "body": {"size": 100}}, + ] + } + assert gfa.collect_attachments(payload) == [] + + def test_boundary_multiple_attachments_at_different_depths(self, gfa): + payload = { + "parts": [ + {"filename": "top.pdf", "mimeType": "application/pdf", + "body": {"attachmentId": "top1", "size": 100}}, + {"mimeType": "multipart/mixed", "parts": [ + {"filename": "nested.txt", "mimeType": "text/plain", + "body": {"attachmentId": "nested1", "size": 50}}, + ]}, + ] + } + result = gfa.collect_attachments(payload) + names = sorted(r["filename"] for r in result) + assert names == ["nested.txt", "top.pdf"] + + def test_boundary_default_mimetype_when_missing(self, gfa): + payload = { + "parts": [ + {"filename": "x.bin", + "body": {"attachmentId": "x1", "size": 10}}, + ] + } + result = gfa.collect_attachments(payload) + assert result[0]["mimeType"] == "application/octet-stream" + + def test_error_empty_payload(self, gfa): + assert gfa.collect_attachments({}) == [] + + def test_error_payload_with_null_parts(self, gfa): + # Defensive: parts = None falls through to empty list via `or []`. + payload = {"parts": None} + assert gfa.collect_attachments(payload) == [] + + +# --------------------------------------------------------------------------- +# load_client_creds — pure +# --------------------------------------------------------------------------- + +class TestLoadClientCreds: + + def test_normal_both_credentials_present(self, gfa): + env = {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": "secret456"} + assert gfa.load_client_creds(env) == ("cid123", "secret456") + + def test_error_missing_client_id(self, gfa): + env = {"GOOGLE_CLIENT_SECRET": "secret456"} + with pytest.raises(SystemExit): + gfa.load_client_creds(env) + + def test_error_missing_client_secret(self, gfa): + env = {"GOOGLE_CLIENT_ID": "cid123"} + with pytest.raises(SystemExit): + gfa.load_client_creds(env) + + def test_error_empty_client_id(self, gfa): + env = {"GOOGLE_CLIENT_ID": "", "GOOGLE_CLIENT_SECRET": "secret456"} + with pytest.raises(SystemExit): + gfa.load_client_creds(env) + + def test_error_empty_client_secret(self, gfa): + env = {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": ""} + with pytest.raises(SystemExit): + gfa.load_client_creds(env) + + +# --------------------------------------------------------------------------- +# load_mcp_env — file I/O via tmp_path + monkeypatch CLAUDE_CONFIG +# --------------------------------------------------------------------------- + +class TestLoadMcpEnv: + + @staticmethod + def _write_config(tmp_path, monkeypatch, gfa, content): + config_path = tmp_path / ".claude.json" + config_path.write_text(json.dumps(content)) + monkeypatch.setattr(gfa, "CLAUDE_CONFIG", config_path) + return config_path + + def test_normal_personal_profile_with_env(self, monkeypatch, gfa, tmp_path): + self._write_config(tmp_path, monkeypatch, gfa, { + "mcpServers": { + "google-docs-personal": { + "env": {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"} + } + } + }) + env = gfa.load_mcp_env("personal") + assert env == {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"} + + def test_boundary_server_present_no_env_key(self, monkeypatch, gfa, tmp_path): + self._write_config(tmp_path, monkeypatch, gfa, { + "mcpServers": {"google-docs-work": {}} + }) + assert gfa.load_mcp_env("work") == {} + + def test_boundary_env_explicitly_null(self, monkeypatch, gfa, tmp_path): + # The `or {}` defends against null env. Returns empty dict, not None. + self._write_config(tmp_path, monkeypatch, gfa, { + "mcpServers": {"google-docs-personal": {"env": None}} + }) + assert gfa.load_mcp_env("personal") == {} + + def test_error_config_file_missing(self, monkeypatch, gfa, tmp_path): + monkeypatch.setattr(gfa, "CLAUDE_CONFIG", tmp_path / "nope.json") + with pytest.raises(SystemExit): + gfa.load_mcp_env("personal") + + def test_error_server_not_in_config(self, monkeypatch, gfa, tmp_path): + self._write_config(tmp_path, monkeypatch, gfa, { + "mcpServers": {"google-docs-personal": {"env": {}}} + }) + with pytest.raises(SystemExit): + gfa.load_mcp_env("work") + + +# --------------------------------------------------------------------------- +# load_refresh_token — file I/O via tmp_path + monkeypatch TOKEN_DIR +# --------------------------------------------------------------------------- + +class TestLoadRefreshToken: + + @staticmethod + def _setup_token(tmp_path, monkeypatch, gfa, profile=None, content=None): + token_dir = tmp_path / "google-docs-mcp" + token_dir.mkdir() + if profile: + (token_dir / profile).mkdir() + token_path = token_dir / profile / "token.json" + else: + token_path = token_dir / "token.json" + if content is not None: + token_path.write_text(json.dumps(content)) + monkeypatch.setattr(gfa, "TOKEN_DIR", token_dir) + return token_path + + def test_normal_no_profile_token_at_root(self, monkeypatch, gfa, tmp_path): + self._setup_token(tmp_path, monkeypatch, gfa, + content={"refresh_token": "rt-root"}) + assert gfa.load_refresh_token({}) == "rt-root" + + def test_boundary_with_profile_subdir(self, monkeypatch, gfa, tmp_path): + self._setup_token(tmp_path, monkeypatch, gfa, profile="personal", + content={"refresh_token": "rt-personal"}) + assert gfa.load_refresh_token( + {"GOOGLE_MCP_PROFILE": "personal"} + ) == "rt-personal" + + def test_boundary_explicit_empty_profile_falls_back_to_root( + self, monkeypatch, gfa, tmp_path): + # GOOGLE_MCP_PROFILE="" is treated the same as the key being missing — + # both fall back to TOKEN_DIR/token.json. Pinning both shapes so a + # future refactor that drops `or ""` doesn't silently break this. + self._setup_token(tmp_path, monkeypatch, gfa, + content={"refresh_token": "rt-root"}) + assert gfa.load_refresh_token({"GOOGLE_MCP_PROFILE": ""}) == "rt-root" + + def test_error_token_file_missing(self, monkeypatch, gfa, tmp_path): + token_dir = tmp_path / "google-docs-mcp" + token_dir.mkdir() + monkeypatch.setattr(gfa, "TOKEN_DIR", token_dir) + with pytest.raises(SystemExit): + gfa.load_refresh_token({}) + + def test_error_no_refresh_token_field_in_file(self, monkeypatch, gfa, tmp_path): + self._setup_token(tmp_path, monkeypatch, gfa, + content={"access_token": "at-only"}) + with pytest.raises(SystemExit): + gfa.load_refresh_token({}) + + +# --------------------------------------------------------------------------- +# refresh_access_token — mocked urllib +# --------------------------------------------------------------------------- + +class TestRefreshAccessToken: + + def test_normal_returns_access_token(self, monkeypatch, gfa): + mock_urlopen = MagicMock( + return_value=_mock_urlopen_response({"access_token": "at-new"}) + ) + monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen) + result = gfa.refresh_access_token("rt-val", "cid-val", "sec-val") + assert result == "at-new" + # Verify the request shape: URL, body grant_type and refresh_token. + req = mock_urlopen.call_args[0][0] + assert req.full_url == gfa.OAUTH_TOKEN_URL + body = req.data.decode() + assert "grant_type=refresh_token" in body + assert "refresh_token=rt-val" in body + assert "client_id=cid-val" in body + + def test_error_response_missing_access_token(self, monkeypatch, gfa): + mock_urlopen = MagicMock( + return_value=_mock_urlopen_response({"error": "invalid_grant"}) + ) + monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen) + with pytest.raises(SystemExit): + gfa.refresh_access_token("rt", "cid", "sec") + + +# --------------------------------------------------------------------------- +# gmail_get — mocked urllib +# --------------------------------------------------------------------------- + +class TestGmailGet: + + def test_normal_returns_parsed_json_with_bearer_header(self, monkeypatch, gfa): + mock_urlopen = MagicMock( + return_value=_mock_urlopen_response({"id": "msg123", "snippet": "hi"}) + ) + monkeypatch.setattr(gfa.urllib.request, "urlopen", mock_urlopen) + result = gfa.gmail_get("/messages/msg123", "at-token") + assert result == {"id": "msg123", "snippet": "hi"} + req = mock_urlopen.call_args[0][0] + assert req.full_url == f"{gfa.GMAIL_API}/messages/msg123" + # urllib.request.Request lowercases header names except the first + # char via .capitalize() → "Authorization" stays as "Authorization". + assert req.headers["Authorization"] == "Bearer at-token" + + +# --------------------------------------------------------------------------- +# Argparse — black-box subprocess sanity check +# --------------------------------------------------------------------------- + +class TestArgparseShape: + + def test_normal_help_lists_all_required_args(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH), "--help"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + for flag in ("--profile", "--message-id", "--output-dir"): + assert flag in result.stdout + + def test_error_no_args_exits_nonzero(self): + result = subprocess.run( + [sys.executable, str(SCRIPT_PATH)], + capture_output=True, text=True, + ) + assert result.returncode != 0 |
