aboutsummaryrefslogtreecommitdiff
path: root/.ai/scripts/tests/test_gmail_fetch_attachments.py
blob: b4fba417f29173f07508bdacf57b00dbf9951adc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
"""Tests for gmail-fetch-attachments.py.

Covers:
- Pure helpers: safe_filename, collect_attachments, load_client_creds
- File I/O: load_mcp_env, load_refresh_token (tmp_path + monkeypatch on
  module-level constants CLAUDE_CONFIG and TOKEN_DIR)
- HTTP wrappers: refresh_access_token, gmail_get (monkeypatch on
  urllib.request.urlopen)
- Argparse: --help / missing-args via subprocess

Strategy mirrors test_cmail_action.py: import the script via importlib
(filename has hyphens), mock at external boundaries, no integration
test for main() — the components are tested individually.
"""

from __future__ import annotations

import importlib.util
import json
import subprocess
import sys
from pathlib import Path
from unittest.mock import MagicMock

import pytest

# Absolute path of the script under test: it lives in the parent of this
# file's directory. Used both to load the script as a module and to launch
# it as a subprocess.
SCRIPT_PATH = Path(__file__).resolve().parent.parent / "gmail-fetch-attachments.py"


def _load_module():
    """Load the script at SCRIPT_PATH by file location and return the module.

    The module is created from an explicit file-location spec under the
    name ``gmail_fetch_attachments`` and then executed, so its top-level
    definitions become attributes of the returned module object.
    """
    module_spec = importlib.util.spec_from_file_location(
        "gmail_fetch_attachments",
        str(SCRIPT_PATH),
    )
    module = importlib.util.module_from_spec(module_spec)
    # Executing the spec runs the script's top-level code inside ``module``.
    module_spec.loader.exec_module(module)
    return module


@pytest.fixture(scope="module")
def gfa():
    """Provide the script under test as a module, loaded once per test module."""
    script_module = _load_module()
    return script_module


def _mock_urlopen_response(payload):
    """Return a fake urlopen() response whose body is ``payload`` as JSON bytes.

    The fake works as a context manager: entering it yields the fake itself,
    and leaving it returns False so exceptions raised inside the ``with``
    block are not suppressed.
    """
    response = MagicMock()
    response.read.return_value = json.dumps(payload).encode()
    # MagicMock pre-configures the context-manager dunders; only their
    # return values need to be set.
    response.__enter__.return_value = response
    response.__exit__.return_value = False
    return response


# ---------------------------------------------------------------------------
# safe_filename — pure
# ---------------------------------------------------------------------------

class TestSafeFilename:
    """Pin how safe_filename treats path separators, leading dots and empties."""

    def test_normal_clean_filename(self, gfa):
        """A name with nothing unsafe in it comes back unchanged."""
        cleaned = gfa.safe_filename("report.pdf")
        assert cleaned == "report.pdf"

    def test_boundary_forward_slash_replaced_with_underscore(self, gfa):
        """A forward slash becomes an underscore."""
        cleaned = gfa.safe_filename("foo/bar.txt")
        assert cleaned == "foo_bar.txt"

    def test_boundary_backslash_replaced_with_underscore(self, gfa):
        """A backslash becomes an underscore."""
        cleaned = gfa.safe_filename("foo\\bar.txt")
        assert cleaned == "foo_bar.txt"

    def test_boundary_path_traversal_stripped(self, gfa):
        """A traversal prefix is neutralised.

        Expected steps: slashes are replaced first ("../etc/passwd" becomes
        ".._etc_passwd"), then the leading ".." is removed, leaving
        "_etc_passwd".
        """
        cleaned = gfa.safe_filename("../etc/passwd")
        assert cleaned == "_etc_passwd"

    def test_boundary_dotfile_preserved(self, gfa):
        """Single-dot prefixes survive.

        This is the behaviour Craig asked for: dotfiles such as .gitignore
        must not be silently renamed.
        """
        for dotfile in (".gitignore", ".env.local"):
            assert gfa.safe_filename(dotfile) == dotfile

    def test_boundary_empty_string(self, gfa):
        """Empty input gives empty output."""
        cleaned = gfa.safe_filename("")
        assert cleaned == ""

    @pytest.mark.parametrize(
        ("input_name", "expected"),
        [
            # Leading dots are removed two at a time, so an even count
            # leaves nothing and an odd count leaves a single dot.
            ("..", ""),
            ("...", "."),
            ("....", ""),
            (".....", "."),
        ],
    )
    def test_boundary_only_dots(self, gfa, input_name, expected):
        """Names made only of dots are reduced pairwise from the front."""
        cleaned = gfa.safe_filename(input_name)
        assert cleaned == expected

    def test_boundary_double_dot_followed_by_name_stripped(self, gfa):
        """A leading ".." glued to a name is dropped, keeping the name."""
        cleaned = gfa.safe_filename("..foo")
        assert cleaned == "foo"

    def test_boundary_middle_dotdot_preserved(self, gfa):
        """Only a LEADING ".." is stripped; one in the middle is left alone."""
        untouched = "foo..bar"
        assert gfa.safe_filename(untouched) == untouched


# ---------------------------------------------------------------------------
# collect_attachments — pure
# ---------------------------------------------------------------------------

class TestCollectAttachments:
    """Pin which message parts collect_attachments reports as attachments."""

    def test_normal_single_attachment(self, gfa):
        """One named part carrying an attachmentId yields exactly one record."""
        text_part = {"mimeType": "text/plain", "body": {"size": 100}}
        pdf_part = {
            "filename": "doc.pdf",
            "mimeType": "application/pdf",
            "body": {"attachmentId": "abc123", "size": 5000},
        }
        expected = [{
            "filename": "doc.pdf",
            "attachmentId": "abc123",
            "size": 5000,
            "mimeType": "application/pdf",
        }]
        found = gfa.collect_attachments({"parts": [text_part, pdf_part]})
        assert found == expected

    def test_boundary_nested_multipart_recursion(self, gfa):
        """An attachment two multipart levels down is still found."""
        leaf = {
            "filename": "deep.pdf",
            "mimeType": "application/pdf",
            "body": {"attachmentId": "deep1", "size": 100},
        }
        alternative = {"mimeType": "multipart/alternative", "parts": [leaf]}
        mixed = {"mimeType": "multipart/mixed", "parts": [alternative]}
        found = gfa.collect_attachments({"parts": [mixed]})
        assert len(found) == 1
        only = found[0]
        assert only["filename"] == "deep.pdf"
        assert only["attachmentId"] == "deep1"

    def test_boundary_no_attachments_returns_empty(self, gfa):
        """Body-only parts produce an empty result."""
        body_parts = [
            {"mimeType": "text/plain", "body": {"size": 100}},
            {"mimeType": "text/html", "body": {"size": 200}},
        ]
        found = gfa.collect_attachments({"parts": body_parts})
        assert found == []

    def test_boundary_inline_image_no_filename_skipped(self, gfa):
        """A part with an attachmentId but no filename is ignored.

        Inline images embedded via cid: typically look like this. The
        "user-visible attachments" heuristic skips them so they don't
        litter the output dir as image001.png.
        """
        inline_image = {
            "mimeType": "image/png",
            "body": {"attachmentId": "inline1", "size": 500},
        }
        found = gfa.collect_attachments({"parts": [inline_image]})
        assert found == []

    def test_boundary_empty_filename_skipped(self, gfa):
        """An empty-string filename is skipped too (the check is on truthiness)."""
        unnamed = {
            "filename": "",
            "mimeType": "image/png",
            "body": {"attachmentId": "empty1", "size": 500},
        }
        found = gfa.collect_attachments({"parts": [unnamed]})
        assert found == []

    def test_boundary_filename_without_attachment_id_skipped(self, gfa):
        """A named part lacking an attachmentId is not reported.

        Such a part isn't a separately downloadable attachment; it's
        inline content that happens to have a name.
        """
        named_inline = {
            "filename": "fake.txt",
            "mimeType": "text/plain",
            "body": {"size": 100},
        }
        found = gfa.collect_attachments({"parts": [named_inline]})
        assert found == []

    def test_boundary_multiple_attachments_at_different_depths(self, gfa):
        """Attachments at the top level and inside a multipart are both found."""
        top_level = {
            "filename": "top.pdf",
            "mimeType": "application/pdf",
            "body": {"attachmentId": "top1", "size": 100},
        }
        nested = {
            "filename": "nested.txt",
            "mimeType": "text/plain",
            "body": {"attachmentId": "nested1", "size": 50},
        }
        container = {"mimeType": "multipart/mixed", "parts": [nested]}
        found = gfa.collect_attachments({"parts": [top_level, container]})
        # Sort so the assertion does not depend on traversal order.
        filenames = sorted(entry["filename"] for entry in found)
        assert filenames == ["nested.txt", "top.pdf"]

    def test_boundary_default_mimetype_when_missing(self, gfa):
        """A part without mimeType is reported as application/octet-stream."""
        untyped = {
            "filename": "x.bin",
            "body": {"attachmentId": "x1", "size": 10},
        }
        found = gfa.collect_attachments({"parts": [untyped]})
        assert found[0]["mimeType"] == "application/octet-stream"

    def test_error_empty_payload(self, gfa):
        """A payload with no keys at all gives an empty list."""
        found = gfa.collect_attachments({})
        assert found == []

    def test_error_payload_with_null_parts(self, gfa):
        """Defensive case: parts = None is treated like an empty parts list."""
        found = gfa.collect_attachments({"parts": None})
        assert found == []


# ---------------------------------------------------------------------------
# load_client_creds — pure
# ---------------------------------------------------------------------------

class TestLoadClientCreds:
    """Pin load_client_creds: both values are returned, or the call exits."""

    def test_normal_both_credentials_present(self, gfa):
        """Both keys present yields an (id, secret) tuple."""
        creds = gfa.load_client_creds(
            {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": "secret456"}
        )
        assert creds == ("cid123", "secret456")

    def test_error_missing_client_id(self, gfa):
        """No GOOGLE_CLIENT_ID key raises SystemExit."""
        secret_only = {"GOOGLE_CLIENT_SECRET": "secret456"}
        with pytest.raises(SystemExit):
            gfa.load_client_creds(secret_only)

    def test_error_missing_client_secret(self, gfa):
        """No GOOGLE_CLIENT_SECRET key raises SystemExit."""
        id_only = {"GOOGLE_CLIENT_ID": "cid123"}
        with pytest.raises(SystemExit):
            gfa.load_client_creds(id_only)

    def test_error_empty_client_id(self, gfa):
        """An empty-string client id is rejected like a missing one."""
        blank_id = {"GOOGLE_CLIENT_ID": "", "GOOGLE_CLIENT_SECRET": "secret456"}
        with pytest.raises(SystemExit):
            gfa.load_client_creds(blank_id)

    def test_error_empty_client_secret(self, gfa):
        """An empty-string client secret is rejected like a missing one."""
        blank_secret = {"GOOGLE_CLIENT_ID": "cid123", "GOOGLE_CLIENT_SECRET": ""}
        with pytest.raises(SystemExit):
            gfa.load_client_creds(blank_secret)


# ---------------------------------------------------------------------------
# load_mcp_env — file I/O via tmp_path + monkeypatch CLAUDE_CONFIG
# ---------------------------------------------------------------------------

class TestLoadMcpEnv:
    """Pin load_mcp_env against config files written under tmp_path."""

    @staticmethod
    def _write_config(tmp_path, monkeypatch, gfa, content):
        """Write ``content`` as JSON and point the module's CLAUDE_CONFIG at it.

        Returns the path of the file that was written.
        """
        target = tmp_path / ".claude.json"
        serialized = json.dumps(content)
        target.write_text(serialized)
        monkeypatch.setattr(gfa, "CLAUDE_CONFIG", target)
        return target

    def test_normal_personal_profile_with_env(self, monkeypatch, gfa, tmp_path):
        """The env mapping of the matching server entry is returned as-is."""
        server_env = {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"}
        config = {"mcpServers": {"google-docs-personal": {"env": server_env}}}
        self._write_config(tmp_path, monkeypatch, gfa, config)
        loaded = gfa.load_mcp_env("personal")
        assert loaded == {"GOOGLE_CLIENT_ID": "cid", "GOOGLE_CLIENT_SECRET": "sec"}

    def test_boundary_server_present_no_env_key(self, monkeypatch, gfa, tmp_path):
        """A server entry without an env key yields an empty dict."""
        config = {"mcpServers": {"google-docs-work": {}}}
        self._write_config(tmp_path, monkeypatch, gfa, config)
        loaded = gfa.load_mcp_env("work")
        assert loaded == {}

    def test_boundary_env_explicitly_null(self, monkeypatch, gfa, tmp_path):
        """A null env is expected to come back as an empty dict, not None."""
        config = {"mcpServers": {"google-docs-personal": {"env": None}}}
        self._write_config(tmp_path, monkeypatch, gfa, config)
        loaded = gfa.load_mcp_env("personal")
        assert loaded == {}

    def test_error_config_file_missing(self, monkeypatch, gfa, tmp_path):
        """Pointing CLAUDE_CONFIG at a nonexistent file raises SystemExit."""
        absent = tmp_path / "nope.json"
        monkeypatch.setattr(gfa, "CLAUDE_CONFIG", absent)
        with pytest.raises(SystemExit):
            gfa.load_mcp_env("personal")

    def test_error_server_not_in_config(self, monkeypatch, gfa, tmp_path):
        """Asking for a profile whose server entry is absent raises SystemExit."""
        config = {"mcpServers": {"google-docs-personal": {"env": {}}}}
        self._write_config(tmp_path, monkeypatch, gfa, config)
        with pytest.raises(SystemExit):
            gfa.load_mcp_env("work")


# ---------------------------------------------------------------------------
# load_refresh_token — file I/O via tmp_path + monkeypatch TOKEN_DIR
# ---------------------------------------------------------------------------

class TestLoadRefreshToken:
    """Pin load_refresh_token against token files written under tmp_path."""

    @staticmethod
    def _setup_token(tmp_path, monkeypatch, gfa, profile=None, content=None):
        """Create a token directory and point the module's TOKEN_DIR at it.

        With a truthy ``profile`` the token file path sits in a subdirectory
        of that name, otherwise directly in the token directory. The file is
        only written when ``content`` is not None. Returns the token path.
        """
        token_dir = tmp_path / "google-docs-mcp"
        token_dir.mkdir()
        holder = token_dir
        if profile:
            holder = token_dir / profile
            holder.mkdir()
        token_path = holder / "token.json"
        if content is not None:
            token_path.write_text(json.dumps(content))
        monkeypatch.setattr(gfa, "TOKEN_DIR", token_dir)
        return token_path

    def test_normal_no_profile_token_at_root(self, monkeypatch, gfa, tmp_path):
        """Without a profile the token is read from TOKEN_DIR/token.json."""
        self._setup_token(
            tmp_path, monkeypatch, gfa, content={"refresh_token": "rt-root"}
        )
        token = gfa.load_refresh_token({})
        assert token == "rt-root"

    def test_boundary_with_profile_subdir(self, monkeypatch, gfa, tmp_path):
        """GOOGLE_MCP_PROFILE selects the token file in the profile's subdirectory."""
        self._setup_token(
            tmp_path,
            monkeypatch,
            gfa,
            profile="personal",
            content={"refresh_token": "rt-personal"},
        )
        token = gfa.load_refresh_token({"GOOGLE_MCP_PROFILE": "personal"})
        assert token == "rt-personal"

    def test_boundary_explicit_empty_profile_falls_back_to_root(
            self, monkeypatch, gfa, tmp_path):
        """GOOGLE_MCP_PROFILE="" behaves like the key being absent.

        Both shapes fall back to TOKEN_DIR/token.json. This one is pinned
        separately so a future refactor that stops normalising the empty
        value doesn't silently break it.
        """
        self._setup_token(
            tmp_path, monkeypatch, gfa, content={"refresh_token": "rt-root"}
        )
        token = gfa.load_refresh_token({"GOOGLE_MCP_PROFILE": ""})
        assert token == "rt-root"

    def test_error_token_file_missing(self, monkeypatch, gfa, tmp_path):
        """An existing token directory without a token file raises SystemExit."""
        # With no content the helper creates the directory but writes no file.
        self._setup_token(tmp_path, monkeypatch, gfa)
        with pytest.raises(SystemExit):
            gfa.load_refresh_token({})

    def test_error_no_refresh_token_field_in_file(self, monkeypatch, gfa, tmp_path):
        """A token file lacking the refresh_token field raises SystemExit."""
        self._setup_token(
            tmp_path, monkeypatch, gfa, content={"access_token": "at-only"}
        )
        with pytest.raises(SystemExit):
            gfa.load_refresh_token({})


# ---------------------------------------------------------------------------
# refresh_access_token — mocked urllib
# ---------------------------------------------------------------------------

class TestRefreshAccessToken:
    """Pin refresh_access_token with urllib.request.urlopen replaced by a fake."""

    def test_normal_returns_access_token(self, monkeypatch, gfa):
        """The access_token from the response is returned and the request is well-formed."""
        token_response = _mock_urlopen_response({"access_token": "at-new"})
        fake_urlopen = MagicMock(return_value=token_response)
        monkeypatch.setattr(gfa.urllib.request, "urlopen", fake_urlopen)

        access_token = gfa.refresh_access_token("rt-val", "cid-val", "sec-val")
        assert access_token == "at-new"

        # Inspect the Request object handed to urlopen: target URL plus the
        # grant type, refresh token and client id in the form body.
        sent_request = fake_urlopen.call_args[0][0]
        assert sent_request.full_url == gfa.OAUTH_TOKEN_URL
        form_body = sent_request.data.decode()
        expected_fragments = (
            "grant_type=refresh_token",
            "refresh_token=rt-val",
            "client_id=cid-val",
        )
        for fragment in expected_fragments:
            assert fragment in form_body

    def test_error_response_missing_access_token(self, monkeypatch, gfa):
        """A response without an access_token field raises SystemExit."""
        error_response = _mock_urlopen_response({"error": "invalid_grant"})
        fake_urlopen = MagicMock(return_value=error_response)
        monkeypatch.setattr(gfa.urllib.request, "urlopen", fake_urlopen)
        with pytest.raises(SystemExit):
            gfa.refresh_access_token("rt", "cid", "sec")


# ---------------------------------------------------------------------------
# gmail_get — mocked urllib
# ---------------------------------------------------------------------------

class TestGmailGet:
    """Pin gmail_get with urllib.request.urlopen replaced by a fake."""

    def test_normal_returns_parsed_json_with_bearer_header(self, monkeypatch, gfa):
        """The JSON body is returned parsed and the request carries a bearer token."""
        message = {"id": "msg123", "snippet": "hi"}
        fake_urlopen = MagicMock(return_value=_mock_urlopen_response(message))
        monkeypatch.setattr(gfa.urllib.request, "urlopen", fake_urlopen)

        fetched = gfa.gmail_get("/messages/msg123", "at-token")
        assert fetched == {"id": "msg123", "snippet": "hi"}

        sent_request = fake_urlopen.call_args[0][0]
        expected_url = f"{gfa.GMAIL_API}/messages/msg123"
        assert sent_request.full_url == expected_url
        # urllib.request.Request stores header names via .capitalize()
        # (first character upper-cased, the rest lower-cased), so
        # "Authorization" is looked up under exactly that spelling.
        auth_header = sent_request.headers["Authorization"]
        assert auth_header == "Bearer at-token"


# ---------------------------------------------------------------------------
# Argparse — black-box subprocess sanity check
# ---------------------------------------------------------------------------

class TestArgparseShape:
    """Black-box checks of the script's command-line interface.

    The script is launched as a real child process with the current
    interpreter, so these tests see exactly what a shell user would.
    """

    # Upper bound (seconds) for each child process. Without it a script that
    # hangs would stall the whole test run instead of failing this test.
    _TIMEOUT_SECONDS = 60

    @classmethod
    def _run_script(cls, *args):
        """Run the script with ``args`` and return the CompletedProcess.

        stdout and stderr are captured as text. ``check=False`` is explicit
        because the tests assert on the return code themselves.

        Raises:
            subprocess.TimeoutExpired: if the child runs longer than
                ``_TIMEOUT_SECONDS``.
        """
        return subprocess.run(
            [sys.executable, str(SCRIPT_PATH), *args],
            capture_output=True,
            text=True,
            check=False,
            timeout=cls._TIMEOUT_SECONDS,
        )

    def test_normal_help_lists_all_required_args(self):
        """--help exits 0 and mentions every expected flag on stdout."""
        result = self._run_script("--help")
        assert result.returncode == 0
        for flag in ("--profile", "--message-id", "--output-dir"):
            assert flag in result.stdout

    def test_error_no_args_exits_nonzero(self):
        """Running with no arguments exits with a nonzero status."""
        result = self._run_script()
        assert result.returncode != 0