1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
#!/usr/bin/env python3
"""Point-in-time snapshot of pending cross-agent messages across local projects.
See cross-agent-status.md. Pending = messages in inbox/from-agents/ whose
CONVERSATION_ID has no MESSAGE_TYPE: release at a later #+TIMESTAMP.
HALT: prints a prominent banner before normal output, but continues to enumerate.
"""
from __future__ import annotations
import argparse
import glob
import json
import os
import re
import sys
from pathlib import Path
# Directory under the user's home that holds this tool's control files.
CONFIG_DIR = Path.home() / ".config" / "cross-agent-comms"
# Marker file: while it exists the HALT banner is shown and JSON output
# reports "halt_active": true. Its text content, if any, is shown as the reason.
HALT_FILE = CONFIG_DIR / "HALT"
# Default pattern for per-project inbox directories. The trailing "/" makes
# glob return directories only.
DEFAULT_GLOB = str(Path.home() / "projects" / "*" / "inbox" / "from-agents") + "/"
def parse_frontmatter(path: Path) -> dict[str, str]:
    """Parse the leading ``#+KEY: value`` header block of a message file.

    Blank lines and non-header lines that appear before the first header are
    skipped. Once at least one header has been read, the first blank or
    non-header line ends the block. Keys consist of upper-case letters and
    underscores; values are stripped of surrounding whitespace. When a key is
    repeated, the last value wins.

    Args:
        path: File to read.

    Returns:
        Mapping of header key to value. Empty when the file cannot be read
        or contains no header lines.
    """
    try:
        # Decode explicitly and tolerate bad bytes: a single undecodable byte
        # in a message body must not abort the scan of every project.
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return {}

    header_re = re.compile(r"#\+([A-Z_]+):\s*(.*)")
    fm: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.rstrip()
        m = header_re.match(line) if line else None
        if m:
            fm[m.group(1)] = m.group(2).strip()
        elif fm:
            # Blank or non-header line after the header block started: done.
            break
    return fm
def project_name_from_path(path: str) -> str:
    """Derive the project name from an inbox directory or a file inside it.

    Resolution order:

    1. If the path lies under ``~/projects/<name>/...``, return ``<name>``.
    2. Otherwise, if the path contains ``<name>/inbox/from-agents``, return
       ``<name>``. This works for the ``from-agents`` directory itself (with
       or without a trailing slash) and for any file below it.
    3. Otherwise, return the name of the directory three levels above
       ``path`` (the layout ``project/inbox/from-agents/file.org``).

    Args:
        path: Path of a ``from-agents`` directory or of a message file in it.

    Returns:
        The project directory name; may be empty if the path is too shallow.
    """
    home_projects = os.path.join(str(Path.home()), "projects")
    target = Path(path)
    parts = target.parts

    for i, part in enumerate(parts):
        if part == "projects" and i + 1 < len(parts) and str(Path(*parts[: i + 1])) == home_projects:
            return parts[i + 1]

    # Custom globs point outside ~/projects. Anchor on the inbox layout,
    # searching from the right so the innermost match wins.
    for i in range(len(parts) - 1, 1, -1):
        if parts[i] == "from-agents" and parts[i - 1] == "inbox":
            return Path(*parts[: i - 1]).name

    # Last resort: assume project/inbox/from-agents/file.org.
    return target.parent.parent.parent.name
def scan_project(inbox_dir: Path) -> tuple[int, str | None, int | None]:
    """Count pending messages in one project's ``from-agents`` inbox.

    Every ``*.org`` entry in ``inbox_dir`` is grouped by its CONVERSATION_ID
    header. Within a conversation, a non-release message is pending unless
    the conversation has a ``release`` message whose TIMESTAMP is greater
    than or equal to the message's own TIMESTAMP. Files missing any of
    CONVERSATION_ID, TIMESTAMP or MESSAGE_TYPE are always counted as pending.

    Timestamps are compared as plain strings.
    NOTE(review): this presumes a lexicographically sortable format such as
    ISO-8601 with a consistent offset; confirm against the message writer.

    Args:
        inbox_dir: The project's ``inbox/from-agents`` directory.

    Returns:
        ``(pending_count, newest_filename, newest_age_seconds)``. The last two
        are ``None`` when nothing is pending. "Newest" is by file mtime, used
        as a proxy for arrival order. Age is clamped to 0 for future mtimes.
    """
    import time  # local import: the module header does not import time

    if not inbox_dir.is_dir():
        return 0, None, None

    # conv_id -> [(timestamp, msg_type, path)]
    by_conv: dict[str, list[tuple[str, str, Path]]] = {}
    for f in sorted(inbox_dir.glob("*.org")):
        fm = parse_frontmatter(f)
        conv = fm.get("CONVERSATION_ID")
        ts = fm.get("TIMESTAMP")
        mt = fm.get("MESSAGE_TYPE")
        if not conv or not ts or not mt:
            # Malformed file: count as pending under conv "_unparseable".
            by_conv.setdefault("_unparseable", []).append(("", "request", f))
            continue
        by_conv.setdefault(conv, []).append((ts, mt, f))

    pending_files: list[Path] = []
    for entries in by_conv.values():
        entries.sort(key=lambda e: e[0])
        # Latest release timestamp of this conversation, if it has one.
        release_ts = max((ts for ts, mt, _f in entries if mt == "release"), default=None)
        for ts, mt, f in entries:
            if mt == "release":
                continue
            if release_ts is not None and ts <= release_ts:
                continue
            pending_files.append(f)

    # Stat each pending file exactly once. A file may be removed by another
    # process between the directory listing and this point; such a file is
    # no longer pending, so it is dropped rather than crashing the snapshot.
    stamped: list[tuple[float, Path]] = []
    for f in pending_files:
        try:
            stamped.append((f.stat().st_mtime, f))
        except OSError:
            continue
    if not stamped:
        return 0, None, None

    newest_mtime, newest = max(stamped, key=lambda pair: pair[0])
    # An mtime in the future (clock skew) would give a negative age.
    age = max(0, int(time.time() - newest_mtime))
    return len(stamped), newest.name, age
def fmt_age(seconds: int | None) -> str:
    """Render an age in seconds as a short human-readable string.

    Args:
        seconds: Age in whole seconds, or ``None`` when there is no age.

    Returns:
        ``"—"`` for ``None``; otherwise the age in the largest unit that fits
        (seconds, minutes, hours, days), rounded down. Negative ages, which
        arise when a file's mtime lies in the future, are reported as
        ``"0s ago"``.
    """
    if seconds is None:
        return "—"
    # Clock skew can make a file look newer than "now"; avoid "-30s ago".
    seconds = max(seconds, 0)
    if seconds < 60:
        return f"{seconds}s ago"
    if seconds < 3600:
        return f"{seconds // 60} min ago"
    if seconds < 86400:
        return f"{seconds // 3600} hr ago"
    return f"{seconds // 86400} day(s) ago"
def render_banner_if_halt() -> None:
    """Print a warning banner to stdout when the HALT marker file is present.

    Nothing is printed when ``HALT_FILE`` does not exist. When it exists, its
    stripped text is shown as the reason (omitted if empty). When it exists
    but cannot be read, a placeholder reason is shown and the state is still
    reported as halted.
    """
    try:
        # Read directly instead of exists()-then-read so a marker removed in
        # between is simply "not halted". Decode leniently so odd bytes in the
        # reason text cannot crash the status command.
        reason = HALT_FILE.read_text(encoding="utf-8", errors="replace").strip()
    except FileNotFoundError:
        return
    except OSError:
        reason = "(HALT file unreadable; treated as halted)"

    print("⚠ HALT ACTIVE — cross-agent comms paused")
    if reason:
        print(f" reason: {reason}")
    print(f" clear: rm {HALT_FILE} (or: cross-agent-resume)")
    print()
def main() -> int:
    """Command-line entry point: report pending messages per project.

    Options:
        --json            Emit a JSON document instead of the text table.
        --projects-glob   Glob selecting each project's from-agents directory.

    The HALT banner is shown before the normal output. In ``--json`` mode it
    goes to stderr so stdout stays a single valid JSON document; the payload
    carries the same information in ``halt_active``.

    Returns:
        Process exit status (always 0).
    """
    parser = argparse.ArgumentParser(description="Snapshot of pending cross-agent messages across local projects.")
    parser.add_argument("--json", action="store_true", help="Emit JSON output")
    parser.add_argument("--projects-glob", default=DEFAULT_GLOB,
                        help=f"Glob for project from-agents dirs (default: {DEFAULT_GLOB})")
    args = parser.parse_args()

    if args.json:
        # A banner on stdout would precede the JSON and break its consumers.
        import contextlib
        with contextlib.redirect_stdout(sys.stderr):
            render_banner_if_halt()
    else:
        render_banner_if_halt()

    rows = []
    for path in sorted(glob.glob(args.projects_glob)):
        inbox = Path(path)
        if not inbox.is_dir():
            # A user-supplied glob may also match plain files.
            continue
        count, most_recent, age = scan_project(inbox)
        rows.append({
            "name": project_name_from_path(path),
            "pending_count": count,
            "most_recent": (
                {"filename": most_recent, "age_seconds": age}
                if most_recent else None
            ),
        })

    # Sort: pending-first, then alphabetical by name.
    rows.sort(key=lambda r: (-r["pending_count"], r["name"]))

    if args.json:
        import datetime as _dt
        payload = {
            "scanned_at": _dt.datetime.now(_dt.timezone.utc).isoformat(),
            "halt_active": HALT_FILE.exists(),
            "projects": rows,
        }
        print(json.dumps(payload, indent=2))
        return 0

    if not rows:
        print("No projects with inbox/from-agents/ found — 0 pending.")
        return 0

    # Human-readable table; the name column is as wide as the longest name.
    name_w = max(len("project"), max(len(r["name"]) for r in rows))
    print(f"{'project':<{name_w}} pending most-recent")
    for r in rows:
        most_recent_str = "—"
        if r["most_recent"]:
            most_recent_str = f"{r['most_recent']['filename']} ({fmt_age(r['most_recent']['age_seconds'])})"
        print(f"{r['name']:<{name_w}} {r['pending_count']:<7} {most_recent_str}")
    return 0
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|