aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/backup-system-file/test_backup_system_file.py161
-rw-r--r--tests/hypr-live-update-guard/test_hypr_live_update_guard.py95
-rw-r--r--tests/installer-steps/test_orchestrators.py117
-rw-r--r--tests/run-task/test_run_task.py172
4 files changed, 545 insertions, 0 deletions
diff --git a/tests/backup-system-file/test_backup_system_file.py b/tests/backup-system-file/test_backup_system_file.py
new file mode 100644
index 0000000..5d48d03
--- /dev/null
+++ b/tests/backup-system-file/test_backup_system_file.py
@@ -0,0 +1,161 @@
+"""Tests for the backup_system_file helper in the archsetup installer.
+
+backup_system_file snapshots a pre-existing system file to
+`<path>.archsetup.bak` before archsetup edits it in place, so a botched
+in-place edit (fstab, mkinitcpio.conf, sudoers, ...) is recoverable. It is
+idempotent: it never overwrites an existing backup, so the pristine original
+survives repeated edits within a run and across re-runs of the installer. It
+no-ops (success) when the target does not exist.
+
+These tests exercise the REAL function body, extracted from the `archsetup`
+script at run time (not a copy), so the production code path is what runs.
+Edits run against real temp files the test creates and tears down.
+
+Run from repo root:
+ python3 -m unittest tests.backup-system-file.test_backup_system_file
+"""
+
+import os
+import shutil
+import stat
+import subprocess
+import tempfile
+import unittest
+
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+ARCHSETUP = os.path.join(REPO_ROOT, "archsetup")
+
+
+class BackupHarness(unittest.TestCase):
+ """Source backup_system_file out of the real archsetup script and invoke it."""
+
+ def setUp(self):
+ self.tmp = tempfile.mkdtemp(prefix="backup-system-file-test-")
+ # A bash wrapper that extracts just the backup_system_file function from
+ # the real installer and invokes it with the test's arg. Sourcing the
+ # sed-extracted function means we test the production code path, not a
+ # reimplementation. The helper is self-contained (prints its own
+ # warnings), so no logger stub is needed.
+ self.wrapper = os.path.join(self.tmp, "run.sh")
+ with open(self.wrapper, "w") as f:
+ f.write(
+ "#!/bin/bash\n"
+ 'ARCHSETUP="$1"; shift\n'
+ "source <(sed -n '/^backup_system_file() {/,/^}/p' \"$ARCHSETUP\")\n"
+ 'backup_system_file "$@"\n'
+ )
+ os.chmod(self.wrapper, 0o755)
+
+ def tearDown(self):
+ # Restore writability in case a test made a dir read-only.
+ for root, dirs, _ in os.walk(self.tmp):
+ for d in dirs:
+ os.chmod(os.path.join(root, d), 0o755)
+ shutil.rmtree(self.tmp, ignore_errors=True)
+
+ def run_backup(self, target):
+ return subprocess.run(
+ ["bash", self.wrapper, ARCHSETUP, target],
+ capture_output=True, text=True, timeout=10,
+ )
+
+ def write(self, name, content, mode=None):
+ path = os.path.join(self.tmp, name)
+ with open(path, "w") as f:
+ f.write(content)
+ if mode is not None:
+ os.chmod(path, mode)
+ return path
+
+
+# -----------------------------------------------------------------------------
+# Normal cases
+# -----------------------------------------------------------------------------
+
+class TestBackupNormal(BackupHarness):
+
+ def test_existing_file_is_backed_up_with_same_content(self):
+ target = self.write("fstab", "UUID=abc / ext4 defaults 0 1\n")
+ result = self.run_backup(target)
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ backup = target + ".archsetup.bak"
+ self.assertTrue(os.path.isfile(backup), "backup should be created")
+ with open(backup) as f:
+ self.assertEqual(f.read(), "UUID=abc / ext4 defaults 0 1\n")
+
+ def test_backup_preserves_mode(self):
+ # sudoers ships 0440; a restored backup must keep restrictive perms.
+ target = self.write("sudoers", "root ALL=(ALL) ALL\n", mode=0o440)
+ result = self.run_backup(target)
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ backup = target + ".archsetup.bak"
+ self.assertEqual(stat.S_IMODE(os.stat(backup).st_mode), 0o440)
+
+
+# -----------------------------------------------------------------------------
+# Boundary cases
+# -----------------------------------------------------------------------------
+
+class TestBackupBoundary(BackupHarness):
+
+ def test_existing_backup_is_not_overwritten(self):
+ # The pristine original must survive a later edit + second backup call.
+ target = self.write("pacman.conf", "PRISTINE\n")
+ self.assertEqual(self.run_backup(target).returncode, 0)
+ # Simulate archsetup editing the file in place, then backing up again.
+ with open(target, "w") as f:
+ f.write("EDITED\n")
+ result = self.run_backup(target)
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ with open(target + ".archsetup.bak") as f:
+ self.assertEqual(f.read(), "PRISTINE\n", "backup must stay pristine")
+
+ def test_missing_target_is_a_quiet_noop(self):
+ target = os.path.join(self.tmp, "never-existed.conf")
+ result = self.run_backup(target)
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ self.assertFalse(os.path.exists(target + ".archsetup.bak"))
+
+ def test_second_call_same_run_is_a_noop(self):
+ # A file edited twice in one run (e.g. mkinitcpio MODULES then HOOKS)
+ # gets backed up once; the second call must not error or re-copy.
+ target = self.write("mkinitcpio.conf", "HOOKS=(base udev)\n")
+ self.assertEqual(self.run_backup(target).returncode, 0)
+ backup = target + ".archsetup.bak"
+ first_mtime = os.stat(backup).st_mtime_ns
+ result = self.run_backup(target)
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ self.assertEqual(os.stat(backup).st_mtime_ns, first_mtime,
+ "backup must not be rewritten on the second call")
+
+
+# -----------------------------------------------------------------------------
+# Error cases
+# -----------------------------------------------------------------------------
+
+class TestBackupErrors(BackupHarness):
+
+ def test_empty_target_is_refused(self):
+ result = self.run_backup("")
+ self.assertNotEqual(result.returncode, 0)
+
+ def test_copy_failure_returns_nonzero(self):
+ # Target exists but its directory is read-only, so the .bak can't be
+ # written. The helper must report failure rather than silently skip.
+ subdir = os.path.join(self.tmp, "ro")
+ os.makedirs(subdir)
+ target = os.path.join(subdir, "fstab")
+ with open(target, "w") as f:
+ f.write("data\n")
+ os.chmod(subdir, 0o500) # r-x: owner cannot create the .bak here
+ try:
+ result = self.run_backup(target)
+ finally:
+ os.chmod(subdir, 0o755)
+ self.assertNotEqual(result.returncode, 0)
+ self.assertFalse(os.path.exists(target + ".archsetup.bak"))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/hypr-live-update-guard/test_hypr_live_update_guard.py b/tests/hypr-live-update-guard/test_hypr_live_update_guard.py
new file mode 100644
index 0000000..5ec5ce8
--- /dev/null
+++ b/tests/hypr-live-update-guard/test_hypr_live_update_guard.py
@@ -0,0 +1,95 @@
+"""Tests for the hypr-live-update-guard pacman PreTransaction hook script.
+
+The guard aborts a live pacman upgrade of GPU/compositor runtime libraries
+(mesa, hyprland, wayland, GPU drivers) while a Hyprland session is running,
+so the compositor doesn't SIGABRT when a now-"(deleted)" library is next
+called. It reads the triggering package names on stdin (pacman NeedsTargets)
+and exits non-zero to abort the transaction (AbortOnFail) before any package
+is swapped. When Hyprland isn't running, or an override is set, it exits 0
+and the upgrade proceeds.
+
+Test seams (env vars the production script honors):
+ HYPR_GUARD_RUNNING 1/0 forces the Hyprland-running check (default: pgrep)
+ HYPR_ALLOW_LIVE_UPDATE 1 overrides the guard (proceed anyway)
+ HYPR_GUARD_SENTINEL path whose existence also overrides the guard
+
+Run from repo root:
+ python3 -m unittest tests.hypr-live-update-guard.test_hypr_live_update_guard
+"""
+
+import os
+import subprocess
+import tempfile
+import unittest
+
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+GUARD = os.path.join(REPO_ROOT, "scripts", "hypr-live-update-guard")
+
+
+def run_guard(stdin="mesa\n", running="1", allow=None, sentinel=None):
+ env = dict(os.environ)
+ env["HYPR_GUARD_RUNNING"] = running
+ if allow is not None:
+ env["HYPR_ALLOW_LIVE_UPDATE"] = allow
+ # Point the sentinel at a path that does not exist unless a test sets one,
+ # so the host's real /run state can't leak into the result.
+ env["HYPR_GUARD_SENTINEL"] = sentinel if sentinel else "/nonexistent/guard-sentinel"
+ return subprocess.run(
+ ["sh", GUARD],
+ input=stdin, capture_output=True, text=True, timeout=10, env=env,
+ )
+
+
+class HyprLiveUpdateGuard(unittest.TestCase):
+ # --- Normal cases ---------------------------------------------------
+
+ def test_running_with_dangerous_pkg_aborts(self):
+ r = run_guard(stdin="mesa\n", running="1")
+ self.assertEqual(r.returncode, 1, r.stderr)
+
+ def test_abort_message_names_the_package_and_tty_remedy(self):
+ r = run_guard(stdin="mesa\n", running="1")
+ self.assertIn("mesa", r.stderr)
+ self.assertIn("TTY", r.stderr)
+
+ def test_not_running_allows(self):
+ r = run_guard(stdin="mesa\n", running="0")
+ self.assertEqual(r.returncode, 0, r.stderr)
+
+ def test_not_running_is_silent(self):
+ r = run_guard(stdin="mesa\nhyprland\n", running="0")
+ self.assertEqual(r.stderr.strip(), "")
+
+ # --- Boundary cases -------------------------------------------------
+
+ def test_multiple_packages_all_listed(self):
+ r = run_guard(stdin="mesa\nhyprland\nvulkan-radeon\n", running="1")
+ self.assertEqual(r.returncode, 1)
+ for pkg in ("mesa", "hyprland", "vulkan-radeon"):
+ self.assertIn(pkg, r.stderr)
+
+ def test_running_with_empty_stdin_still_guards(self):
+ # The hook only fires when dangerous targets exist, so an empty target
+ # list shouldn't normally happen; if Hyprland is up, stay safe (abort).
+ r = run_guard(stdin="", running="1")
+ self.assertEqual(r.returncode, 1)
+
+ # --- Override / error cases -----------------------------------------
+
+ def test_env_override_proceeds_even_when_running(self):
+ r = run_guard(stdin="mesa\n", running="1", allow="1")
+ self.assertEqual(r.returncode, 0, r.stderr)
+
+ def test_sentinel_file_override_proceeds(self):
+ with tempfile.NamedTemporaryFile(prefix="guard-allow-") as f:
+ r = run_guard(stdin="mesa\n", running="1", sentinel=f.name)
+ self.assertEqual(r.returncode, 0, r.stderr)
+
+ def test_override_env_zero_does_not_bypass(self):
+ r = run_guard(stdin="mesa\n", running="1", allow="0")
+ self.assertEqual(r.returncode, 1, r.stderr)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/installer-steps/test_orchestrators.py b/tests/installer-steps/test_orchestrators.py
new file mode 100644
index 0000000..e62c198
--- /dev/null
+++ b/tests/installer-steps/test_orchestrators.py
@@ -0,0 +1,117 @@
+"""Characterization tests for the decomposed installer step orchestrators.
+
+The 2026 decomposition turned the giant step functions into thin
+orchestrators that call one named sub-function per concern. These tests pin
+the call SEQUENCE of each orchestrator: a dropped, added, or reordered
+sub-step call fails the test. They guard the wiring, not the sub-functions'
+own behavior (those mutate the system and are exercised by the VM harness).
+
+Method: sed-extract the orchestrator from the real `archsetup` (its body is
+now just `display` + sub-function calls), source it with `display` silenced
+and every sub-function replaced by a recorder that echoes its own name, run
+it, and assert stdout is the expected ordered list.
+
+Run from repo root:
+ python3 -m unittest tests.installer-steps.test_orchestrators
+"""
+
+import os
+import subprocess
+import textwrap
+import unittest
+
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+ARCHSETUP = os.path.join(REPO_ROOT, "archsetup")
+
+# orchestrator -> exact ordered sub-step calls
+ORCHESTRATORS = {
+ "essential_services": [
+ "configure_randomness", "configure_networking", "configure_power",
+ "configure_ssh_server", "configure_fail2ban", "configure_firewall",
+ "configure_service_discovery", "configure_job_scheduling",
+ "configure_package_cache", "configure_snapshots",
+ "configure_user_lingering",
+ ],
+ "prerequisites": [
+ "bootstrap_pacman_keyring", "install_required_software",
+ "configure_build_environment", "configure_package_mirrors",
+ ],
+ "developer_workstation": [
+ "install_programming_languages", "install_editors",
+ "install_android_utilities", "install_vpn_tools",
+ "install_devops_utilities",
+ ],
+ "boot_ux": [
+ "tighten_efi_permissions", "add_nvme_early_module",
+ "configure_initramfs_hook", "configure_encrypted_autologin",
+ "configure_tlp_power", "trim_firmware", "configure_grub",
+ ],
+ "user_customizations": [
+ "clone_user_repos", "stow_dotfiles", "prune_waybar_battery",
+ "refresh_desktop_caches", "configure_dconf_defaults",
+ "finalize_dotfiles", "create_user_directories",
+ ],
+}
+
+
+def run_orchestrator(func, stubs, extra_defs=""):
+ """Source `func` from archsetup with `stubs` recording their names."""
+ stub_defs = "\n".join(f"{s}() {{ echo {s}; }}" for s in stubs)
+ script = textwrap.dedent(f"""\
+ display() {{ :; }}
+ {stub_defs}
+ {extra_defs}
+ source <(sed -n '/^{func}() {{/,/^}}/p' "{ARCHSETUP}")
+ {func}
+ """)
+ result = subprocess.run(
+ ["bash", "-c", script],
+ capture_output=True, text=True, timeout=10,
+ )
+ return result
+
+
+class OrchestratorSequence(unittest.TestCase):
+ def test_each_orchestrator_calls_substeps_in_order(self):
+ for func, expected in ORCHESTRATORS.items():
+ with self.subTest(orchestrator=func):
+ result = run_orchestrator(func, expected)
+ self.assertEqual(result.returncode, 0, result.stderr)
+ got = result.stdout.split()
+ self.assertEqual(got, expected,
+ f"{func} call sequence drifted")
+
+
+class SnapshotDispatch(unittest.TestCase):
+ """configure_snapshots branches on filesystem; pin each branch."""
+
+ SUBS = ["configure_zfs_snapshots", "configure_btrfs_snapshots"]
+
+ def test_zfs_root_runs_zfs_snapshots(self):
+ result = run_orchestrator(
+ "configure_snapshots", self.SUBS,
+ extra_defs="is_zfs_root() { return 0; }\nis_btrfs_root() { return 1; }",
+ )
+ self.assertEqual(result.returncode, 0, result.stderr)
+ self.assertEqual(result.stdout.split(), ["configure_zfs_snapshots"])
+
+ def test_btrfs_root_runs_btrfs_snapshots(self):
+ result = run_orchestrator(
+ "configure_snapshots", self.SUBS,
+ extra_defs="is_zfs_root() { return 1; }\nis_btrfs_root() { return 0; }",
+ )
+ self.assertEqual(result.returncode, 0, result.stderr)
+ self.assertEqual(result.stdout.split(), ["configure_btrfs_snapshots"])
+
+ def test_other_filesystem_runs_neither(self):
+ result = run_orchestrator(
+ "configure_snapshots", self.SUBS,
+ extra_defs="is_zfs_root() { return 1; }\nis_btrfs_root() { return 1; }",
+ )
+ self.assertEqual(result.returncode, 0, result.stderr)
+ self.assertEqual(result.stdout.split(), [])
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/run-task/test_run_task.py b/tests/run-task/test_run_task.py
new file mode 100644
index 0000000..35036dd
--- /dev/null
+++ b/tests/run-task/test_run_task.py
@@ -0,0 +1,172 @@
+"""Tests for the run_task / enable_service helpers in the archsetup installer.
+
+run_task is the installer's describe-run-warn primitive. It replaces the
+hand-written idiom that recurs ~100 times across the script:
+
+ action="enabling rngd service" && display "task" "$action"
+ systemctl enable rngd >> "$logfile" 2>&1 || error_warn "$action" "$?"
+
+as a single call:
+
+ run_task "enabling rngd service" systemctl enable rngd
+
+It announces the task via display, runs the command with stdout+stderr
+appended to $logfile, and on failure calls error_warn with the command's
+real exit code (non-fatal). enable_service is a thin wrapper that enables
+one or more systemd units with the conventional "enabling <unit> service"
+wording.
+
+These tests exercise the REAL function bodies, extracted from the
+`archsetup` script at run time (not a copy), with recording stubs standing
+in for display, error_warn, and systemctl. The command run by run_task is
+genuinely executed.
+
+Run from repo root:
+ python3 -m unittest tests.run-task.test_run_task
+"""
+
+import os
+import shutil
+import subprocess
+import tempfile
+import unittest
+
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+ARCHSETUP = os.path.join(REPO_ROOT, "archsetup")
+
+# A bash harness that sources the real run_task + enable_service out of the
+# installer, with recording stubs for their dependencies. Each stub appends a
+# tab-separated record to a file named by an env var, so the Python side can
+# assert what was called. The real command passed to run_task still runs.
+WRAPPER = r"""#!/bin/bash
+ARCHSETUP="$1"; shift
+logfile="$LOGFILE"
+
+display() { printf '%s\t%s\n' "$1" "$2" >> "$DISPLAY_LOG"; }
+error_warn() { printf '%s\t%s\n' "$1" "$2" >> "$ERRWARN_LOG"; return 1; }
+systemctl() { printf 'systemctl %s\n' "$*"; }
+
+source <(sed -n '/^run_task() {/,/^}/p' "$ARCHSETUP")
+source <(sed -n '/^enable_service() {/,/^}/p' "$ARCHSETUP")
+
+"$@"
+"""
+
+
+class RunTaskHarness(unittest.TestCase):
+ def setUp(self):
+ self.tmp = tempfile.mkdtemp(prefix="run-task-test-")
+ self.wrapper = os.path.join(self.tmp, "run.sh")
+ with open(self.wrapper, "w") as f:
+ f.write(WRAPPER)
+ os.chmod(self.wrapper, 0o755)
+ self.logfile = os.path.join(self.tmp, "install.log")
+ self.display_log = os.path.join(self.tmp, "display.log")
+ self.errwarn_log = os.path.join(self.tmp, "errwarn.log")
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp, ignore_errors=True)
+
+ def call(self, *args):
+ env = dict(os.environ)
+ env["LOGFILE"] = self.logfile
+ env["DISPLAY_LOG"] = self.display_log
+ env["ERRWARN_LOG"] = self.errwarn_log
+ return subprocess.run(
+ ["bash", self.wrapper, ARCHSETUP, *args],
+ capture_output=True, text=True, timeout=10, env=env,
+ )
+
+ def read(self, path):
+ if not os.path.exists(path):
+ return ""
+ with open(path) as f:
+ return f.read()
+
+ # --- Normal cases -----------------------------------------------------
+
+ def test_run_task_success_announces_and_runs(self):
+ result = self.call("run_task", "doing a thing", "true")
+ self.assertEqual(result.returncode, 0, result.stderr)
+ # Announced as a "task" with the exact description.
+ self.assertEqual(self.read(self.display_log), "task\tdoing a thing\n")
+ # No warning on success.
+ self.assertEqual(self.read(self.errwarn_log), "")
+
+ def test_run_task_captures_command_output_to_logfile(self):
+ result = self.call("run_task", "echo something", "echo", "hello-from-cmd")
+ self.assertEqual(result.returncode, 0, result.stderr)
+ self.assertIn("hello-from-cmd", self.read(self.logfile))
+ # Command output is logged, not printed to the terminal.
+ self.assertNotIn("hello-from-cmd", result.stdout)
+
+ def test_run_task_captures_stderr_to_logfile(self):
+ # `ls` of a missing path writes to stderr; it must land in the logfile.
+ missing = os.path.join(self.tmp, "no-such-path")
+ self.call("run_task", "listing", "ls", missing)
+ self.assertIn("no-such-path", self.read(self.logfile))
+
+ def test_run_task_preserves_multiple_arguments(self):
+ self.call("run_task", "multi-arg", "printf", "%s|%s|%s", "a", "b", "c")
+ self.assertIn("a|b|c", self.read(self.logfile))
+
+ def test_run_task_preserves_arguments_with_spaces(self):
+ self.call("run_task", "spacey", "printf", "[%s]", "two words")
+ self.assertIn("[two words]", self.read(self.logfile))
+
+ # --- enable_service ---------------------------------------------------
+
+ def test_enable_service_single_unit(self):
+ self.call("enable_service", "rngd")
+ self.assertEqual(self.read(self.display_log), "task\tenabling rngd service\n")
+ self.assertIn("systemctl enable rngd", self.read(self.logfile))
+
+ def test_enable_service_multiple_units(self):
+ self.call("enable_service", "foo", "bar", "baz")
+ disp = self.read(self.display_log)
+ self.assertIn("task\tenabling foo service\n", disp)
+ self.assertIn("task\tenabling bar service\n", disp)
+ self.assertIn("task\tenabling baz service\n", disp)
+ log = self.read(self.logfile)
+ self.assertIn("systemctl enable foo", log)
+ self.assertIn("systemctl enable bar", log)
+ self.assertIn("systemctl enable baz", log)
+
+ # --- Error cases ------------------------------------------------------
+
+ def test_run_task_failure_warns_with_description(self):
+ result = self.call("run_task", "failing thing", "false")
+ self.assertNotEqual(result.returncode, 0)
+ self.assertEqual(self.read(self.errwarn_log), "failing thing\t1\n")
+
+ def test_run_task_failure_propagates_real_exit_code(self):
+ # `bash -c 'exit 42'` must surface 42 to error_warn, not a clobbered 0.
+ self.call("run_task", "exit-42", "bash", "-c", "exit 42")
+ self.assertEqual(self.read(self.errwarn_log), "exit-42\t42\n")
+
+ def test_enable_service_failure_warns_per_unit(self):
+ # Override systemctl to fail; each unit should produce a warning.
+ env = dict(os.environ)
+ env["LOGFILE"] = self.logfile
+ env["DISPLAY_LOG"] = self.display_log
+ env["ERRWARN_LOG"] = self.errwarn_log
+ # Re-create wrapper with a failing systemctl stub for this case.
+ failing = os.path.join(self.tmp, "run-fail.sh")
+ with open(failing, "w") as f:
+ f.write(WRAPPER.replace(
+ "systemctl() { printf 'systemctl %s\\n' \"$*\"; }",
+ "systemctl() { printf 'systemctl %s\\n' \"$*\"; return 1; }",
+ ))
+ os.chmod(failing, 0o755)
+ subprocess.run(
+ ["bash", failing, ARCHSETUP, "enable_service", "alpha", "beta"],
+ capture_output=True, text=True, timeout=10, env=env,
+ )
+ warns = self.read(self.errwarn_log)
+ self.assertIn("enabling alpha service\t1\n", warns)
+ self.assertIn("enabling beta service\t1\n", warns)
+
+
+if __name__ == "__main__":
+ unittest.main()