1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
|
#!/usr/bin/env python3
"""Manage maildir flags (read, starred) across email accounts.
Uses atomic os.rename() for flag operations directly on maildir files.
Safer and more reliable than shell-based approaches (zsh loses PATH in
while-read loops, piped mu move silently fails).
Supports the same flag semantics as mu4e: maildir files in new/ are moved
to cur/ when the Seen flag is added, and flag changes are persisted to the
filesystem so mbsync picks them up on the next sync.
Usage:
# Mark all unread INBOX emails as read
maildir-flag-manager.py mark-read
# Mark specific emails as read (by path)
maildir-flag-manager.py mark-read /path/to/message1 /path/to/message2
# Mark all unread INBOX emails as read, then reindex mu
maildir-flag-manager.py mark-read --reindex
# Star specific emails (by path)
maildir-flag-manager.py star /path/to/message1 /path/to/message2
# Star and mark read
maildir-flag-manager.py star --mark-read /path/to/message1
# Dry run — show what would change without modifying anything
maildir-flag-manager.py mark-read --dry-run
"""
import argparse
import os
import shutil
import subprocess
import sys
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
MAILDIR_ACCOUNTS = {
'gmail': os.path.expanduser('~/.mail/gmail/INBOX'),
'cmail': os.path.expanduser('~/.mail/cmail/Inbox'),
}
# ---------------------------------------------------------------------------
# Core flag operations
# ---------------------------------------------------------------------------
def parse_maildir_flags(filename):
"""Extract flags from a maildir filename.
Maildir filenames follow the pattern: unique:2,FLAGS
where FLAGS is a sorted string of flag characters (e.g., "FS" for
Flagged+Seen).
Returns (base, flags_string). If no flags section, returns (filename, '').
"""
if ':2,' in filename:
base, flags = filename.rsplit(':2,', 1)
return base, flags
return filename, ''
def build_flagged_filename(filename, new_flags):
"""Build a maildir filename with the given flags.
Flags are always sorted alphabetically per maildir spec.
"""
base, _ = parse_maildir_flags(filename)
sorted_flags = ''.join(sorted(set(new_flags)))
return f"{base}:2,{sorted_flags}"
def rename_with_flag(file_path, flag, dry_run=False):
"""Add a flag to a single maildir message file via atomic rename.
Handles moving from new/ to cur/ when adding the Seen flag.
Returns True if the flag was added, False if already present.
"""
dirname = os.path.dirname(file_path)
filename = os.path.basename(file_path)
maildir_root = os.path.dirname(dirname)
subdir = os.path.basename(dirname)
_, current_flags = parse_maildir_flags(filename)
if flag in current_flags:
return False
new_flags = current_flags + flag
new_filename = build_flagged_filename(filename, new_flags)
# Messages with the Seen flag belong in cur/, not new/
if 'S' in new_flags and subdir == 'new':
target_dir = os.path.join(maildir_root, 'cur')
else:
target_dir = dirname
new_path = os.path.join(target_dir, new_filename)
if dry_run:
return True
os.rename(file_path, new_path)
return True
def process_maildir(maildir_path, flag, dry_run=False):
"""Add a flag to all messages in a maildir that don't have it.
Scans both new/ and cur/ subdirectories.
Returns (changed_count, skipped_count, error_count).
"""
if not os.path.isdir(maildir_path):
print(f" Skipping {maildir_path} (not found)", file=sys.stderr)
return 0, 0, 0
changed = 0
skipped = 0
errors = 0
for subdir in ('new', 'cur'):
subdir_path = os.path.join(maildir_path, subdir)
if not os.path.isdir(subdir_path):
continue
for filename in os.listdir(subdir_path):
file_path = os.path.join(subdir_path, filename)
if not os.path.isfile(file_path):
continue
try:
if rename_with_flag(file_path, flag, dry_run):
changed += 1
else:
skipped += 1
except Exception as e:
print(f" Error on {filename}: {e}", file=sys.stderr)
errors += 1
return changed, skipped, errors
def process_specific_files(paths, flag, dry_run=False):
"""Add a flag to specific message files by path.
Returns (changed_count, skipped_count, error_count).
"""
changed = 0
skipped = 0
errors = 0
for path in paths:
path = os.path.abspath(path)
if not os.path.isfile(path):
print(f" File not found: {path}", file=sys.stderr)
errors += 1
continue
# Verify file is inside a maildir (parent should be cur/ or new/)
parent_dir = os.path.basename(os.path.dirname(path))
if parent_dir not in ('cur', 'new'):
print(f" Not in a maildir cur/ or new/ dir: {path}",
file=sys.stderr)
errors += 1
continue
try:
if rename_with_flag(path, flag, dry_run):
changed += 1
else:
skipped += 1
except Exception as e:
print(f" Error on {path}: {e}", file=sys.stderr)
errors += 1
return changed, skipped, errors
def reindex_mu():
"""Run mu index to update the database after flag changes."""
mu_path = shutil.which('mu')
if not mu_path:
print("Warning: mu not found in PATH, skipping reindex",
file=sys.stderr)
return False
try:
result = subprocess.run(
[mu_path, 'index'],
capture_output=True, text=True, timeout=120
)
if result.returncode == 0:
print("mu index: database updated")
return True
else:
print(f"mu index failed: {result.stderr}", file=sys.stderr)
return False
except subprocess.TimeoutExpired:
print("mu index timed out after 120s", file=sys.stderr)
return False
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def cmd_mark_read(args):
"""Mark emails as read (add Seen flag)."""
flag = 'S'
action = "Marking as read"
if args.dry_run:
action = "Would mark as read"
total_changed = 0
total_skipped = 0
total_errors = 0
if args.paths:
print(f"{action}: {len(args.paths)} specific message(s)")
c, s, e = process_specific_files(args.paths, flag, args.dry_run)
total_changed += c
total_skipped += s
total_errors += e
else:
for name, maildir_path in MAILDIR_ACCOUNTS.items():
print(f"{action} in {name} ({maildir_path})")
c, s, e = process_maildir(maildir_path, flag, args.dry_run)
total_changed += c
total_skipped += s
total_errors += e
if c > 0:
print(f" {c} message(s) marked as read")
if s > 0:
print(f" {s} already read")
print(f"\nTotal: {total_changed} changed, {total_skipped} already set, "
f"{total_errors} errors")
if args.reindex and not args.dry_run and total_changed > 0:
reindex_mu()
return 0 if total_errors == 0 else 1
def cmd_star(args):
"""Star/flag emails (add Flagged flag)."""
flag = 'F'
action = "Starring"
if args.dry_run:
action = "Would star"
if not args.paths:
print("Error: star requires specific message paths", file=sys.stderr)
return 1
print(f"{action}: {len(args.paths)} message(s)")
total_changed = 0
total_skipped = 0
total_errors = 0
c, s, e = process_specific_files(args.paths, flag, args.dry_run)
total_changed += c
total_skipped += s
total_errors += e
# Also mark as read if requested
if args.mark_read:
print("Also marking as read...")
c2, _, e2 = process_specific_files(args.paths, 'S', args.dry_run)
total_changed += c2
total_errors += e2
print(f"\nTotal: {total_changed} flag(s) changed, {total_skipped} already set, "
f"{total_errors} errors")
if args.reindex and not args.dry_run and total_changed > 0:
reindex_mu()
return 0 if total_errors == 0 else 1
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Manage maildir flags (read, starred) across email accounts."
)
subparsers = parser.add_subparsers(dest='command', required=True)
# mark-read
p_read = subparsers.add_parser(
'mark-read',
help="Mark emails as read (add Seen flag)"
)
p_read.add_argument(
'paths', nargs='*',
help="Specific message file paths. If omitted, marks all unread "
"messages in configured INBOX maildirs."
)
p_read.add_argument(
'--reindex', action='store_true',
help="Run mu index after changing flags"
)
p_read.add_argument(
'--dry-run', action='store_true',
help="Show what would change without modifying anything"
)
p_read.set_defaults(func=cmd_mark_read)
# star
p_star = subparsers.add_parser(
'star',
help="Star/flag emails (add Flagged flag)"
)
p_star.add_argument(
'paths', nargs='+',
help="Message file paths to star"
)
p_star.add_argument(
'--mark-read', action='store_true',
help="Also mark starred messages as read"
)
p_star.add_argument(
'--reindex', action='store_true',
help="Run mu index after changing flags"
)
p_star.add_argument(
'--dry-run', action='store_true',
help="Show what would change without modifying anything"
)
p_star.set_defaults(func=cmd_star)
args = parser.parse_args()
sys.exit(args.func(args))
if __name__ == '__main__':
main()
|