"""Unit tests for scripts/zfs-pre-snapshot.

The script snapshots the root dataset before a pacman transaction and prunes
to the most recent KEEP pre-pacman snapshots. These tests drive the real
script with a fake zfs on PATH (snapshot/destroy logged, list returns a
fixture set) and env-rooted state, so nothing touches a real pool.
"""

import os
import shutil
import subprocess
import tempfile
import time
import unittest

REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
SCRIPT = os.path.join(REPO_ROOT, "scripts/zfs-pre-snapshot")
FAKE_ZFS = os.path.join(os.path.dirname(__file__), "fake-zfs")
DATASET = "tank/test"

# Five pre-pacman snapshots oldest->newest (zfs list -s creation is ascending),
# plus one autosnap that the grep filter must ignore.
SNAPSHOTS = "\n".join([
    f"{DATASET}@autosnap_2026-01-01",
    f"{DATASET}@pre-pacman_2026-06-01",
    f"{DATASET}@pre-pacman_2026-06-02",
    f"{DATASET}@pre-pacman_2026-06-03",
    f"{DATASET}@pre-pacman_2026-06-04",
    f"{DATASET}@pre-pacman_2026-06-05",
]) + "\n"


class Harness(unittest.TestCase):
    """Shared fixture: a scratch directory holding a fake ``zfs`` and its state.

    Attributes set by ``setUp``:
        tmp:   scratch directory, removed after every test.
        bin:   directory prepended to PATH; contains the fake ``zfs``.
        log:   path handed to the fake zfs via FAKE_ZFS_LOG.
        snaps: fixture file (``SNAPSHOTS``) handed over via FAKE_ZFS_SNAPSHOTS.
        lock:  lockfile path handed to the script via ZFS_PRE_LOCKFILE.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp(prefix="zfs-pre-snap-")
        # Register the cleanup before any step that can fail: unittest skips
        # tearDown() when setUp() raises, but it still runs cleanups, so the
        # scratch directory cannot leak if e.g. the fake-zfs copy fails.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

        self.bin = os.path.join(self.tmp, "bin")
        os.makedirs(self.bin)
        shutil.copy(FAKE_ZFS, os.path.join(self.bin, "zfs"))

        self.log = os.path.join(self.tmp, "zfs.log")
        self.snaps = os.path.join(self.tmp, "snaps")
        with open(self.snaps, "w", encoding="utf-8") as f:
            f.write(SNAPSHOTS)
        self.lock = os.path.join(self.tmp, "lock")

    def tearDown(self):
        # Kept for subclasses that chain to it; the removal is idempotent, so
        # running it in addition to the cleanup registered in setUp is harmless.
        shutil.rmtree(self.tmp, ignore_errors=True)

    def run_script(self, keep="3", fail=False, snaps=None):
        """Run the real script against the fake zfs and return the result.

        Args:
            keep: value exported as ZFS_PRE_KEEP.
            fail: when true, export FAKE_ZFS_SNAPSHOT_FAIL=1.
            snaps: alternative fixture path for FAKE_ZFS_SNAPSHOTS; defaults
                to the file written by ``setUp``.

        Returns:
            The ``subprocess.CompletedProcess`` with captured text output.
            The exit status is not checked here.
        """
        env = os.environ.copy()
        # Fall back to the platform default search path so a missing PATH in
        # the parent environment does not raise KeyError.
        env["PATH"] = self.bin + os.pathsep + env.get("PATH", os.defpath)
        env["ZFS_PRE_DATASET"] = DATASET
        env["ZFS_PRE_LOCKFILE"] = self.lock
        env["ZFS_PRE_KEEP"] = keep
        env["FAKE_ZFS_LOG"] = self.log
        env["FAKE_ZFS_SNAPSHOTS"] = snaps if snaps is not None else self.snaps
        if fail:
            env["FAKE_ZFS_SNAPSHOT_FAIL"] = "1"
        return subprocess.run(
            [SCRIPT], env=env, capture_output=True, text=True, timeout=15
        )

    def log_lines(self):
        """Return the non-blank lines of the fake-zfs log, or [] if absent."""
        try:
            with open(self.log, encoding="utf-8") as f:
                return [ln for ln in f.read().splitlines() if ln.strip()]
        except FileNotFoundError:
            return []

    def logged(self, verb):
        """Return the log lines that start with ``verb`` followed by a space."""
        prefix = verb + " "
        return [ln for ln in self.log_lines() if ln.startswith(prefix)]


class TestSnapshot(Harness):
    """Snapshot creation and the lockfile-based skip."""

    def test_creates_a_pre_pacman_snapshot(self):
        self.run_script()
        snaps = self.logged("snapshot")
        self.assertEqual(len(snaps), 1)
        self.assertIn(f"snapshot {DATASET}@pre-pacman_", snaps[0])

    def test_skips_when_lockfile_is_fresh(self):
        # A lockfile newer than MIN_INTERVAL → no snapshot this run.
        with open(self.lock, "w"):
            pass
        now = time.time()
        os.utime(self.lock, (now, now))
        self.run_script()
        self.assertEqual(self.logged("snapshot"), [])


class TestPrune(Harness):
    """Pruning of pre-pacman snapshots beyond KEEP."""

    def test_prunes_oldest_beyond_keep(self):
        # 5 pre-pacman snapshots, KEEP=3 → the two oldest are destroyed.
        self.run_script(keep="3")
        destroyed = [ln.split(" ", 1)[1] for ln in self.logged("destroy")]
        self.assertEqual(
            destroyed,
            [
                f"{DATASET}@pre-pacman_2026-06-01",
                f"{DATASET}@pre-pacman_2026-06-02",
            ],
        )

    def test_never_destroys_non_pre_pacman_snapshots(self):
        self.run_script(keep="1")
        destroyed = self.logged("destroy")
        # Guard against a vacuous pass: with 5 pre-pacman snapshots and
        # KEEP=1, pruning must have destroyed something for the autosnap
        # check below to mean anything.
        self.assertTrue(destroyed, "expected pruning to destroy snapshots")
        self.assertFalse(any("autosnap" in ln for ln in destroyed))

    def test_no_prune_when_at_or_under_keep(self):
        # KEEP=5 with exactly 5 pre-pacman snapshots → nothing destroyed.
        self.run_script(keep="5")
        self.assertEqual(self.logged("destroy"), [])


class TestError(Harness):
    """Behaviour when the snapshot command itself fails."""

    def test_snapshot_failure_skips_prune_and_warns(self):
        r = self.run_script(fail=True)
        self.assertIn("Failed to create snapshot", r.stderr)
        self.assertEqual(self.logged("destroy"), [])


if __name__ == "__main__":
    unittest.main()