diff options
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/hypr-live-update-guard/test_hypr_live_update_guard.py | 95 | ||||
| -rw-r--r-- | tests/installer-steps/test_orchestrators.py | 117 | ||||
| -rw-r--r-- | tests/run-task/test_run_task.py | 172 |
3 files changed, 384 insertions, 0 deletions
diff --git a/tests/hypr-live-update-guard/test_hypr_live_update_guard.py b/tests/hypr-live-update-guard/test_hypr_live_update_guard.py new file mode 100644 index 0000000..5ec5ce8 --- /dev/null +++ b/tests/hypr-live-update-guard/test_hypr_live_update_guard.py @@ -0,0 +1,95 @@ +"""Tests for the hypr-live-update-guard pacman PreTransaction hook script. + +The guard aborts a live pacman upgrade of GPU/compositor runtime libraries +(mesa, hyprland, wayland, GPU drivers) while a Hyprland session is running, +so the compositor doesn't SIGABRT when a now-"(deleted)" library is next +called. It reads the triggering package names on stdin (pacman NeedsTargets) +and exits non-zero to abort the transaction (AbortOnFail) before any package +is swapped. When Hyprland isn't running, or an override is set, it exits 0 +and the upgrade proceeds. + +Test seams (env vars the production script honors): + HYPR_GUARD_RUNNING 1/0 forces the Hyprland-running check (default: pgrep) + HYPR_ALLOW_LIVE_UPDATE 1 overrides the guard (proceed anyway) + HYPR_GUARD_SENTINEL path whose existence also overrides the guard + +Run from repo root: + python3 -m unittest tests.hypr-live-update-guard.test_hypr_live_update_guard +""" + +import os +import subprocess +import tempfile +import unittest + + +REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +GUARD = os.path.join(REPO_ROOT, "scripts", "hypr-live-update-guard") + + +def run_guard(stdin="mesa\n", running="1", allow=None, sentinel=None): + env = dict(os.environ) + env["HYPR_GUARD_RUNNING"] = running + if allow is not None: + env["HYPR_ALLOW_LIVE_UPDATE"] = allow + # Point the sentinel at a path that does not exist unless a test sets one, + # so the host's real /run state can't leak into the result. + env["HYPR_GUARD_SENTINEL"] = sentinel if sentinel else "/nonexistent/guard-sentinel" + return subprocess.run( + ["sh", GUARD], + input=stdin, capture_output=True, text=True, timeout=10, env=env, + ) + + +class HyprLiveUpdateGuard(unittest.TestCase): + # --- Normal cases --------------------------------------------------- + + def test_running_with_dangerous_pkg_aborts(self): + r = run_guard(stdin="mesa\n", running="1") + self.assertEqual(r.returncode, 1, r.stderr) + + def test_abort_message_names_the_package_and_tty_remedy(self): + r = run_guard(stdin="mesa\n", running="1") + self.assertIn("mesa", r.stderr) + self.assertIn("TTY", r.stderr) + + def test_not_running_allows(self): + r = run_guard(stdin="mesa\n", running="0") + self.assertEqual(r.returncode, 0, r.stderr) + + def test_not_running_is_silent(self): + r = run_guard(stdin="mesa\nhyprland\n", running="0") + self.assertEqual(r.stderr.strip(), "") + + # --- Boundary cases ------------------------------------------------- + + def test_multiple_packages_all_listed(self): + r = run_guard(stdin="mesa\nhyprland\nvulkan-radeon\n", running="1") + self.assertEqual(r.returncode, 1) + for pkg in ("mesa", "hyprland", "vulkan-radeon"): + self.assertIn(pkg, r.stderr) + + def test_running_with_empty_stdin_still_guards(self): + # The hook only fires when dangerous targets exist, so an empty target + # list shouldn't normally happen; if Hyprland is up, stay safe (abort). + r = run_guard(stdin="", running="1") + self.assertEqual(r.returncode, 1) + + # --- Override / error cases ----------------------------------------- + + def test_env_override_proceeds_even_when_running(self): + r = run_guard(stdin="mesa\n", running="1", allow="1") + self.assertEqual(r.returncode, 0, r.stderr) + + def test_sentinel_file_override_proceeds(self): + with tempfile.NamedTemporaryFile(prefix="guard-allow-") as f: + r = run_guard(stdin="mesa\n", running="1", sentinel=f.name) + self.assertEqual(r.returncode, 0, r.stderr) + + def test_override_env_zero_does_not_bypass(self): + r = run_guard(stdin="mesa\n", running="1", allow="0") + self.assertEqual(r.returncode, 1, r.stderr) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/installer-steps/test_orchestrators.py b/tests/installer-steps/test_orchestrators.py new file mode 100644 index 0000000..e62c198 --- /dev/null +++ b/tests/installer-steps/test_orchestrators.py @@ -0,0 +1,117 @@ +"""Characterization tests for the decomposed installer step orchestrators. + +The 2026 decomposition turned the giant step functions into thin +orchestrators that call one named sub-function per concern. These tests pin +the call SEQUENCE of each orchestrator: a dropped, added, or reordered +sub-step call fails the test. They guard the wiring, not the sub-functions' +own behavior (those mutate the system and are exercised by the VM harness). + +Method: sed-extract the orchestrator from the real `archsetup` (its body is +now just `display` + sub-function calls), source it with `display` silenced +and every sub-function replaced by a recorder that echoes its own name, run +it, and assert stdout is the expected ordered list. + +Run from repo root: + python3 -m unittest tests.installer-steps.test_orchestrators +""" + +import os +import subprocess +import textwrap +import unittest + + +REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +ARCHSETUP = os.path.join(REPO_ROOT, "archsetup") + +# orchestrator -> exact ordered sub-step calls +ORCHESTRATORS = { + "essential_services": [ + "configure_randomness", "configure_networking", "configure_power", + "configure_ssh_server", "configure_fail2ban", "configure_firewall", + "configure_service_discovery", "configure_job_scheduling", + "configure_package_cache", "configure_snapshots", + "configure_user_lingering", + ], + "prerequisites": [ + "bootstrap_pacman_keyring", "install_required_software", + "configure_build_environment", "configure_package_mirrors", + ], + "developer_workstation": [ + "install_programming_languages", "install_editors", + "install_android_utilities", "install_vpn_tools", + "install_devops_utilities", + ], + "boot_ux": [ + "tighten_efi_permissions", "add_nvme_early_module", + "configure_initramfs_hook", "configure_encrypted_autologin", + "configure_tlp_power", "trim_firmware", "configure_grub", + ], + "user_customizations": [ + "clone_user_repos", "stow_dotfiles", "prune_waybar_battery", + "refresh_desktop_caches", "configure_dconf_defaults", + "finalize_dotfiles", "create_user_directories", + ], +} + + +def run_orchestrator(func, stubs, extra_defs=""): + """Source `func` from archsetup with `stubs` recording their names.""" + stub_defs = "\n".join(f"{s}() {{ echo {s}; }}" for s in stubs) + script = textwrap.dedent(f"""\ + display() {{ :; }} + {stub_defs} + {extra_defs} + source <(sed -n '/^{func}() {{/,/^}}/p' "{ARCHSETUP}") + {func} + """) + result = subprocess.run( + ["bash", "-c", script], + capture_output=True, text=True, timeout=10, + ) + return result + + +class OrchestratorSequence(unittest.TestCase): + def test_each_orchestrator_calls_substeps_in_order(self): + for func, expected in ORCHESTRATORS.items(): + with self.subTest(orchestrator=func): + result = run_orchestrator(func, expected) + self.assertEqual(result.returncode, 0, result.stderr) + got = result.stdout.split() + self.assertEqual(got, expected, + f"{func} call sequence drifted") + + +class SnapshotDispatch(unittest.TestCase): + """configure_snapshots branches on filesystem; pin each branch.""" + + SUBS = ["configure_zfs_snapshots", "configure_btrfs_snapshots"] + + def test_zfs_root_runs_zfs_snapshots(self): + result = run_orchestrator( + "configure_snapshots", self.SUBS, + extra_defs="is_zfs_root() { return 0; }\nis_btrfs_root() { return 1; }", + ) + self.assertEqual(result.returncode, 0, result.stderr) + self.assertEqual(result.stdout.split(), ["configure_zfs_snapshots"]) + + def test_btrfs_root_runs_btrfs_snapshots(self): + result = run_orchestrator( + "configure_snapshots", self.SUBS, + extra_defs="is_zfs_root() { return 1; }\nis_btrfs_root() { return 0; }", + ) + self.assertEqual(result.returncode, 0, result.stderr) + self.assertEqual(result.stdout.split(), ["configure_btrfs_snapshots"]) + + def test_other_filesystem_runs_neither(self): + result = run_orchestrator( + "configure_snapshots", self.SUBS, + extra_defs="is_zfs_root() { return 1; }\nis_btrfs_root() { return 1; }", + ) + self.assertEqual(result.returncode, 0, result.stderr) + self.assertEqual(result.stdout.split(), []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/run-task/test_run_task.py b/tests/run-task/test_run_task.py new file mode 100644 index 0000000..35036dd --- /dev/null +++ b/tests/run-task/test_run_task.py @@ -0,0 +1,172 @@ +"""Tests for the run_task / enable_service helpers in the archsetup installer. + +run_task is the installer's describe-run-warn primitive. It replaces the +hand-written idiom that recurs ~100 times across the script: + + action="enabling rngd service" && display "task" "$action" + systemctl enable rngd >> "$logfile" 2>&1 || error_warn "$action" "$?" + +as a single call: + + run_task "enabling rngd service" systemctl enable rngd + +It announces the task via display, runs the command with stdout+stderr +appended to $logfile, and on failure calls error_warn with the command's +real exit code (non-fatal). enable_service is a thin wrapper that enables +one or more systemd units with the conventional "enabling <unit> service" +wording. + +These tests exercise the REAL function bodies, extracted from the +`archsetup` script at run time (not a copy), with recording stubs standing +in for display, error_warn, and systemctl. The command run by run_task is +genuinely executed. + +Run from repo root: + python3 -m unittest tests.run-task.test_run_task +""" + +import os +import shutil +import subprocess +import tempfile +import unittest + + +REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +ARCHSETUP = os.path.join(REPO_ROOT, "archsetup") + +# A bash harness that sources the real run_task + enable_service out of the +# installer, with recording stubs for their dependencies. Each stub appends a +# tab-separated record to a file named by an env var, so the Python side can +# assert what was called. The real command passed to run_task still runs. +WRAPPER = r"""#!/bin/bash +ARCHSETUP="$1"; shift +logfile="$LOGFILE" + +display() { printf '%s\t%s\n' "$1" "$2" >> "$DISPLAY_LOG"; } +error_warn() { printf '%s\t%s\n' "$1" "$2" >> "$ERRWARN_LOG"; return 1; } +systemctl() { printf 'systemctl %s\n' "$*"; } + +source <(sed -n '/^run_task() {/,/^}/p' "$ARCHSETUP") +source <(sed -n '/^enable_service() {/,/^}/p' "$ARCHSETUP") + +"$@" +""" + + +class RunTaskHarness(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.mkdtemp(prefix="run-task-test-") + self.wrapper = os.path.join(self.tmp, "run.sh") + with open(self.wrapper, "w") as f: + f.write(WRAPPER) + os.chmod(self.wrapper, 0o755) + self.logfile = os.path.join(self.tmp, "install.log") + self.display_log = os.path.join(self.tmp, "display.log") + self.errwarn_log = os.path.join(self.tmp, "errwarn.log") + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def call(self, *args): + env = dict(os.environ) + env["LOGFILE"] = self.logfile + env["DISPLAY_LOG"] = self.display_log + env["ERRWARN_LOG"] = self.errwarn_log + return subprocess.run( + ["bash", self.wrapper, ARCHSETUP, *args], + capture_output=True, text=True, timeout=10, env=env, + ) + + def read(self, path): + if not os.path.exists(path): + return "" + with open(path) as f: + return f.read() + + # --- Normal cases ----------------------------------------------------- + + def test_run_task_success_announces_and_runs(self): + result = self.call("run_task", "doing a thing", "true") + self.assertEqual(result.returncode, 0, result.stderr) + # Announced as a "task" with the exact description. + self.assertEqual(self.read(self.display_log), "task\tdoing a thing\n") + # No warning on success. + self.assertEqual(self.read(self.errwarn_log), "") + + def test_run_task_captures_command_output_to_logfile(self): + result = self.call("run_task", "echo something", "echo", "hello-from-cmd") + self.assertEqual(result.returncode, 0, result.stderr) + self.assertIn("hello-from-cmd", self.read(self.logfile)) + # Command output is logged, not printed to the terminal. + self.assertNotIn("hello-from-cmd", result.stdout) + + def test_run_task_captures_stderr_to_logfile(self): + # `ls` of a missing path writes to stderr; it must land in the logfile. + missing = os.path.join(self.tmp, "no-such-path") + self.call("run_task", "listing", "ls", missing) + self.assertIn("no-such-path", self.read(self.logfile)) + + def test_run_task_preserves_multiple_arguments(self): + self.call("run_task", "multi-arg", "printf", "%s|%s|%s", "a", "b", "c") + self.assertIn("a|b|c", self.read(self.logfile)) + + def test_run_task_preserves_arguments_with_spaces(self): + self.call("run_task", "spacey", "printf", "[%s]", "two words") + self.assertIn("[two words]", self.read(self.logfile)) + + # --- enable_service --------------------------------------------------- + + def test_enable_service_single_unit(self): + self.call("enable_service", "rngd") + self.assertEqual(self.read(self.display_log), "task\tenabling rngd service\n") + self.assertIn("systemctl enable rngd", self.read(self.logfile)) + + def test_enable_service_multiple_units(self): + self.call("enable_service", "foo", "bar", "baz") + disp = self.read(self.display_log) + self.assertIn("task\tenabling foo service\n", disp) + self.assertIn("task\tenabling bar service\n", disp) + self.assertIn("task\tenabling baz service\n", disp) + log = self.read(self.logfile) + self.assertIn("systemctl enable foo", log) + self.assertIn("systemctl enable bar", log) + self.assertIn("systemctl enable baz", log) + + # --- Error cases ------------------------------------------------------ + + def test_run_task_failure_warns_with_description(self): + result = self.call("run_task", "failing thing", "false") + self.assertNotEqual(result.returncode, 0) + self.assertEqual(self.read(self.errwarn_log), "failing thing\t1\n") + + def test_run_task_failure_propagates_real_exit_code(self): + # `bash -c 'exit 42'` must surface 42 to error_warn, not a clobbered 0. + self.call("run_task", "exit-42", "bash", "-c", "exit 42") + self.assertEqual(self.read(self.errwarn_log), "exit-42\t42\n") + + def test_enable_service_failure_warns_per_unit(self): + # Override systemctl to fail; each unit should produce a warning. + env = dict(os.environ) + env["LOGFILE"] = self.logfile + env["DISPLAY_LOG"] = self.display_log + env["ERRWARN_LOG"] = self.errwarn_log + # Re-create wrapper with a failing systemctl stub for this case. + failing = os.path.join(self.tmp, "run-fail.sh") + with open(failing, "w") as f: + f.write(WRAPPER.replace( + "systemctl() { printf 'systemctl %s\\n' \"$*\"; }", + "systemctl() { printf 'systemctl %s\\n' \"$*\"; return 1; }", + )) + os.chmod(failing, 0o755) + subprocess.run( + ["bash", failing, ARCHSETUP, "enable_service", "alpha", "beta"], + capture_output=True, text=True, timeout=10, env=env, + ) + warns = self.read(self.errwarn_log) + self.assertIn("enabling alpha service\t1\n", warns) + self.assertIn("enabling beta service\t1\n", warns) + + +if __name__ == "__main__": + unittest.main() |
