aboutsummaryrefslogtreecommitdiff
path: root/scripts/update-skills.py
blob: 364559654d778dc5aa8c8e15bb30aefd06702e7a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
#!/usr/bin/env python3
"""Keep forked skills and commands in sync with their upstreams.

Each fork has a manifest at upstreams/<name>/manifest.json:

    url                upstream GitHub (or any git) URL
    ref                branch or tag to track
    subpath            path inside the upstream repo ("" = repo root)
    target             repo-relative path the fork lives at
    files              optional map of upstream-relative -> target-relative
                       paths; when present only mapped files are tracked
                       (a key starting with "/" is upstream-repo-root-relative)
    license            upstream license identifier (informational)
    last_synced_commit upstream commit of the last completed sync

The committed baseline snapshot at upstreams/<name>/baseline/ mirrors the
*target* layout and is the 3-way merge base. The script never writes a fork's
target files: check classifies, merge-file merges to stdout; only bootstrap
and mark-synced write, and only to the manifest and baseline.

Commands:
    list                       show forks and sync state
    bootstrap NAME             snapshot upstream@ref as the baseline
    check NAME [--json]        clone upstream to cache, classify every file
    merge-file NAME PATH       3-way merge one file to stdout (exit 1 = conflict)
    mark-synced NAME           refresh baseline + manifest from the checked cache
"""
from __future__ import annotations

import argparse
import json
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path

# Directory names that walk_files() never descends into.
EXCLUDE_DIRS = {".git", "node_modules", "__pycache__", ".pytest_cache"}
# File suffixes that walk_files() leaves out of its result.
EXCLUDE_SUFFIXES = {".pyc"}


def fail(msg: str, code: int = 2) -> None:
    """Write ``error: <msg>`` to stderr and terminate the process.

    Never returns normally: SystemExit is raised with *code* as the exit
    status (2 unless the caller passes something else).
    """
    sys.stderr.write(f"error: {msg}\n")
    raise SystemExit(code)


def walk_files(base: Path) -> list[str]:
    """Return the sorted base-relative paths of every file below *base*.

    Directories named in EXCLUDE_DIRS are not entered and files whose suffix
    is in EXCLUDE_SUFFIXES are dropped. A *base* that is not a directory
    yields an empty list.
    """
    if not base.is_dir():
        return []
    found: list[str] = []
    pending = [base]
    while pending:
        current = pending.pop()
        for entry in current.iterdir():
            if entry.is_dir():
                # Only the directory's own name is tested, not its full path.
                if entry.name in EXCLUDE_DIRS:
                    continue
                pending.append(entry)
                continue
            if entry.is_file() and entry.suffix not in EXCLUDE_SUFFIXES:
                found.append(str(entry.relative_to(base)))
    # Traversal order is irrelevant: the result is sorted as a whole.
    found.sort()
    return found


class Fork:
    """One tracked fork, loaded from ``upstreams/<name>/manifest.json``.

    ``url``, ``ref``, ``subpath``, ``target`` and ``files`` come straight
    from the manifest (``target`` is joined onto the repo root). ``manifest``
    keeps the parsed JSON so callers can edit it and write it back with
    save_manifest(); ``baseline`` is the fork's snapshot directory.
    """

    def __init__(self, root: Path, name: str):
        """Read the manifest of fork *name* under *root*.

        Exits through fail() when the manifest file does not exist. A
        manifest lacking ``url``, ``ref`` or ``target`` raises KeyError.
        """
        fork_dir = root / "upstreams" / name
        manifest_path = fork_dir / "manifest.json"
        self.root = root
        self.name = name
        self.dir = fork_dir
        self.manifest_path = manifest_path
        if not manifest_path.is_file():
            fail(f"no manifest for fork '{name}' at {manifest_path}")
        data = json.loads(manifest_path.read_text())
        self.url: str = data["url"]
        self.ref: str = data["ref"]
        self.subpath: str = data.get("subpath", "")
        self.target: Path = root / data["target"]
        self.files: dict[str, str] | None = data.get("files")
        self.manifest = data
        self.baseline = fork_dir / "baseline"

    def checkout(self, cache: Path) -> Path:
        """Return where this fork's upstream checkout lives inside *cache*."""
        return cache / self.name

    def clone(self, cache: Path) -> Path:
        """Shallow-clone ``url`` at ``ref`` into the cache, replacing any old copy.

        Returns the checkout directory; exits through fail() when git
        reports a non-zero status.
        """
        dest = self.checkout(cache)
        if dest.exists():
            shutil.rmtree(dest)
        dest.parent.mkdir(parents=True, exist_ok=True)
        command = ["git", "clone", "--quiet", "--depth", "1",
                   "--branch", self.ref, self.url, str(dest)]
        proc = subprocess.run(command, capture_output=True, text=True)
        if proc.returncode != 0:
            fail(f"could not clone {self.name} upstream from {self.url}: "
                 f"{proc.stderr.strip()}")
        return dest

    def upstream_commit(self, cache: Path) -> str:
        """Return the HEAD commit hash of the cached checkout.

        Raises subprocess.CalledProcessError if ``git rev-parse`` fails.
        """
        command = ["git", "-C", str(self.checkout(cache)), "rev-parse", "HEAD"]
        proc = subprocess.run(command, capture_output=True, text=True,
                              check=True)
        return proc.stdout.strip()

    def upstream_files(self, co: Path) -> dict[str, Path]:
        """Map of target-relative path -> absolute upstream source path."""
        src_root = co / self.subpath if self.subpath else co
        if self.files is None:
            # No explicit mapping: every file under the subpath is tracked.
            return {rel: src_root / rel for rel in walk_files(src_root)}
        mapping = {}
        for upstream_key, target_rel in self.files.items():
            if upstream_key.startswith("/"):
                # A leading "/" anchors the key at the checkout root rather
                # than at the subpath.
                candidate = co / upstream_key[1:]
            else:
                candidate = src_root / upstream_key
            if candidate.is_file():
                mapping[target_rel] = candidate
        return mapping

    def target_files(self) -> dict[str, Path]:
        """Map of target-relative path -> existing file under ``target``.

        With a ``files`` mapping only mapped paths that exist are returned;
        otherwise the whole target tree is walked.
        """
        if self.files is None:
            return {rel: self.target / rel for rel in walk_files(self.target)}
        present = {}
        for target_rel in self.files.values():
            local_path = self.target / target_rel
            if local_path.is_file():
                present[target_rel] = local_path
        return present

    def baseline_files(self) -> dict[str, Path]:
        """Map of relative path -> file in the baseline snapshot."""
        snapshot = {}
        for rel in walk_files(self.baseline):
            snapshot[rel] = self.baseline / rel
        return snapshot

    def save_manifest(self) -> None:
        """Write ``manifest`` back as 2-space-indented JSON plus a newline."""
        rendered = json.dumps(self.manifest, indent=2)
        self.manifest_path.write_text(rendered + "\n")


def classify(fork: Fork, co: Path) -> list[dict[str, str]]:
    """Label every tracked file by comparing upstream, target and baseline.

    Returns one ``{"path": ..., "status": ...}`` row per relative path that
    appears in any of the three trees, ordered by path. Contents are
    compared as raw bytes. When the fork has no baseline directory only the
    upstream and target copies are compared.
    """
    upstream = fork.upstream_files(co)
    target = fork.target_files()
    baseline = fork.baseline_files()
    baseline_present = fork.baseline.is_dir()

    def content(paths: dict[str, Path], rel: str) -> bytes | None:
        """Bytes of the file mapped at *rel*, or None when there is none."""
        path = paths.get(rel)
        if path is None or not path.is_file():
            return None
        return path.read_bytes()

    def two_way(u: bytes | None, t: bytes | None) -> str:
        """Status when there is no merge base to compare against."""
        if u is None:
            return "local-new"
        if t is None:
            return "upstream-new"
        return "unchanged" if u == t else "no-baseline"

    def three_way(u: bytes | None, t: bytes | None, b: bytes | None) -> str:
        """Status relative to the baseline copy *b*."""
        if b is None and u is None:
            return "local-new"
        if b is None and t is None:
            return "upstream-new"
        # From here a missing side means the baseline had the file.
        if u is None:
            return "upstream-deleted"
        if t is None:
            return "local-deleted"
        upstream_same = u == b
        target_same = t == b
        if upstream_same and target_same:
            return "unchanged"
        if target_same:
            return "upstream-changed"
        if upstream_same:
            return "local-only"
        if u == t:
            return "unchanged"  # converged independently
        return "both-changed"

    rows = []
    for rel in sorted({*upstream, *target, *baseline}):
        u = content(upstream, rel)
        t = content(target, rel)
        b = content(baseline, rel)
        status = three_way(u, t, b) if baseline_present else two_way(u, t)
        rows.append({"path": rel, "status": status})
    return rows


def cmd_list(root: Path, _cache: Path, _args: argparse.Namespace) -> int:
    """Print one line per registered fork with its sync state.

    Forks are discovered as ``upstreams/*/manifest.json`` under *root* and
    listed in sorted path order. The cache and parsed arguments are unused;
    they are accepted so every command handler has the same signature.
    Always returns 0.
    """
    updir = root / "upstreams"
    manifests = sorted(updir.glob("*/manifest.json")) if updir.is_dir() else []
    if not manifests:
        print("no forks registered (no upstreams/*/manifest.json)")
        return 0
    for mp in manifests:
        m = json.loads(mp.read_text())
        # The manifest schema has no "name" field: a fork is identified by
        # its directory under upstreams/. An explicit "name" is still
        # honoured when a manifest happens to carry one.
        name = m.get("name", mp.parent.name)
        sha = m.get("last_synced_commit")
        state = f"last synced {sha[:10]}" if sha else "never synced"
        print(f"{name}: {state}  ({m['url']} @ {m['ref']})")
    return 0


def cmd_bootstrap(root: Path, cache: Path, args: argparse.Namespace) -> int:
    """Snapshot the fork's upstream at its ref as the new baseline.

    Clones the upstream into the cache, rebuilds the baseline directory from
    the tracked upstream files, records the cloned commit in the manifest as
    ``last_synced_commit`` and returns 0.
    """
    fork = Fork(root, args.name)
    checkout = fork.clone(cache)
    sources = fork.upstream_files(checkout)

    # Remove any previous snapshot first; the baseline is rebuilt from scratch.
    if fork.baseline.exists():
        shutil.rmtree(fork.baseline)
    for rel in sources:
        snapshot = fork.baseline / rel
        snapshot.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(sources[rel], snapshot)

    commit = fork.upstream_commit(cache)
    fork.manifest["last_synced_commit"] = commit
    fork.save_manifest()
    print(f"bootstrapped {fork.name} at {commit} ({len(sources)} files)")
    return 0


def cmd_check(root: Path, cache: Path, args: argparse.Namespace) -> int:
    """Clone the fork's upstream into the cache and report each file's status.

    With ``--json`` the report is a single JSON object holding the fork
    name, the upstream commit and the per-file rows. Otherwise it is an
    aligned text listing followed by a one-line summary. Always returns 0.
    """
    fork = Fork(root, args.name)
    report = classify(fork, fork.clone(cache))
    commit = fork.upstream_commit(cache)

    if args.json:
        payload = {"name": fork.name, "upstream_commit": commit,
                   "files": report}
        print(json.dumps(payload, indent=2))
        return 0

    print(f"{fork.name} @ upstream {commit}")
    for row in report:
        print(f"  {row['status']:<17} {row['path']}")
    # Every status other than "unchanged" counts as needing attention.
    pending = len([row for row in report if row["status"] != "unchanged"])
    if pending:
        print(f"{pending} file(s) need attention")
    else:
        print("everything in sync")
    return 0


def cmd_merge_file(root: Path, cache: Path, args: argparse.Namespace) -> int:
    """3-way merge one tracked file and write the result to stdout.

    The local target file is merged with the cached upstream copy, using the
    baseline copy as the merge base (an empty file when the baseline has no
    such path). Nothing on disk is modified.

    Returns 0 for a clean merge and 1 when the output contains conflict
    markers. Exits through fail() when there is no cached checkout, when the
    path is missing upstream or locally, or when git reports a hard error.
    """
    fork = Fork(root, args.name)
    co = fork.checkout(cache)
    if not co.is_dir():
        fail(f"no cached checkout for {fork.name} — run check first")
    up = fork.upstream_files(co)
    if args.path not in up:
        fail(f"{args.path} not present upstream for {fork.name}")
    local = fork.target / args.path
    if not local.is_file():
        fail(f"{args.path} not present locally under {fork.target}")
    base = fork.baseline / args.path
    with tempfile.NamedTemporaryFile() as empty:
        base_arg = str(base) if base.is_file() else empty.name
        # Capture raw bytes. Text mode would decode the merge result with
        # the locale encoding and rewrite every line ending to "\n", which
        # corrupts CRLF files and can raise on non-UTF-8 content.
        r = subprocess.run(
            ["git", "merge-file", "--stdout",
             "-L", "local", "-L", "baseline", "-L", "upstream",
             str(local), base_arg, str(up[args.path])],
            capture_output=True,
        )
    # git merge-file exits with the conflict count, or a negative value on
    # hard error — which subprocess reports as >=128 (255 observed for -1).
    if r.returncode >= 128 or r.returncode < 0:
        detail = r.stderr.decode(errors="replace").strip()
        fail(f"git merge-file failed on {args.path}: {detail}")
    raw_out = getattr(sys.stdout, "buffer", None)
    if raw_out is not None:
        # Flush pending text first so the byte write keeps its place.
        sys.stdout.flush()
        raw_out.write(r.stdout)
        raw_out.flush()
    else:
        # stdout was replaced by a text-only stream (no .buffer attribute).
        sys.stdout.write(r.stdout.decode(errors="replace"))
    return 0 if r.returncode == 0 else 1


def cmd_mark_synced(root: Path, cache: Path, args: argparse.Namespace) -> int:
    """Refresh the baseline and manifest from the already-cached checkout.

    Does not clone: the checkout left in the cache by an earlier command is
    reused, and fail() is called when it is absent. The baseline directory
    is rebuilt from the tracked upstream files and the checkout's HEAD is
    stored in the manifest as ``last_synced_commit``. Returns 0.
    """
    fork = Fork(root, args.name)
    cached = fork.checkout(cache)
    if not cached.is_dir():
        fail(f"no cached checkout for {fork.name} — run check first")
    tracked = fork.upstream_files(cached)

    baseline_dir = fork.baseline
    if baseline_dir.exists():
        shutil.rmtree(baseline_dir)
    for rel_path, upstream_path in tracked.items():
        copy_to = baseline_dir / rel_path
        copy_to.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(upstream_path, copy_to)

    head = fork.upstream_commit(cache)
    fork.manifest["last_synced_commit"] = head
    fork.save_manifest()
    print(f"{fork.name} synced at {head}")
    return 0


def main() -> int:
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Returns the handler's integer exit status. argparse itself terminates
    the process on ``--help`` or on a usage error.
    """
    # The module docstring is a hand-aligned reference (manifest fields and
    # command list). The raw formatter prints it verbatim in --help instead
    # of re-wrapping it into a single paragraph.
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--root", type=Path,
                    default=Path(__file__).resolve().parents[1],
                    help="repo root (default: this script's repo)")
    ap.add_argument("--cache", type=Path,
                    default=Path(tempfile.gettempdir()) / "update-skills",
                    help="dir for upstream checkouts")
    sub = ap.add_subparsers(dest="cmd", required=True)
    sub.add_parser("list")
    p = sub.add_parser("bootstrap")
    p.add_argument("name")
    p = sub.add_parser("check")
    p.add_argument("name")
    p.add_argument("--json", action="store_true")
    p = sub.add_parser("merge-file")
    p.add_argument("name")
    p.add_argument("path")
    p = sub.add_parser("mark-synced")
    p.add_argument("name")
    args = ap.parse_args()
    # Every handler takes (root, cache, args) and returns an exit status.
    handlers = {"list": cmd_list, "bootstrap": cmd_bootstrap,
                "check": cmd_check, "merge-file": cmd_merge_file,
                "mark-synced": cmd_mark_synced}
    return handlers[args.cmd](args.root.resolve(), args.cache, args)


# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())