1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
"""Unit tests for scripts/zfs-pre-snapshot.
The script snapshots the root dataset before a pacman transaction and prunes to
the most recent KEEP pre-pacman snapshots. These tests drive the real script
with a fake zfs on PATH (snapshot/destroy logged, list returns a fixture set)
and env-rooted state, so nothing touches a real pool.
"""
import os
import shutil
import subprocess
import tempfile
import time
import unittest
REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
SCRIPT = os.path.join(REPO_ROOT, "scripts/zfs-pre-snapshot")
FAKE_ZFS = os.path.join(os.path.dirname(__file__), "fake-zfs")
DATASET = "tank/test"
# Five pre-pacman snapshots oldest->newest (zfs list -s creation is ascending),
# plus one autosnap that the grep filter must ignore.
SNAPSHOTS = "\n".join([
f"{DATASET}@autosnap_2026-01-01",
f"{DATASET}@pre-pacman_2026-06-01",
f"{DATASET}@pre-pacman_2026-06-02",
f"{DATASET}@pre-pacman_2026-06-03",
f"{DATASET}@pre-pacman_2026-06-04",
f"{DATASET}@pre-pacman_2026-06-05",
]) + "\n"
class Harness(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp(prefix="zfs-pre-snap-")
self.bin = os.path.join(self.tmp, "bin")
os.makedirs(self.bin)
shutil.copy(FAKE_ZFS, os.path.join(self.bin, "zfs"))
self.log = os.path.join(self.tmp, "zfs.log")
self.snaps = os.path.join(self.tmp, "snaps")
with open(self.snaps, "w") as f:
f.write(SNAPSHOTS)
self.lock = os.path.join(self.tmp, "lock")
def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)
def run_script(self, keep="3", fail=False, snaps=None):
env = os.environ.copy()
env["PATH"] = self.bin + os.pathsep + env["PATH"]
env["ZFS_PRE_DATASET"] = DATASET
env["ZFS_PRE_LOCKFILE"] = self.lock
env["ZFS_PRE_KEEP"] = keep
env["FAKE_ZFS_LOG"] = self.log
env["FAKE_ZFS_SNAPSHOTS"] = snaps if snaps is not None else self.snaps
if fail:
env["FAKE_ZFS_SNAPSHOT_FAIL"] = "1"
return subprocess.run([SCRIPT], env=env, capture_output=True, text=True,
timeout=15)
def log_lines(self):
try:
with open(self.log) as f:
return [ln for ln in f.read().splitlines() if ln.strip()]
except FileNotFoundError:
return []
class TestSnapshot(Harness):
def test_creates_a_pre_pacman_snapshot(self):
self.run_script()
snaps = [ln for ln in self.log_lines() if ln.startswith("snapshot ")]
self.assertEqual(len(snaps), 1)
self.assertIn(f"snapshot {DATASET}@pre-pacman_", snaps[0])
def test_skips_when_lockfile_is_fresh(self):
# A lockfile newer than MIN_INTERVAL → no snapshot this run.
open(self.lock, "w").close()
os.utime(self.lock, (time.time(), time.time()))
self.run_script()
self.assertEqual([ln for ln in self.log_lines()
if ln.startswith("snapshot ")], [])
class TestPrune(Harness):
def test_prunes_oldest_beyond_keep(self):
# 5 pre-pacman snapshots, KEEP=3 → the two oldest are destroyed.
self.run_script(keep="3")
destroyed = [ln.split(" ", 1)[1] for ln in self.log_lines()
if ln.startswith("destroy ")]
self.assertEqual(destroyed,
[f"{DATASET}@pre-pacman_2026-06-01",
f"{DATASET}@pre-pacman_2026-06-02"])
def test_never_destroys_non_pre_pacman_snapshots(self):
self.run_script(keep="1")
destroyed = [ln for ln in self.log_lines() if ln.startswith("destroy ")]
self.assertFalse(any("autosnap" in ln for ln in destroyed))
def test_no_prune_when_at_or_under_keep(self):
# KEEP=5 with exactly 5 pre-pacman snapshots → nothing destroyed.
self.run_script(keep="5")
self.assertEqual([ln for ln in self.log_lines()
if ln.startswith("destroy ")], [])
class TestError(Harness):
def test_snapshot_failure_skips_prune_and_warns(self):
r = self.run_script(fail=True)
self.assertIn("Failed to create snapshot", r.stderr)
self.assertEqual([ln for ln in self.log_lines()
if ln.startswith("destroy ")], [])
if __name__ == "__main__":
unittest.main()
|