aboutsummaryrefslogtreecommitdiff
path: root/docs/scripts/eml-view-and-extract-attachments.py
diff options
context:
space:
mode:
authorCraig Jennings <c@cjennings.net>2026-02-07 21:41:19 -0600
committerCraig Jennings <c@cjennings.net>2026-02-07 21:41:19 -0600
commit24a681c0696fbdad9c32073ffd24cf7218296ed2 (patch)
treee5b43c8c62e027b7cabffa31b43238027ec284d0 /docs/scripts/eml-view-and-extract-attachments.py
parentbf6eef6183df6051b2423c7850c230406861f927 (diff)
downloadarchangel-24a681c0696fbdad9c32073ffd24cf7218296ed2.tar.gz
archangel-24a681c0696fbdad9c32073ffd24cf7218296ed2.zip
docs: sync templates, rename workflows and notes.org
Sync from templates. Rename NOTES.org to notes.org, session-wrap-up to wrap-it-up, retrospective-workflow to retrospective, session-start to startup. Update all references.
Diffstat (limited to 'docs/scripts/eml-view-and-extract-attachments.py')
-rw-r--r--docs/scripts/eml-view-and-extract-attachments.py401
1 files changed, 373 insertions, 28 deletions
diff --git a/docs/scripts/eml-view-and-extract-attachments.py b/docs/scripts/eml-view-and-extract-attachments.py
index f498b83..3201c99 100644
--- a/docs/scripts/eml-view-and-extract-attachments.py
+++ b/docs/scripts/eml-view-and-extract-attachments.py
@@ -1,34 +1,343 @@
#!/usr/bin/env python3
+"""Extract email content and attachments from EML files.
+
+Without --output-dir: parse and print to stdout (backwards compatible).
+With --output-dir: full pipeline — extract, auto-rename, refile, clean up.
+"""
+
+import argparse
import email
-import sys
+import email.utils
import os
+import re
+import shutil
+import sys
+import tempfile
-def extract_attachments(eml_file):
- with open(eml_file, 'rb') as f:
- msg = email.message_from_binary_file(f)
- # Extract plain text body
- body_text = ""
+# ---------------------------------------------------------------------------
+# Parsing functions (no I/O beyond reading the input file)
+# ---------------------------------------------------------------------------
+
+def parse_received_headers(msg):
+ """Parse Received headers to extract sent/received times and servers."""
+ received_headers = msg.get_all('Received', [])
+
+ sent_server = None
+ sent_time = None
+ received_server = None
+ received_time = None
+
+ for header in received_headers:
+ header = ' '.join(header.split())
+
+ time_match = re.search(r';\s*(.+)$', header)
+ timestamp = time_match.group(1).strip() if time_match else None
+
+ from_match = re.search(r'from\s+([\w.-]+)', header)
+ by_match = re.search(r'by\s+([\w.-]+)', header)
+
+ if from_match and by_match and received_server is None:
+ received_time = timestamp
+ received_server = by_match.group(1)
+ sent_server = from_match.group(1)
+ sent_time = timestamp
+
+ if received_server is None and received_headers:
+ header = ' '.join(received_headers[0].split())
+ time_match = re.search(r';\s*(.+)$', header)
+ received_time = time_match.group(1).strip() if time_match else None
+ by_match = re.search(r'by\s+([\w.-]+)', header)
+ received_server = by_match.group(1) if by_match else "unknown"
+
+ return {
+ 'sent_time': sent_time,
+ 'sent_server': sent_server,
+ 'received_time': received_time,
+ 'received_server': received_server
+ }
+
+
+def extract_body(msg):
+ """Walk MIME parts, prefer text/plain, fall back to html2text on text/html.
+
+ Returns body text string.
+ """
+ plain_text = None
+ html_text = None
+
+ for part in msg.walk():
+ content_type = part.get_content_type()
+ if content_type == "text/plain" and plain_text is None:
+ payload = part.get_payload(decode=True)
+ if payload is not None:
+ plain_text = payload.decode('utf-8', errors='ignore')
+ elif content_type == "text/html" and html_text is None:
+ payload = part.get_payload(decode=True)
+ if payload is not None:
+ html_text = payload.decode('utf-8', errors='ignore')
+
+ if plain_text is not None:
+ return plain_text
+
+ if html_text is not None:
+ try:
+ import html2text
+ h = html2text.HTML2Text()
+ h.body_width = 0
+ return h.handle(html_text)
+ except ImportError:
+ # Strip HTML tags as fallback if html2text not installed
+ return re.sub(r'<[^>]+>', '', html_text)
+
+ return ""
+
+
+def extract_metadata(msg):
+ """Extract email metadata from headers.
+
+ Returns dict with from, to, subject, date, and timing info.
+ """
+ return {
+ 'from': msg.get('From'),
+ 'to': msg.get('To'),
+ 'subject': msg.get('Subject'),
+ 'date': msg.get('Date'),
+ 'timing': parse_received_headers(msg),
+ }
+
+
+def generate_basename(metadata):
+ """Generate date-sender prefix from metadata.
+
+ Returns e.g. "2026-02-05-1136-Jonathan".
+ Falls back to "unknown" for missing/malformed Date or From.
+ """
+ # Parse date
+ date_str = metadata.get('date')
+ date_prefix = "unknown"
+ if date_str:
+ try:
+ parsed = email.utils.parsedate_to_datetime(date_str)
+ date_prefix = parsed.strftime('%Y-%m-%d-%H%M')
+ except (ValueError, TypeError):
+ pass
+
+ # Parse sender first name
+ from_str = metadata.get('from')
+ sender = "unknown"
+ if from_str:
+ # Extract display name or email local part
+ display_name, addr = email.utils.parseaddr(from_str)
+ if display_name:
+ sender = display_name.split()[0]
+ elif addr:
+ sender = addr.split('@')[0]
+
+ return f"{date_prefix}-{sender}"
+
+
+def _clean_for_filename(text, max_length=80):
+ """Clean text for use in a filename.
+
+ Replace spaces with hyphens, strip chars unsafe for filenames,
+ collapse multiple hyphens.
+ """
+ text = text.strip()
+ text = text.replace(' ', '-')
+ # Keep alphanumeric, hyphens, dots, underscores
+ text = re.sub(r'[^\w\-.]', '', text)
+ # Collapse multiple hyphens
+ text = re.sub(r'-{2,}', '-', text)
+ # Strip leading/trailing hyphens
+ text = text.strip('-')
+ if len(text) > max_length:
+ text = text[:max_length].rstrip('-')
+ return text
+
+
+def generate_email_filename(basename, subject):
+ """Generate email filename from basename and subject.
+
+ Returns e.g. "2026-02-05-1136-Jonathan-EMAIL-Re-Fw-4319-Danneel-Street"
+ (without extension — caller adds .eml or .txt).
+ """
+ if subject:
+ clean_subject = _clean_for_filename(subject)
+ else:
+ clean_subject = "no-subject"
+ return f"{basename}-EMAIL-{clean_subject}"
+
+
+def generate_attachment_filename(basename, original_filename):
+ """Generate attachment filename from basename and original filename.
+
+ Returns e.g. "2026-02-05-1136-Jonathan-ATTACH-Ltr-Carrollton.pdf".
+ Preserves original extension.
+ """
+ if not original_filename:
+ return f"{basename}-ATTACH-unnamed"
+
+ name, ext = os.path.splitext(original_filename)
+ clean_name = _clean_for_filename(name)
+ return f"{basename}-ATTACH-{clean_name}{ext}"
+
+
+# ---------------------------------------------------------------------------
+# I/O functions (file operations)
+# ---------------------------------------------------------------------------
+
+def save_attachments(msg, output_dir, basename):
+ """Write attachment files to output_dir with auto-renamed filenames.
+
+ Returns list of dicts: {original_name, renamed_name, path}.
+ """
+ results = []
for part in msg.walk():
- if part.get_content_type() == "text/plain":
- body_text = part.get_payload(decode=True).decode('utf-8', errors='ignore')
- break
- elif part.get_content_type() == "text/html":
- # Fallback to HTML if no plain text
- if not body_text:
- body_text = part.get_payload(decode=True).decode('utf-8', errors='ignore')
-
- # Print email metadata and body
- print(f"From: {msg.get('From')}")
- print(f"To: {msg.get('To')}")
- print(f"Subject: {msg.get('Subject')}")
- print(f"Date: {msg.get('Date')}")
+ if part.get_content_maintype() == 'multipart':
+ continue
+ if part.get('Content-Disposition') is None:
+ continue
+
+ filename = part.get_filename()
+ if filename:
+ renamed = generate_attachment_filename(basename, filename)
+ filepath = os.path.join(output_dir, renamed)
+ with open(filepath, 'wb') as f:
+ f.write(part.get_payload(decode=True))
+ results.append({
+ 'original_name': filename,
+ 'renamed_name': renamed,
+ 'path': filepath,
+ })
+
+ return results
+
+
+def save_text(text, filepath):
+ """Write body text to a .txt file."""
+ with open(filepath, 'w', encoding='utf-8') as f:
+ f.write(text)
+
+
+# ---------------------------------------------------------------------------
+# Pipeline function
+# ---------------------------------------------------------------------------
+
+def process_eml(eml_path, output_dir):
+ """Full extraction pipeline.
+
+ 1. Create temp extraction dir
+ 2. Copy EML into temp dir
+ 3. Parse email (metadata, body, attachments)
+ 4. Generate filenames from headers
+ 5. Save renamed .eml, .txt, and attachments to temp dir
+ 6. Check for collisions in output_dir
+ 7. Move all files to output_dir
+ 8. Clean up temp dir
+ 9. Return results dict
+ """
+ eml_path = os.path.abspath(eml_path)
+ output_dir = os.path.abspath(output_dir)
+ os.makedirs(output_dir, exist_ok=True)
+
+ # Create temp dir as sibling of the EML file
+ eml_dir = os.path.dirname(eml_path)
+ temp_dir = tempfile.mkdtemp(prefix='extract-', dir=eml_dir)
+
+ try:
+ # Copy EML to temp dir
+ temp_eml = os.path.join(temp_dir, os.path.basename(eml_path))
+ shutil.copy2(eml_path, temp_eml)
+
+ # Parse
+ with open(eml_path, 'rb') as f:
+ msg = email.message_from_binary_file(f)
+
+ metadata = extract_metadata(msg)
+ body = extract_body(msg)
+ basename = generate_basename(metadata)
+ email_stem = generate_email_filename(basename, metadata['subject'])
+
+ # Save renamed EML
+ renamed_eml = f"{email_stem}.eml"
+ renamed_eml_path = os.path.join(temp_dir, renamed_eml)
+ os.rename(temp_eml, renamed_eml_path)
+
+ # Save .txt
+ renamed_txt = f"{email_stem}.txt"
+ renamed_txt_path = os.path.join(temp_dir, renamed_txt)
+ save_text(body, renamed_txt_path)
+
+ # Save attachments
+ attachment_results = save_attachments(msg, temp_dir, basename)
+
+ # Build file list
+ files = [
+ {'type': 'eml', 'name': renamed_eml, 'path': None},
+ {'type': 'txt', 'name': renamed_txt, 'path': None},
+ ]
+ for att in attachment_results:
+ files.append({
+ 'type': 'attach',
+ 'name': att['renamed_name'],
+ 'path': None,
+ })
+
+ # Check for collisions in output_dir
+ for file_info in files:
+ dest = os.path.join(output_dir, file_info['name'])
+ if os.path.exists(dest):
+ raise FileExistsError(
+ f"Collision: '{file_info['name']}' already exists in {output_dir}"
+ )
+
+ # Move all files to output_dir
+ for file_info in files:
+ src = os.path.join(temp_dir, file_info['name'])
+ dest = os.path.join(output_dir, file_info['name'])
+ shutil.move(src, dest)
+ file_info['path'] = dest
+
+ return {
+ 'metadata': metadata,
+ 'body': body,
+ 'files': files,
+ }
+
+ finally:
+ # Clean up temp dir
+ if os.path.exists(temp_dir):
+ shutil.rmtree(temp_dir)
+
+
+# ---------------------------------------------------------------------------
+# Stdout display (backwards-compatible mode)
+# ---------------------------------------------------------------------------
+
+def print_email(eml_path):
+ """Parse and print email to stdout. Extract attachments alongside EML.
+
+ This preserves the original script behavior when --output-dir is not given.
+ """
+ with open(eml_path, 'rb') as f:
+ msg = email.message_from_binary_file(f)
+
+ metadata = extract_metadata(msg)
+ body = extract_body(msg)
+ timing = metadata['timing']
+
+ print(f"From: {metadata['from']}")
+ print(f"To: {metadata['to']}")
+ print(f"Subject: {metadata['subject']}")
+ print(f"Date: {metadata['date']}")
+ print(f"Sent: {timing['sent_time']} (via {timing['sent_server']})")
+ print(f"Received: {timing['received_time']} (at {timing['received_server']})")
print()
- print(body_text)
+ print(body)
print()
- # Extract attachments
- attachments = []
+ # Extract attachments alongside the EML file
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
@@ -37,17 +346,53 @@ def extract_attachments(eml_file):
filename = part.get_filename()
if filename:
- filepath = os.path.join(os.path.dirname(eml_file), filename)
+ filepath = os.path.join(os.path.dirname(eml_path), filename)
with open(filepath, 'wb') as f:
f.write(part.get_payload(decode=True))
- attachments.append(filename)
print(f"Extracted attachment: {filename}")
- return attachments
+
+def print_pipeline_summary(result):
+ """Print summary after pipeline extraction."""
+ metadata = result['metadata']
+ timing = metadata['timing']
+
+ print(f"From: {metadata['from']}")
+ print(f"To: {metadata['to']}")
+ print(f"Subject: {metadata['subject']}")
+ print(f"Date: {metadata['date']}")
+ print(f"Sent: {timing['sent_time']} (via {timing['sent_server']})")
+ print(f"Received: {timing['received_time']} (at {timing['received_server']})")
+ print()
+ print("Files created:")
+ for f in result['files']:
+ print(f" [{f['type']:>6}] {f['name']}")
+ print(f"\nOutput directory: {os.path.dirname(result['files'][0]['path'])}")
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
if __name__ == "__main__":
- if len(sys.argv) < 2:
- print("Usage: extract_attachments.py <eml_file>")
+ parser = argparse.ArgumentParser(
+ description="Extract email content and attachments from EML files."
+ )
+ parser.add_argument('eml_path', help="Path to source EML file")
+ parser.add_argument(
+ '--output-dir',
+ help="Destination directory for extracted files. "
+ "Without this flag, prints to stdout only (backwards compatible)."
+ )
+
+ args = parser.parse_args()
+
+ if not os.path.isfile(args.eml_path):
+ print(f"Error: '{args.eml_path}' not found or is not a file.", file=sys.stderr)
sys.exit(1)
- extract_attachments(sys.argv[1])
+ if args.output_dir:
+ result = process_eml(args.eml_path, args.output_dir)
+ print_pipeline_summary(result)
+ else:
+ print_email(args.eml_path)