aboutsummaryrefslogtreecommitdiff
path: root/tests/zfs-pre-snapshot/test_zfs_pre_snapshot.py
blob: ed7731bfdf19b5fa187f45de78e1857f01aa8834 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Unit tests for scripts/zfs-pre-snapshot.

The script snapshots the root dataset before a pacman transaction and prunes to
the most recent KEEP pre-pacman snapshots. These tests drive the real script
with a fake zfs on PATH (snapshot/destroy logged, list returns a fixture set)
and env-rooted state, so nothing touches a real pool.
"""

import os
import shutil
import subprocess
import tempfile
import time
import unittest

# Filesystem anchors: the directory holding this test file, the repository
# root two levels above it, the script under test, and the stand-in zfs
# executable that lives next to this file.
_TESTS_DIR = os.path.dirname(__file__)
REPO_ROOT = os.path.abspath(os.path.join(_TESTS_DIR, os.pardir, os.pardir))
SCRIPT = os.path.join(REPO_ROOT, "scripts/zfs-pre-snapshot")
FAKE_ZFS = os.path.join(_TESTS_DIR, "fake-zfs")

DATASET = "tank/test"
# Fixture listing, one snapshot per newline-terminated line. The first entry
# is an autosnap that the script's grep filter has to skip; the remaining
# five are pre-pacman snapshots in ascending creation order (the order
# `zfs list -s creation` produces), i.e. oldest first.
_SNAPSHOT_TAGS = (
    "autosnap_2026-01-01",
    "pre-pacman_2026-06-01",
    "pre-pacman_2026-06-02",
    "pre-pacman_2026-06-03",
    "pre-pacman_2026-06-04",
    "pre-pacman_2026-06-05",
)
SNAPSHOTS = "".join(f"{DATASET}@{tag}\n" for tag in _SNAPSHOT_TAGS)


class Harness(unittest.TestCase):
    """Shared fixture that runs the real script against a fake ``zfs``.

    Attributes set by ``setUp``:
        tmp:   per-test scratch directory, removed once the test is over.
        bin:   directory put first on PATH; holds the fake ``zfs`` copy.
        log:   path exported to the fake zfs as ``FAKE_ZFS_LOG``.
        snaps: file containing the fixture listing (``FAKE_ZFS_SNAPSHOTS``).
        lock:  path exported to the script as ``ZFS_PRE_LOCKFILE``; the file
               itself is not created here.
    """

    def setUp(self):
        """Build the scratch directory, fake binary and fixture listing."""
        self.tmp = tempfile.mkdtemp(prefix="zfs-pre-snap-")
        # Register removal right away: unittest does not call tearDown() when
        # setUp() raises, but it does run cleanups, so a failure further down
        # (e.g. a missing fake-zfs) cannot leak the directory.
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        self.bin = os.path.join(self.tmp, "bin")
        os.makedirs(self.bin)
        shutil.copy(FAKE_ZFS, os.path.join(self.bin, "zfs"))
        self.log = os.path.join(self.tmp, "zfs.log")
        self.snaps = os.path.join(self.tmp, "snaps")
        with open(self.snaps, "w") as f:
            f.write(SNAPSHOTS)
        self.lock = os.path.join(self.tmp, "lock")

    def run_script(self, keep="3", fail=False, snaps=None):
        """Run the script under test with the fake zfs first on PATH.

        Args:
            keep: value exported as ``ZFS_PRE_KEEP`` (a string, as it is an
                environment variable).
            fail: when true, export ``FAKE_ZFS_SNAPSHOT_FAIL=1``.
            snaps: path exported as ``FAKE_ZFS_SNAPSHOTS``; defaults to the
                fixture file written by ``setUp``.

        Returns:
            The ``subprocess.CompletedProcess`` with stdout/stderr captured
            as text.

        Raises:
            subprocess.TimeoutExpired: if the script runs longer than 15 s.
        """
        env = os.environ.copy()
        # Fall back to the platform default when PATH is unset or empty so the
        # lookup cannot raise KeyError and the script can still find its tools.
        inherited_path = env.get("PATH") or os.defpath
        env["PATH"] = self.bin + os.pathsep + inherited_path
        env["ZFS_PRE_DATASET"] = DATASET
        env["ZFS_PRE_LOCKFILE"] = self.lock
        env["ZFS_PRE_KEEP"] = keep
        env["FAKE_ZFS_LOG"] = self.log
        env["FAKE_ZFS_SNAPSHOTS"] = snaps if snaps is not None else self.snaps
        if fail:
            env["FAKE_ZFS_SNAPSHOT_FAIL"] = "1"
        return subprocess.run([SCRIPT], env=env, capture_output=True, text=True,
                              timeout=15)

    def log_lines(self):
        """Return the non-blank lines of the fake-zfs log, in file order.

        A missing log file means the fake zfs logged nothing, so that case
        yields an empty list rather than an error.
        """
        try:
            with open(self.log) as f:
                return [ln for ln in f.read().splitlines() if ln.strip()]
        except FileNotFoundError:
            return []


class TestSnapshot(Harness):
    """Checks on whether a run issues a ``zfs snapshot`` call."""

    def test_creates_a_pre_pacman_snapshot(self):
        """A plain run logs exactly one snapshot named ``<dataset>@pre-pacman_*``."""
        self.run_script()
        snapshot_calls = list(
            filter(lambda entry: entry.startswith("snapshot "), self.log_lines())
        )
        self.assertEqual(len(snapshot_calls), 1)
        self.assertIn(f"snapshot {DATASET}@pre-pacman_", snapshot_calls[0])

    def test_skips_when_lockfile_is_fresh(self):
        """A just-touched lockfile suppresses the snapshot for this run."""
        # Create the lockfile and stamp it with the current time, so it is
        # newer than the script's MIN_INTERVAL threshold.
        with open(self.lock, "w"):
            pass
        os.utime(self.lock, (time.time(), time.time()))
        self.run_script()
        snapshot_calls = list(
            filter(lambda entry: entry.startswith("snapshot "), self.log_lines())
        )
        self.assertEqual(snapshot_calls, [])


class TestPrune(Harness):
    """Checks on which snapshots the script destroys for a given KEEP."""

    def _destroyed_targets(self):
        """Return the argument of every logged ``destroy`` call, in log order."""
        return [ln.split(" ", 1)[1] for ln in self.log_lines()
                if ln.startswith("destroy ")]

    def test_prunes_oldest_beyond_keep(self):
        # 5 pre-pacman snapshots, KEEP=3 → the two oldest are destroyed.
        self.run_script(keep="3")
        self.assertEqual(self._destroyed_targets(),
                         [f"{DATASET}@pre-pacman_2026-06-01",
                          f"{DATASET}@pre-pacman_2026-06-02"])

    def test_never_destroys_non_pre_pacman_snapshots(self):
        # 5 pre-pacman snapshots against KEEP=1 must trigger pruning. Assert
        # that first, otherwise the autosnap check below would pass vacuously
        # on a run that destroyed nothing at all.
        self.run_script(keep="1")
        destroyed = self._destroyed_targets()
        self.assertTrue(destroyed,
                        "expected pruning with KEEP=1, but nothing was destroyed")
        self.assertEqual([name for name in destroyed if "autosnap" in name], [])

    def test_no_prune_when_at_or_under_keep(self):
        # KEEP=5 with exactly 5 pre-pacman snapshots → nothing destroyed.
        self.run_script(keep="5")
        self.assertEqual(self._destroyed_targets(), [])


class TestError(Harness):
    """Checks on the run made with ``fail=True`` (FAKE_ZFS_SNAPSHOT_FAIL set)."""

    def test_snapshot_failure_skips_prune_and_warns(self):
        """The warning reaches stderr and no ``destroy`` call is logged."""
        result = self.run_script(fail=True)
        self.assertIn("Failed to create snapshot", result.stderr)
        destroy_calls = [entry for entry in self.log_lines()
                         if entry.startswith("destroy ")]
        self.assertEqual(destroy_calls, [])


# Run the suite through unittest's command-line runner when this file is
# executed directly instead of being imported.
if __name__ == "__main__":
    unittest.main()