aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/hypr-live-update-guard/test_hypr_live_update_guard.py95
-rw-r--r--tests/installer-steps/test_orchestrators.py117
-rw-r--r--tests/run-task/test_run_task.py172
3 files changed, 384 insertions, 0 deletions
diff --git a/tests/hypr-live-update-guard/test_hypr_live_update_guard.py b/tests/hypr-live-update-guard/test_hypr_live_update_guard.py
new file mode 100644
index 0000000..5ec5ce8
--- /dev/null
+++ b/tests/hypr-live-update-guard/test_hypr_live_update_guard.py
@@ -0,0 +1,95 @@
+"""Tests for the hypr-live-update-guard pacman PreTransaction hook script.
+
+The guard aborts a live pacman upgrade of GPU/compositor runtime libraries
+(mesa, hyprland, wayland, GPU drivers) while a Hyprland session is running,
+so the compositor doesn't SIGABRT when a now-"(deleted)" library is next
+called. It reads the triggering package names on stdin (pacman NeedsTargets)
+and exits non-zero to abort the transaction (AbortOnFail) before any package
+is swapped. When Hyprland isn't running, or an override is set, it exits 0
+and the upgrade proceeds.
+
+Test seams (env vars the production script honors):
+ HYPR_GUARD_RUNNING 1/0 forces the Hyprland-running check (default: pgrep)
+ HYPR_ALLOW_LIVE_UPDATE 1 overrides the guard (proceed anyway)
+ HYPR_GUARD_SENTINEL path whose existence also overrides the guard
+
+Run from repo root:
+ python3 -m unittest tests.hypr-live-update-guard.test_hypr_live_update_guard
+"""
+
+import os
+import subprocess
+import tempfile
+import unittest
+
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+GUARD = os.path.join(REPO_ROOT, "scripts", "hypr-live-update-guard")
+
+
+def run_guard(stdin="mesa\n", running="1", allow=None, sentinel=None):
+ env = dict(os.environ)
+ env["HYPR_GUARD_RUNNING"] = running
+ if allow is not None:
+ env["HYPR_ALLOW_LIVE_UPDATE"] = allow
+ # Point the sentinel at a path that does not exist unless a test sets one,
+ # so the host's real /run state can't leak into the result.
+ env["HYPR_GUARD_SENTINEL"] = sentinel if sentinel else "/nonexistent/guard-sentinel"
+ return subprocess.run(
+ ["sh", GUARD],
+ input=stdin, capture_output=True, text=True, timeout=10, env=env,
+ )
+
+
+class HyprLiveUpdateGuard(unittest.TestCase):
+ # --- Normal cases ---------------------------------------------------
+
+ def test_running_with_dangerous_pkg_aborts(self):
+ r = run_guard(stdin="mesa\n", running="1")
+ self.assertEqual(r.returncode, 1, r.stderr)
+
+ def test_abort_message_names_the_package_and_tty_remedy(self):
+ r = run_guard(stdin="mesa\n", running="1")
+ self.assertIn("mesa", r.stderr)
+ self.assertIn("TTY", r.stderr)
+
+ def test_not_running_allows(self):
+ r = run_guard(stdin="mesa\n", running="0")
+ self.assertEqual(r.returncode, 0, r.stderr)
+
+ def test_not_running_is_silent(self):
+ r = run_guard(stdin="mesa\nhyprland\n", running="0")
+ self.assertEqual(r.stderr.strip(), "")
+
+ # --- Boundary cases -------------------------------------------------
+
+ def test_multiple_packages_all_listed(self):
+ r = run_guard(stdin="mesa\nhyprland\nvulkan-radeon\n", running="1")
+ self.assertEqual(r.returncode, 1)
+ for pkg in ("mesa", "hyprland", "vulkan-radeon"):
+ self.assertIn(pkg, r.stderr)
+
+ def test_running_with_empty_stdin_still_guards(self):
+ # The hook only fires when dangerous targets exist, so an empty target
+ # list shouldn't normally happen; if Hyprland is up, stay safe (abort).
+ r = run_guard(stdin="", running="1")
+ self.assertEqual(r.returncode, 1)
+
+ # --- Override / error cases -----------------------------------------
+
+ def test_env_override_proceeds_even_when_running(self):
+ r = run_guard(stdin="mesa\n", running="1", allow="1")
+ self.assertEqual(r.returncode, 0, r.stderr)
+
+ def test_sentinel_file_override_proceeds(self):
+ with tempfile.NamedTemporaryFile(prefix="guard-allow-") as f:
+ r = run_guard(stdin="mesa\n", running="1", sentinel=f.name)
+ self.assertEqual(r.returncode, 0, r.stderr)
+
+ def test_override_env_zero_does_not_bypass(self):
+ r = run_guard(stdin="mesa\n", running="1", allow="0")
+ self.assertEqual(r.returncode, 1, r.stderr)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/installer-steps/test_orchestrators.py b/tests/installer-steps/test_orchestrators.py
new file mode 100644
index 0000000..e62c198
--- /dev/null
+++ b/tests/installer-steps/test_orchestrators.py
@@ -0,0 +1,117 @@
+"""Characterization tests for the decomposed installer step orchestrators.
+
+The 2026 decomposition turned the giant step functions into thin
+orchestrators that call one named sub-function per concern. These tests pin
+the call SEQUENCE of each orchestrator: a dropped, added, or reordered
+sub-step call fails the test. They guard the wiring, not the sub-functions'
+own behavior (those mutate the system and are exercised by the VM harness).
+
+Method: sed-extract the orchestrator from the real `archsetup` (its body is
+now just `display` + sub-function calls), source it with `display` silenced
+and every sub-function replaced by a recorder that echoes its own name, run
+it, and assert stdout is the expected ordered list.
+
+Run from repo root:
+ python3 -m unittest tests.installer-steps.test_orchestrators
+"""
+
+import os
+import subprocess
+import textwrap
+import unittest
+
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+ARCHSETUP = os.path.join(REPO_ROOT, "archsetup")
+
+# orchestrator -> exact ordered sub-step calls
+ORCHESTRATORS = {
+ "essential_services": [
+ "configure_randomness", "configure_networking", "configure_power",
+ "configure_ssh_server", "configure_fail2ban", "configure_firewall",
+ "configure_service_discovery", "configure_job_scheduling",
+ "configure_package_cache", "configure_snapshots",
+ "configure_user_lingering",
+ ],
+ "prerequisites": [
+ "bootstrap_pacman_keyring", "install_required_software",
+ "configure_build_environment", "configure_package_mirrors",
+ ],
+ "developer_workstation": [
+ "install_programming_languages", "install_editors",
+ "install_android_utilities", "install_vpn_tools",
+ "install_devops_utilities",
+ ],
+ "boot_ux": [
+ "tighten_efi_permissions", "add_nvme_early_module",
+ "configure_initramfs_hook", "configure_encrypted_autologin",
+ "configure_tlp_power", "trim_firmware", "configure_grub",
+ ],
+ "user_customizations": [
+ "clone_user_repos", "stow_dotfiles", "prune_waybar_battery",
+ "refresh_desktop_caches", "configure_dconf_defaults",
+ "finalize_dotfiles", "create_user_directories",
+ ],
+}
+
+
+def run_orchestrator(func, stubs, extra_defs=""):
+ """Source `func` from archsetup with `stubs` recording their names."""
+ stub_defs = "\n".join(f"{s}() {{ echo {s}; }}" for s in stubs)
+ script = textwrap.dedent(f"""\
+ display() {{ :; }}
+ {stub_defs}
+ {extra_defs}
+ source <(sed -n '/^{func}() {{/,/^}}/p' "{ARCHSETUP}")
+ {func}
+ """)
+ result = subprocess.run(
+ ["bash", "-c", script],
+ capture_output=True, text=True, timeout=10,
+ )
+ return result
+
+
+class OrchestratorSequence(unittest.TestCase):
+ def test_each_orchestrator_calls_substeps_in_order(self):
+ for func, expected in ORCHESTRATORS.items():
+ with self.subTest(orchestrator=func):
+ result = run_orchestrator(func, expected)
+ self.assertEqual(result.returncode, 0, result.stderr)
+ got = result.stdout.split()
+ self.assertEqual(got, expected,
+ f"{func} call sequence drifted")
+
+
+class SnapshotDispatch(unittest.TestCase):
+ """configure_snapshots branches on filesystem; pin each branch."""
+
+ SUBS = ["configure_zfs_snapshots", "configure_btrfs_snapshots"]
+
+ def test_zfs_root_runs_zfs_snapshots(self):
+ result = run_orchestrator(
+ "configure_snapshots", self.SUBS,
+ extra_defs="is_zfs_root() { return 0; }\nis_btrfs_root() { return 1; }",
+ )
+ self.assertEqual(result.returncode, 0, result.stderr)
+ self.assertEqual(result.stdout.split(), ["configure_zfs_snapshots"])
+
+ def test_btrfs_root_runs_btrfs_snapshots(self):
+ result = run_orchestrator(
+ "configure_snapshots", self.SUBS,
+ extra_defs="is_zfs_root() { return 1; }\nis_btrfs_root() { return 0; }",
+ )
+ self.assertEqual(result.returncode, 0, result.stderr)
+ self.assertEqual(result.stdout.split(), ["configure_btrfs_snapshots"])
+
+ def test_other_filesystem_runs_neither(self):
+ result = run_orchestrator(
+ "configure_snapshots", self.SUBS,
+ extra_defs="is_zfs_root() { return 1; }\nis_btrfs_root() { return 1; }",
+ )
+ self.assertEqual(result.returncode, 0, result.stderr)
+ self.assertEqual(result.stdout.split(), [])
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/run-task/test_run_task.py b/tests/run-task/test_run_task.py
new file mode 100644
index 0000000..35036dd
--- /dev/null
+++ b/tests/run-task/test_run_task.py
@@ -0,0 +1,172 @@
+"""Tests for the run_task / enable_service helpers in the archsetup installer.
+
+run_task is the installer's describe-run-warn primitive. It replaces the
+hand-written idiom that recurs ~100 times across the script:
+
+ action="enabling rngd service" && display "task" "$action"
+ systemctl enable rngd >> "$logfile" 2>&1 || error_warn "$action" "$?"
+
+as a single call:
+
+ run_task "enabling rngd service" systemctl enable rngd
+
+It announces the task via display, runs the command with stdout+stderr
+appended to $logfile, and on failure calls error_warn with the command's
+real exit code (non-fatal). enable_service is a thin wrapper that enables
+one or more systemd units with the conventional "enabling <unit> service"
+wording.
+
+These tests exercise the REAL function bodies, extracted from the
+`archsetup` script at run time (not a copy), with recording stubs standing
+in for display, error_warn, and systemctl. The command run by run_task is
+genuinely executed.
+
+Run from repo root:
+ python3 -m unittest tests.run-task.test_run_task
+"""
+
+import os
+import shutil
+import subprocess
+import tempfile
+import unittest
+
+
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+ARCHSETUP = os.path.join(REPO_ROOT, "archsetup")
+
+# A bash harness that sources the real run_task + enable_service out of the
+# installer, with recording stubs for their dependencies. Each stub appends a
+# tab-separated record to a file named by an env var, so the Python side can
+# assert what was called. The real command passed to run_task still runs.
+WRAPPER = r"""#!/bin/bash
+ARCHSETUP="$1"; shift
+logfile="$LOGFILE"
+
+display() { printf '%s\t%s\n' "$1" "$2" >> "$DISPLAY_LOG"; }
+error_warn() { printf '%s\t%s\n' "$1" "$2" >> "$ERRWARN_LOG"; return 1; }
+systemctl() { printf 'systemctl %s\n' "$*"; }
+
+source <(sed -n '/^run_task() {/,/^}/p' "$ARCHSETUP")
+source <(sed -n '/^enable_service() {/,/^}/p' "$ARCHSETUP")
+
+"$@"
+"""
+
+
+class RunTaskHarness(unittest.TestCase):
+ def setUp(self):
+ self.tmp = tempfile.mkdtemp(prefix="run-task-test-")
+ self.wrapper = os.path.join(self.tmp, "run.sh")
+ with open(self.wrapper, "w") as f:
+ f.write(WRAPPER)
+ os.chmod(self.wrapper, 0o755)
+ self.logfile = os.path.join(self.tmp, "install.log")
+ self.display_log = os.path.join(self.tmp, "display.log")
+ self.errwarn_log = os.path.join(self.tmp, "errwarn.log")
+
+ def tearDown(self):
+ shutil.rmtree(self.tmp, ignore_errors=True)
+
+ def call(self, *args):
+ env = dict(os.environ)
+ env["LOGFILE"] = self.logfile
+ env["DISPLAY_LOG"] = self.display_log
+ env["ERRWARN_LOG"] = self.errwarn_log
+ return subprocess.run(
+ ["bash", self.wrapper, ARCHSETUP, *args],
+ capture_output=True, text=True, timeout=10, env=env,
+ )
+
+ def read(self, path):
+ if not os.path.exists(path):
+ return ""
+ with open(path) as f:
+ return f.read()
+
+ # --- Normal cases -----------------------------------------------------
+
+ def test_run_task_success_announces_and_runs(self):
+ result = self.call("run_task", "doing a thing", "true")
+ self.assertEqual(result.returncode, 0, result.stderr)
+ # Announced as a "task" with the exact description.
+ self.assertEqual(self.read(self.display_log), "task\tdoing a thing\n")
+ # No warning on success.
+ self.assertEqual(self.read(self.errwarn_log), "")
+
+ def test_run_task_captures_command_output_to_logfile(self):
+ result = self.call("run_task", "echo something", "echo", "hello-from-cmd")
+ self.assertEqual(result.returncode, 0, result.stderr)
+ self.assertIn("hello-from-cmd", self.read(self.logfile))
+ # Command output is logged, not printed to the terminal.
+ self.assertNotIn("hello-from-cmd", result.stdout)
+
+ def test_run_task_captures_stderr_to_logfile(self):
+ # `ls` of a missing path writes to stderr; it must land in the logfile.
+ missing = os.path.join(self.tmp, "no-such-path")
+ self.call("run_task", "listing", "ls", missing)
+ self.assertIn("no-such-path", self.read(self.logfile))
+
+ def test_run_task_preserves_multiple_arguments(self):
+ self.call("run_task", "multi-arg", "printf", "%s|%s|%s", "a", "b", "c")
+ self.assertIn("a|b|c", self.read(self.logfile))
+
+ def test_run_task_preserves_arguments_with_spaces(self):
+ self.call("run_task", "spacey", "printf", "[%s]", "two words")
+ self.assertIn("[two words]", self.read(self.logfile))
+
+ # --- enable_service ---------------------------------------------------
+
+ def test_enable_service_single_unit(self):
+ self.call("enable_service", "rngd")
+ self.assertEqual(self.read(self.display_log), "task\tenabling rngd service\n")
+ self.assertIn("systemctl enable rngd", self.read(self.logfile))
+
+ def test_enable_service_multiple_units(self):
+ self.call("enable_service", "foo", "bar", "baz")
+ disp = self.read(self.display_log)
+ self.assertIn("task\tenabling foo service\n", disp)
+ self.assertIn("task\tenabling bar service\n", disp)
+ self.assertIn("task\tenabling baz service\n", disp)
+ log = self.read(self.logfile)
+ self.assertIn("systemctl enable foo", log)
+ self.assertIn("systemctl enable bar", log)
+ self.assertIn("systemctl enable baz", log)
+
+ # --- Error cases ------------------------------------------------------
+
+ def test_run_task_failure_warns_with_description(self):
+ result = self.call("run_task", "failing thing", "false")
+ self.assertNotEqual(result.returncode, 0)
+ self.assertEqual(self.read(self.errwarn_log), "failing thing\t1\n")
+
+ def test_run_task_failure_propagates_real_exit_code(self):
+ # `bash -c 'exit 42'` must surface 42 to error_warn, not a clobbered 0.
+ self.call("run_task", "exit-42", "bash", "-c", "exit 42")
+ self.assertEqual(self.read(self.errwarn_log), "exit-42\t42\n")
+
+ def test_enable_service_failure_warns_per_unit(self):
+ # Override systemctl to fail; each unit should produce a warning.
+ env = dict(os.environ)
+ env["LOGFILE"] = self.logfile
+ env["DISPLAY_LOG"] = self.display_log
+ env["ERRWARN_LOG"] = self.errwarn_log
+ # Re-create wrapper with a failing systemctl stub for this case.
+ failing = os.path.join(self.tmp, "run-fail.sh")
+ with open(failing, "w") as f:
+ f.write(WRAPPER.replace(
+ "systemctl() { printf 'systemctl %s\\n' \"$*\"; }",
+ "systemctl() { printf 'systemctl %s\\n' \"$*\"; return 1; }",
+ ))
+ os.chmod(failing, 0o755)
+ subprocess.run(
+ ["bash", failing, ARCHSETUP, "enable_service", "alpha", "beta"],
+ capture_output=True, text=True, timeout=10, env=env,
+ )
+ warns = self.read(self.errwarn_log)
+ self.assertIn("enabling alpha service\t1\n", warns)
+ self.assertIn("enabling beta service\t1\n", warns)
+
+
+if __name__ == "__main__":
+ unittest.main()