diff --git a/include/sway/desktop/animation.h b/include/sway/desktop/animation.h index a11e9472..b80ef7f6 100644 --- a/include/sway/desktop/animation.h +++ b/include/sway/desktop/animation.h @@ -135,6 +135,12 @@ bool animation_animating(); // Are we in the middle of an animation for output? bool animation_animating_output(struct wlr_output *output); +// Manual stepping control for testing +void animation_set_manual_stepping(bool enabled); +void animation_step(uint32_t ms); +uint32_t animation_get_duration(); +uint32_t animation_get_elapsed_time(); + // Get the current parameters for the active animation void animation_get_values(double *t, double *x, double *y); diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..82cbd5bd --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +addopts = -n 4 diff --git a/scroll.lua b/scroll.lua index d9de7e90..f9737d6c 100644 --- a/scroll.lua +++ b/scroll.lua @@ -643,4 +643,52 @@ function scroll.add_callback(event, cb_func, cb_data) end --- @return integer function scroll.remove_callback(id) end +--- +--- Enables or disables manual stepping for animations. +--- When enabled, animations do not progress automatically with real time. +--- Instead, they stay at the current frame until scroll.animation_step is called. +--- +--- @param enabled boolean +--- +function scroll.animation_set_manual_stepping(enabled) end + +--- +--- Steps the current animation forward by the given amount of milliseconds. +--- Only has an effect if manual stepping is enabled. +--- +--- @param ms integer +--- +function scroll.animation_step(ms) end + +--- +--- Returns true if there is an active animation running. +--- +--- @return boolean +--- +function scroll.animating() end + +--- +--- Returns true if there are pending transactions that haven't been applied yet. +--- +--- @return boolean +--- +function scroll.pending_transactions() end + +--- +--- Returns the total duration of the current animation in milliseconds. +--- Returns 0 if there is no active animation. +--- +--- @return integer +--- +function scroll.animation_get_duration() end + +--- +--- Returns the elapsed time of the current animation in milliseconds. +--- Accounts for manual stepping if enabled. +--- Returns 0 if there is no active animation. +--- +--- @return integer +--- +function scroll.animation_get_elapsed_time() end + return scroll diff --git a/sway/desktop/animation.c b/sway/desktop/animation.c index 90c5a5cb..d431492b 100644 --- a/sway/desktop/animation.c +++ b/sway/desktop/animation.c @@ -167,6 +167,9 @@ struct sway_animation { struct sway_animation_callbacks default_callbacks; struct sway_animation_config config; + + bool manual_stepping; + uint32_t manual_time_ms; }; static struct sway_animation *animation = NULL; @@ -619,7 +622,11 @@ void animation_begin() { if (path) { animation_reset_path(path); animation->animating = true; - clock_gettime(CLOCK_MONOTONIC, &animation->start); + if (animation->manual_stepping) { + animation->manual_time_ms = 0; + } else { + clock_gettime(CLOCK_MONOTONIC, &animation->start); + } if (animation->current.callbacks.callback_begin) { animation->current.callbacks.callback_begin(animation->current.callbacks.callback_begin_data); } @@ -650,7 +657,12 @@ static bool animation_set_time(struct timespec *time) { if (!curve) { goto last; } - uint32_t diff = difftime_ms(&animation->start, time); + uint32_t diff; + if (animation->manual_stepping) { + diff = animation->manual_time_ms; + } else { + diff = difftime_ms(&animation->start, time); + } uint32_t duration = curve->duration_ms; animation->time = (double) diff / duration; if (animation->time <= 1.0) { @@ -661,7 +673,11 @@ static bool animation_set_time(struct timespec *time) { path->idx = path->curves->length - 1; goto last; } else { - addtime_ms(&animation->start, duration); + if (animation->manual_stepping) { + animation->manual_time_ms -= duration; + } else { + addtime_ms(&animation->start, duration); + } } } return false; @@ -883,3 +899,75 @@ void destroy_animation_curve(struct sway_animation_curve *curve) { } free(curve); } + +void animation_set_manual_stepping(bool enabled) { + if (animation) { + animation->manual_stepping = enabled; + if (enabled) { + animation->manual_time_ms = 0; + } + } +} + +void animation_step(uint32_t ms) { + if (animation && animation->manual_stepping && animation->animating) { + animation->manual_time_ms += ms; + for (int i = 0; i < root->outputs->length; ++i) { + struct sway_output *output = root->outputs->items[i]; + if (animation_animating_output(output->wlr_output)) { + animation_animate(output->wlr_output); + } + } + } +} + +uint32_t animation_get_duration() { + if (!animation || !animation->animating) { + return 0; + } + struct sway_animation_path *path = get_path(); + if (!path) { + return 0; + } + uint32_t duration = 0; + for (int i = 0; i < path->curves->length; ++i) { + struct sway_animation_curve *curve = path->curves->items[i]; + if (curve) { + duration += curve->duration_ms; + } + } + return duration; +} + +uint32_t animation_get_elapsed_time() { + if (!animation || !animation->animating) { + return 0; + } + struct sway_animation_path *path = get_path(); + if (!path) { + return 0; + } + if (animation->manual_stepping) { + uint32_t elapsed = 0; + for (int i = 0; i < path->idx; ++i) { + struct sway_animation_curve *curve = path->curves->items[i]; + if (curve) { + elapsed += curve->duration_ms; + } + } + elapsed += animation->manual_time_ms; + return elapsed; + } else { + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + uint32_t elapsed = 0; + for (int i = 0; i < path->idx; ++i) { + struct sway_animation_curve *curve = path->curves->items[i]; + if (curve) { + elapsed += curve->duration_ms; + } + } + elapsed += difftime_ms(&animation->start, &now); + return elapsed; + } +} diff --git a/sway/lua.c b/sway/lua.c index 9d7cf462..466ba3a2 100644 --- a/sway/lua.c +++ b/sway/lua.c @@ -11,6 +11,7 @@ #include "sway/output.h" #include "sway/desktop/animation.h" #include "sway/ipc-server.h" +#include "sway/server.h" #if 0 static void print_table(lua_State *L, int index); @@ -1613,7 +1614,53 @@ static int scroll_remove_callback(lua_State *L) { return 0; } +static int scroll_animation_set_manual_stepping(lua_State *L) { + int argc = lua_gettop(L); + if (argc < 1) { + return 0; + } + bool enabled = lua_toboolean(L, 1); + animation_set_manual_stepping(enabled); + return 0; +} + +static int scroll_animation_step(lua_State *L) { + int argc = lua_gettop(L); + if (argc < 1) { + return 0; + } + int ms = luaL_checkinteger(L, 1); + if (ms < 0) { + return 0; + } + animation_step(ms); + return 0; +} + +static int scroll_animating(lua_State *L) { + lua_pushboolean(L, animation_animating()); + return 1; +} + +static int scroll_pending_transactions(lua_State *L) { + bool pending = server.queued_transaction != NULL || server.pending_transaction != NULL || + server.dirty_nodes->length > 0; + lua_pushboolean(L, pending); + return 1; +} + +static int scroll_animation_get_duration(lua_State *L) { + lua_pushinteger(L, animation_get_duration()); + return 1; +} + +static int scroll_animation_get_elapsed_time(lua_State *L) { + lua_pushinteger(L, animation_get_elapsed_time()); + return 1; +} + // Module functions +/* clang-format off */ static luaL_Reg const scroll_lib[] = { { "log", scroll_log }, { "state_set_value", scroll_state_set_value }, @@ -1682,8 +1729,15 @@ static luaL_Reg const scroll_lib[] = { { "scratchpad_hide", scroll_scratchpad_hide }, { "add_callback", scroll_add_callback }, { "remove_callback", scroll_remove_callback }, + { "animation_set_manual_stepping", scroll_animation_set_manual_stepping }, + { "animation_step", scroll_animation_step }, + { "animating", scroll_animating }, + { "pending_transactions", scroll_pending_transactions }, + { "animation_get_duration", scroll_animation_get_duration }, + { "animation_get_elapsed_time", scroll_animation_get_elapsed_time }, { NULL, NULL } }; +/* clang-format on */ // Module Loader int luaopen_scroll(lua_State *L) { diff --git a/sway/scroll.5.scd b/sway/scroll.5.scd index 1d5f04e4..d10a37f0 100644 --- a/sway/scroll.5.scd +++ b/sway/scroll.5.scd @@ -2390,6 +2390,31 @@ scroll.command(nil, "set_size v 0.33333333; move left nomode") Removes a callback set earlier using *add_callback*. _id_ is the unique identifier returned by *add_callback*. +*animation_set_manual_stepping(enabled)* + Enables or disables manual stepping for animations. When enabled, + animations do not progress automatically with real time. Instead, they + stay at the current frame until *animation_step* is called. + +*animation_step(ms)* + Steps the current animation forward by the given amount of _ms_ + (milliseconds). Only has an effect if manual stepping is enabled. + +*animating()* + Returns _true_ if there is an active animation running. + +*pending_transactions()* + Returns _true_ if there are pending transactions that haven't been applied + yet. + +*animation_get_duration()* + Returns the total duration of the current animation in milliseconds, or + 0 if there is no active animation. + +*animation_get_elapsed_time()* + Returns the elapsed time of the current animation in milliseconds. + Accounts for manual stepping if enabled. Returns 0 if there is no active + animation. + Examples: Calling this script from the configuration file, you will get focus on every diff --git a/tests/conftest.py b/tests/conftest.py index 4531b51c..77027476 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,52 +10,109 @@ def pytest_addoption(parser: pytest.Parser) -> None: parser.addoption("--scroll", help="the scroll binary to test", default=None) +def _build_scroll() -> str: + # Auto-build using Meson/Ninja + print("\nBuilding scroll with Meson/Ninja...") + build_dir = os.path.abspath("./build") + if not os.path.exists(build_dir): + res = subprocess.run( + [ + "meson", + "setup", + "build", + "-Dwerror=false", + "-Db_sanitize=address", + "-Dbuildtype=debugoptimized", + ], + capture_output=True, + text=True, + ) + if res.returncode != 0: + pytest.exit( + f"Failed to setup build:\nStdout: {res.stdout}\nStderr: {res.stderr}" + ) + else: + # Ensure ASan is enabled + res = subprocess.run( + [ + "meson", + "configure", + "build", + "-Db_sanitize=address", + "-Dbuildtype=debugoptimized", + ], + capture_output=True, + text=True, + ) + if res.returncode != 0: + pytest.exit( + "Failed to configure build with ASan:\nStdout:" + f" {res.stdout}\nStderr: {res.stderr}" + ) + + # Run ninja to compile (incremental build) + res = subprocess.run(["ninja", "-C", "build"], capture_output=True, text=True) + if res.returncode != 0: + pytest.exit( + f"Failed to build scroll:\nStdout: {res.stdout}\nStderr: {res.stderr}" + ) + + return os.path.join(build_dir, "sway", "scroll") + + @pytest.fixture(scope="session") def scroll_compositor_binary(request: pytest.FixtureRequest) -> str: binary_path: str = request.config.getoption("scroll") if not binary_path: - # Auto-build using Meson/Ninja - print("\nBuilding scroll with Meson/Ninja...") - build_dir = os.path.abspath("./build") - if not os.path.exists(build_dir): - res = subprocess.run( - ["meson", "setup", "build", "-Dwerror=false", "-Db_sanitize=address"], - capture_output=True, - text=True, - ) - if res.returncode != 0: - pytest.exit( - f"Failed to setup build:\nStdout: {res.stdout}\nStderr: {res.stderr}" - ) + # Check if we are running under xdist + try: + worker_id = request.getfixturevalue("worker_id") + except Exception: + worker_id = "master" + + if worker_id == "master": + binary_path = _build_scroll() else: - # Ensure ASan is enabled - res = subprocess.run( - ["meson", "configure", "build", "-Db_sanitize=address"], - capture_output=True, - text=True, - ) - if res.returncode != 0: - pytest.exit( - f"Failed to configure build with ASan:\nStdout: {res.stdout}\nStderr: {res.stderr}" - ) + tmp_path_factory = request.getfixturevalue("tmp_path_factory") + shared_dir = tmp_path_factory.getbasetemp().parent + lock_path = shared_dir / "scroll_build.lock" + status_path = shared_dir / "scroll_build.status" - # Run ninja to compile (incremental build) - res = subprocess.run(["ninja", "-C", "build"], capture_output=True, text=True) - if res.returncode != 0: - pytest.exit( - f"Failed to build scroll:\nStdout: {res.stdout}\nStderr: {res.stderr}" - ) + import fcntl - binary_path = os.path.join(build_dir, "sway", "scroll") + # Open with 'a' to avoid truncating while another process might have it locked + with open(lock_path, "a") as lock_file: + fcntl.flock(lock_file, fcntl.LOCK_EX) + try: + if status_path.exists(): + binary_path = status_path.read_text().strip() + else: + binary_path = _build_scroll() + status_path.write_text(binary_path) + finally: + fcntl.flock(lock_file, fcntl.LOCK_UN) else: binary_path = os.path.abspath(binary_path) assert os.path.exists(binary_path), f"Binary not found at {binary_path}" + + # Set up PATH to include build directories so that the compositor can find + # our newly built scrollbar, swaymsg, swaynag, etc. + build_dir = Path(binary_path).parent.parent + old_path = os.environ.get("PATH", "") + build_paths = [ + str(build_dir / "sway"), + str(build_dir / "swaymsg"), + str(build_dir / "swaybar"), + str(build_dir / "swaynag"), + ] + os.environ["PATH"] = ":".join(build_paths) + ":" + old_path + return binary_path @pytest.fixture(scope="session") -def scroll_compositor( +def _scroll_compositor_session( scroll_compositor_binary: str, tmp_path_factory: pytest.TempPathFactory ) -> Generator[ScrollInstance, None, None]: temp_dir: Path = tmp_path_factory.mktemp("scroll") @@ -63,6 +120,14 @@ def scroll_compositor( yield inst +@pytest.fixture(scope="function") +def scroll_compositor( + _scroll_compositor_session: ScrollInstance, +) -> Generator[ScrollInstance, None, None]: + yield _scroll_compositor_session + _scroll_compositor_session.reset() + + @pytest.fixture(scope="function") def fresh_compositor( scroll_compositor_binary: str, tmp_path: Path diff --git a/tests/test_animation_offset.py b/tests/test_animation_offset.py index 7cd55108..a149bb34 100644 --- a/tests/test_animation_offset.py +++ b/tests/test_animation_offset.py @@ -1,4 +1,3 @@ -import time from typing import Generator from pathlib import Path import pytest @@ -29,13 +28,13 @@ def test_animation_offset_unintended_move( try: with wayland_client(inst, "client1"): wait_for_client_map(inst, "client1") - time.sleep(0.5) + inst.wait_for_idle() with wayland_client(inst, "client2"): wait_for_client_map(inst, "client2") - time.sleep(0.5) + inst.wait_for_idle() with wayland_client(inst, "client3"): wait_for_client_map(inst, "client3") - time.sleep(0.5) + inst.wait_for_idle() tree = inst.get_tree() @@ -77,11 +76,13 @@ def find_views(node, result): print("Focusing rightmost...") res = inst.cmd(f"[con_id={v3['id']}] focus") assert res[0]["success"] - time.sleep(1.0) # wait for focus scroll to settle + inst.wait_for_idle() + + # Enable manual stepping for the swap animation + inst.set_manual_stepping(True) # Swap leftmost and middle by moving leftmost right, using its ID as context - print("Swapping leftmost and middle in background...") - # We use execute_lua to run scroll.command with v1['id'] context + print("Swapping leftmost and middle...") res_lua = inst.execute_lua( f"return scroll.command({v1['id']}, 'move right')" ) @@ -94,15 +95,19 @@ def find_views(node, result): query_id = parent_id if parent_id is not None else v3["id"] print(f"Querying container {query_id} (parent of {v3['id']})") - # Query position of client3 during animation + # Query position of client3 during animation by manually stepping positions = [] - start_time = time.time() - while time.time() - start_time < 2.0: + # Total animation duration is 5000ms. We step 2000ms total to match the original test's 2.0s limit. + # 40 steps of 50ms = 2000ms. + for _ in range(40): + inst.animation_step(50) geom = inst.execute_lua( f"return scroll.container_get_animated_geometry({query_id})" ) positions.append(geom) - time.sleep(0.05) + + # Restore normal stepping + inst.set_manual_stepping(False) print(f"Positions of client3: {positions}") diff --git a/tests/test_clients.py b/tests/test_clients.py index b29a7c69..aa8d4706 100644 --- a/tests/test_clients.py +++ b/tests/test_clients.py @@ -1,9 +1,9 @@ import os -import subprocess -import time from pathlib import Path -import pytest +import subprocess from conftest import ScrollInstance +import pytest +from test_utils import wait_for_client_map def test_wayland_client(scroll_compositor: ScrollInstance) -> None: @@ -23,31 +23,12 @@ def test_wayland_client(scroll_compositor: ScrollInstance) -> None: [str(client_path), title, app_id], env=env ) - view_info: dict | None = None - tries: int = 0 - while tries < 50: - view_info = scroll_compositor.execute_lua(""" - local view = scroll.focused_view() - if view then - return { - id = view, - title = scroll.view_get_title(view), - app_id = scroll.view_get_app_id(view) - } - end - """) - if ( - view_info - and view_info.get("title") == title - and view_info.get("app_id") == app_id - ): - break - - time.sleep(0.1) - tries += 1 - - assert tries < 50, "Timed out waiting for client to map or verify" - assert view_info is not None + view_id = wait_for_client_map(scroll_compositor, title) + app_id_actual = scroll_compositor.execute_lua( + f"return scroll.view_get_app_id({view_id})" + ) + assert app_id_actual == app_id + view_info = {"id": view_id} scroll_compositor.execute_lua(f"scroll.view_close({view_info['id']})") @@ -68,13 +49,7 @@ def test_x11_client(scroll_compositor: ScrollInstance) -> None: xauthority: str | None = scroll_compositor.getenv("XAUTHORITY") # Wait for Xwayland to be ready - xwayland_ready_tries: int = 0 - while xwayland_ready_tries < 50: - if "Xserver is ready" in scroll_compositor.read_log(): - break - time.sleep(0.1) - xwayland_ready_tries += 1 - assert xwayland_ready_tries < 50, "Timed out waiting for Xwayland to be ready" + scroll_compositor.wait_for_log_pattern("Xserver is ready", from_start=True) client_path: Path = Path("./build/tests/x11-test-client").resolve() if not client_path.exists(): @@ -93,36 +68,16 @@ def test_x11_client(scroll_compositor: ScrollInstance) -> None: [str(client_path), title, instance, class_name], env=env ) - view_info: dict | None = None - tries: int = 0 - while tries < 50: - view_info = scroll_compositor.execute_lua(""" - local view = scroll.focused_view() - if view then - return { - id = view, - title = scroll.view_get_title(view), - class = scroll.view_get_class(view), - shell = scroll.view_get_shell(view) - } - end - """) - if ( - view_info - and view_info.get("title") == title - and view_info.get("class") == class_name - ): - assert view_info.get("shell") == "xwayland" - break - - time.sleep(0.1) - tries += 1 - - if tries >= 50: - Path("build/test_x11_compositor.log").write_text(scroll_compositor.read_log()) - print("Wrote compositor log to build/test_x11_compositor.log") - assert tries < 50, "Timed out waiting for X11 client to map or verify" - assert view_info is not None + view_id = wait_for_client_map(scroll_compositor, title) + class_actual = scroll_compositor.execute_lua( + f"return scroll.view_get_class({view_id})" + ) + shell_actual = scroll_compositor.execute_lua( + f"return scroll.view_get_shell({view_id})" + ) + assert class_actual == class_name + assert shell_actual == "xwayland" + view_info = {"id": view_id} scroll_compositor.execute_lua(f"scroll.view_close({view_info['id']})") diff --git a/tests/test_focused_inactive_null_crash.py b/tests/test_focused_inactive_null_crash.py index c38f5bdb..73c4c2cc 100644 --- a/tests/test_focused_inactive_null_crash.py +++ b/tests/test_focused_inactive_null_crash.py @@ -1,4 +1,3 @@ -import time import json from conftest import ScrollInstance from test_utils import wayland_client, wait_for_client_map @@ -35,7 +34,7 @@ def test_focused_inactive_null_crash(fresh_compositor: ScrollInstance) -> None: # 5. Move w1 to workspace 2 (this detaches it, making col's focused_inactive_child NULL) fresh_compositor.cmd(f"[con_id={w1_id}] move container to workspace 2") - time.sleep(0.1) + fresh_compositor.wait_for_idle() # 6. Focus the parent column print(f"col_id: {col_id}") @@ -68,7 +67,6 @@ def test_focused_inactive_null_crash(fresh_compositor: ScrollInstance) -> None: pass ret = fresh_compositor.proc.wait(timeout=5) print(f"Compositor exit code: {ret}") - assert ret == 0, f"Compositor crashed or exited with error code {ret}" - print("Compositor Log:") print(fresh_compositor.read_log()) + assert ret == 0, f"Compositor crashed or exited with error code {ret}" diff --git a/tests/test_focused_inactive_uaf.py b/tests/test_focused_inactive_uaf.py index 1a19e5fc..8ca5ebb4 100644 --- a/tests/test_focused_inactive_uaf.py +++ b/tests/test_focused_inactive_uaf.py @@ -1,36 +1,35 @@ -import time from conftest import ScrollInstance from test_utils import wayland_client, wait_for_client_map -def test_focused_inactive_uaf(fresh_compositor: ScrollInstance) -> None: +def test_focused_inactive_uaf(scroll_compositor: ScrollInstance) -> None: # 1. Set mode to vertical to stack windows - fresh_compositor.cmd("set_mode v") + scroll_compositor.cmd("set_mode v") # 2. Create first view - with wayland_client(fresh_compositor, "client1") as client1: - wait_for_client_map(fresh_compositor, "client1") - w1_id = fresh_compositor.execute_lua("return scroll.focused_container()") + with wayland_client(scroll_compositor, "client1") as client1: + wait_for_client_map(scroll_compositor, "client1") + w1_id = scroll_compositor.execute_lua("return scroll.focused_container()") print(f"w1_id: {w1_id}") # 3. Create second view - with wayland_client(fresh_compositor, "client2"): - wait_for_client_map(fresh_compositor, "client2") - w2_id = fresh_compositor.execute_lua("return scroll.focused_container()") + with wayland_client(scroll_compositor, "client2"): + wait_for_client_map(scroll_compositor, "client2") + w2_id = scroll_compositor.execute_lua("return scroll.focused_container()") print(f"w2_id: {w2_id}") # Verify they are siblings (same parent) - w1_parent = fresh_compositor.execute_lua( + w1_parent = scroll_compositor.execute_lua( f"return scroll.container_get_parent({w1_id})" ) - w2_parent = fresh_compositor.execute_lua( + w2_parent = scroll_compositor.execute_lua( f"return scroll.container_get_parent({w2_id})" ) print(f"w1_parent: {w1_parent}, w2_parent: {w2_parent}") assert w1_parent == w2_parent, "w1 and w2 should be siblings" # Get Col1 ID (parent ID) - col1_id = fresh_compositor.execute_lua(""" + col1_id = scroll_compositor.execute_lua(""" local outputs = scroll.root_get_outputs() local workspaces = scroll.output_get_workspaces(outputs[1]) local ws1 @@ -47,29 +46,28 @@ def test_focused_inactive_uaf(fresh_compositor: ScrollInstance) -> None: assert col1_id == w1_parent, "Col1 ID should match parent ID" # 4. Focus w1 to make it the focused_inactive_child of the parent - fresh_compositor.cmd(f"[con_id={w1_id}] focus") - focused = fresh_compositor.execute_lua("return scroll.focused_container()") + scroll_compositor.cmd(f"[con_id={w1_id}] focus") + focused = scroll_compositor.execute_lua("return scroll.focused_container()") assert focused == w1_id, ( f"w1 ({w1_id}) should be focused, but got {focused}" ) # 5. Switch to workspace 2 - fresh_compositor.cmd("workspace 2") + scroll_compositor.cmd("workspace 2") # 6. Kill w1 (which is on workspace 1) - fresh_compositor.cmd(f"[con_id={w1_id}] kill") + scroll_compositor.cmd(f"[con_id={w1_id}] kill") - # Wait for client1 to exit (destruction complete) client1.wait(timeout=5) - time.sleep(0.1) + scroll_compositor.wait_for_idle() # 7. Switch back to workspace 1. - fresh_compositor.cmd("workspace 1") + scroll_compositor.cmd("workspace 1") # 8. Run move command targeted at Col1 (which has dangling focused_inactive_child) # This should trigger UAF in container_get_active_view! - fresh_compositor.cmd(f"[con_id={col1_id}] move left nomode") + scroll_compositor.cmd(f"[con_id={col1_id}] move left nomode") # If we survive, let's verify w2 is still there - focused = fresh_compositor.execute_lua("return scroll.focused_container()") + focused = scroll_compositor.execute_lua("return scroll.focused_container()") print(f"Focused after move: {focused}") diff --git a/tests/test_geometry.py b/tests/test_geometry.py index 6ce92324..70a8ef45 100644 --- a/tests/test_geometry.py +++ b/tests/test_geometry.py @@ -1,4 +1,3 @@ -import time from typing import Generator from pathlib import Path import pytest @@ -29,7 +28,7 @@ def test_static_geometry(scroll_compositor: ScrollInstance) -> None: assert con_id is not None # Wait for any map animation to settle - time.sleep(2.0) + inst.wait_for_idle() geom = inst.execute_lua(f"return scroll.container_get_geometry({con_id})") actual_geom = inst.execute_lua( @@ -90,9 +89,7 @@ def test_animating_geometry(animating_compositor: ScrollInstance) -> None: with wayland_client(inst, "client2"): wait_for_client_map(inst, "client2") - - # Wait for initial map animations to settle - time.sleep(0.5) + inst.wait_for_idle() geom_before = inst.execute_lua( f"return scroll.container_get_geometry({c1})" @@ -104,6 +101,9 @@ def test_animating_geometry(animating_compositor: ScrollInstance) -> None: print("Before move:", geom_before) + # Enable manual stepping + inst.set_manual_stepping(True) + # Trigger move inst.execute_lua(f"scroll.command({c1}, 'move right')") @@ -119,24 +119,23 @@ def test_animating_geometry(animating_compositor: ScrollInstance) -> None: print("Immediately after trigger (actual):", actual_geom_after_trigger) # The target geometry (geom) should have jumped to the final position. - # The actual geometry should still be close to the initial position. + # The actual geometry should still be exactly the initial position. assert geom_after_trigger != geom_before - assert actual_geom_after_trigger["x"] == pytest.approx( - actual_geom_before["x"], abs=10.0 - ) + assert actual_geom_after_trigger == actual_geom_before - # Monitor animation + # Monitor animation using manual steps actual_xs = [] target_xs = [] - start_time = time.time() - while time.time() - start_time < 2.5: # Animation is 2s + + # Step 10 times, 200ms each (total 2000ms = 2s) + for _ in range(10): + inst.animation_step(200) g = inst.execute_lua(f"return scroll.container_get_geometry({c1})") ag = inst.execute_lua( f"return scroll.container_get_animated_geometry({c1})" ) target_xs.append(g["x"]) actual_xs.append(ag["x"]) - time.sleep(0.1) print("Target Xs:", target_xs) print("Actual Xs:", actual_xs) @@ -146,14 +145,16 @@ def test_animating_geometry(animating_compositor: ScrollInstance) -> None: for tx in target_xs: assert tx == final_x - # Actual Xs should start near before_x and end at final_x - assert actual_xs[0] < final_x # Assuming it moved right + # Actual Xs should start near before_x (after first step) and end at final_x + assert actual_xs[0] < final_x assert actual_xs[-1] == pytest.approx(final_x, abs=1.0) - # Verify it is monotonically increasing (if it moved right) + # Verify it is monotonically increasing (since it moved right) for i in range(1, len(actual_xs)): assert actual_xs[i] >= actual_xs[i - 1] - 0.1 + inst.set_manual_stepping(False) + def test_invalid_geometry(scroll_compositor: ScrollInstance) -> None: inst = scroll_compositor diff --git a/tests/test_leak_two_clients.py b/tests/test_leak_two_clients.py new file mode 100644 index 00000000..8e1a7579 --- /dev/null +++ b/tests/test_leak_two_clients.py @@ -0,0 +1,43 @@ +from pathlib import Path +import subprocess +import time +from test_utils import run_compositor, wait_for_client_map, wayland_client + + +def test_leak_two_clients(scroll_compositor_binary: str, tmp_path: Path) -> None: + config_path = Path(__file__).parent.parent / "config.in" + config_content = config_path.read_text() + + fresh_compositor = None + try: + with run_compositor(scroll_compositor_binary, tmp_path, config_content) as fc: + fresh_compositor = fc + # Start two clients and keep them running + with wayland_client(fresh_compositor, "client1"): + wait_for_client_map(fresh_compositor, "client1") + + with wayland_client(fresh_compositor, "client2"): + wait_for_client_map(fresh_compositor, "client2") + + # Let them run a bit + time.sleep(0.5) + + # Terminate compositor while clients are still running + fresh_compositor.proc.terminate() + try: + ret = fresh_compositor.proc.wait(timeout=5) + except subprocess.TimeoutExpired: + fresh_compositor.proc.kill() + ret = fresh_compositor.proc.wait() + + log_content = fresh_compositor.read_log() + print(f"Compositor log:\n{log_content}") + + assert ret == 0, f"Compositor exited with code {ret}" + except Exception as e: + if fresh_compositor: + try: + print(f"Compositor log on failure:\n{fresh_compositor.read_log()}") + except Exception as le: + print(f"Failed to read compositor log: {le}") + raise e diff --git a/tests/test_lua_api.py b/tests/test_lua_api.py index 53e9e2f3..22360cf5 100644 --- a/tests/test_lua_api.py +++ b/tests/test_lua_api.py @@ -1,4 +1,8 @@ +from pathlib import Path +from typing import Generator from conftest import ScrollInstance +import pytest +from test_utils import run_compositor, wait_for_client_map, wayland_client def test_lua_comprehensive_api(scroll_compositor: ScrollInstance) -> None: @@ -91,3 +95,69 @@ def test_lua_comprehensive_api(scroll_compositor: ScrollInstance) -> None: assert len(invalid_output_ws) == 0 assert scroll_compositor.proc.poll() is None + + +@pytest.fixture(scope="function") +def animating_compositor( + scroll_compositor_binary: str, tmp_path: Path +) -> Generator[ScrollInstance, None, None]: + config: str = ( + "workspace 1\n" + "xwayland force\n" + "animations enabled yes\n" + "animations window_move yes 1000 var 3 [ 0.25 0.1 0.25 1.0 ]\n" # 1s (1000ms) duration + ) + with run_compositor(scroll_compositor_binary, tmp_path, config) as inst: + yield inst + + +def test_lua_animation_api(animating_compositor: ScrollInstance) -> None: + inst = animating_compositor + + # Initially not animating + assert inst.execute_lua("return scroll.animating()") is False + assert inst.execute_lua("return scroll.animation_get_duration()") == 0 + assert inst.execute_lua("return scroll.animation_get_elapsed_time()") == 0 + + with wayland_client(inst, "client1"): + v1 = wait_for_client_map(inst, "client1") + inst.wait_for_idle() + + with wayland_client(inst, "client2"): + wait_for_client_map(inst, "client2") + inst.wait_for_idle() + + # Enable manual stepping + inst.set_manual_stepping(True) + + # Trigger window move (starts animation) + res = inst.execute_lua(f"return scroll.command({v1}, 'move right')") + assert res == [0] + + # Now animating + assert inst.execute_lua("return scroll.animating()") is True + # Duration should be 1000ms (as configured) + assert inst.execute_lua("return scroll.animation_get_duration()") == 1000 + # Elapsed time should start at 0 + assert inst.execute_lua("return scroll.animation_get_elapsed_time()") == 0 + + # Step 200ms + inst.animation_step(200) + assert inst.execute_lua("return scroll.animating()") is True + assert inst.execute_lua("return scroll.animation_get_duration()") == 1000 + assert inst.execute_lua("return scroll.animation_get_elapsed_time()") == 200 + + # Step another 500ms (total 700ms) + inst.animation_step(500) + assert inst.execute_lua("return scroll.animating()") is True + assert inst.execute_lua("return scroll.animation_get_duration()") == 1000 + assert inst.execute_lua("return scroll.animation_get_elapsed_time()") == 700 + + # Step past the end (another 400ms, total 1100ms > 1000ms) + inst.animation_step(400) + # Animation should have ended + assert inst.execute_lua("return scroll.animating()") is False + assert inst.execute_lua("return scroll.animation_get_duration()") == 0 + assert inst.execute_lua("return scroll.animation_get_elapsed_time()") == 0 + + inst.set_manual_stepping(False) diff --git a/tests/test_lua_path.py b/tests/test_lua_path.py index 2fb6461b..6d7a671d 100644 --- a/tests/test_lua_path.py +++ b/tests/test_lua_path.py @@ -1,4 +1,3 @@ -import time from pathlib import Path from conftest import ScrollInstance from test_utils import run_compositor @@ -36,8 +35,7 @@ def test_lua_relative_path_config_load( config = "workspace 1\nxwayland force\nlua test_relative.lua\n" with run_compositor(binary_path, tmp_path, config) as inst: - time.sleep(1.0) - assert "RELATIVE_LOAD_SUCCESS" in inst.read_log() + inst.wait_for_log_pattern("RELATIVE_LOAD_SUCCESS", from_start=True) def test_lua_relative_path_subdir_config_load( @@ -51,8 +49,7 @@ def test_lua_relative_path_subdir_config_load( config = "workspace 1\nxwayland force\nlua scripts/test_relative2.lua\n" with run_compositor(binary_path, tmp_path, config) as inst: - time.sleep(1.0) - assert "RELATIVE_SUBDIR_LOAD_SUCCESS" in inst.read_log() + inst.wait_for_log_pattern("RELATIVE_SUBDIR_LOAD_SUCCESS", from_start=True) def test_lua_relative_glob_config_load( @@ -66,8 +63,7 @@ def test_lua_relative_glob_config_load( config = "workspace 1\nxwayland force\nlua scripts/test_glob*.lua\n" with run_compositor(binary_path, tmp_path, config) as inst: - time.sleep(1.0) - assert "RELATIVE_GLOB_LOAD_SUCCESS" in inst.read_log() + inst.wait_for_log_pattern("RELATIVE_GLOB_LOAD_SUCCESS", from_start=True) def test_lua_relative_glob_multiple_config_load( @@ -81,5 +77,4 @@ def test_lua_relative_glob_multiple_config_load( config = "workspace 1\nxwayland force\nlua scripts/test_glob*.lua\n" with run_compositor(binary_path, tmp_path, config) as inst: - time.sleep(1.0) - assert "Path expanded to multiple files" in inst.read_log() + inst.wait_for_log_pattern("Path expanded to multiple files", from_start=True) diff --git a/tests/test_move_cleanup_crash.py b/tests/test_move_cleanup_crash.py index e7d9949e..edca7f01 100644 --- a/tests/test_move_cleanup_crash.py +++ b/tests/test_move_cleanup_crash.py @@ -1,42 +1,35 @@ -import time from conftest import ScrollInstance from test_utils import wayland_client, wait_for_client_map -def test_move_cleanup_uaf_crash(fresh_compositor: ScrollInstance) -> None: +def test_move_cleanup_uaf_crash(scroll_compositor: ScrollInstance) -> None: # 1. Open Window 1 on Workspace 1 - with wayland_client(fresh_compositor, "Window 1"): - wait_for_client_map(fresh_compositor, "Window 1") + with wayland_client(scroll_compositor, "Window 1"): + wait_for_client_map(scroll_compositor, "Window 1") # Make Window 1 floating - res = fresh_compositor.cmd("floating enable") + res = scroll_compositor.cmd("floating enable") assert res and res[0]["success"], f"floating enable failed: {res}" # 2. Switch to Workspace 3 (so Workspace 1 becomes inactive) - res = fresh_compositor.cmd("workspace 3") + res = scroll_compositor.cmd("workspace 3") assert res and res[0]["success"], f"workspace 3 failed: {res}" - time.sleep(0.5) + scroll_compositor.wait_for_idle() # 3. Use criteria to move Window 1 from Workspace 1 to Workspace 2. # This should make Workspace 1 empty and inactive, so it gets destroyed. # Then the move command cleanup code should UAF on Workspace 1. try: - res = fresh_compositor.cmd( + res = scroll_compositor.cmd( '[title="Window 1"] move container to workspace 2' ) assert res and res[0]["success"], f"move failed: {res}" except Exception as e: - print(f"Compositor log:\n{fresh_compositor.read_log()}") + print(f"Compositor log:\n{scroll_compositor.read_log()}") raise e # Check if compositor process is still alive - log_content = fresh_compositor.read_log() + log_content = scroll_compositor.read_log() print(f"Compositor log:\n{log_content}") - if ( - fresh_compositor.proc.poll() is not None - or "node_table not initialized" in log_content - ): - pass - - assert fresh_compositor.proc.poll() is None, "Compositor crashed" + assert scroll_compositor.proc.poll() is None, "Compositor crashed" diff --git a/tests/test_normal_exit.py b/tests/test_normal_exit.py index 0a51e8e7..78c37907 100644 --- a/tests/test_normal_exit.py +++ b/tests/test_normal_exit.py @@ -1,5 +1,8 @@ +from pathlib import Path +import subprocess import time from conftest import ScrollInstance +from test_utils import run_compositor def test_normal_exit_no_errors(fresh_compositor: ScrollInstance) -> None: @@ -15,14 +18,10 @@ def test_normal_exit_no_errors(fresh_compositor: ScrollInstance) -> None: raise e # Wait for compositor to exit - tries = 0 - poll = None - while tries < 50: - poll = fresh_compositor.proc.poll() - if poll is not None: - break - time.sleep(0.1) - tries += 1 + try: + poll = fresh_compositor.proc.wait(timeout=5) + except subprocess.TimeoutExpired: + poll = None assert poll is not None, "Compositor did not exit" @@ -32,3 +31,44 @@ def test_normal_exit_no_errors(fresh_compositor: ScrollInstance) -> None: assert poll == 0, f"Compositor exited with non-zero code: {poll}" assert "node_table not initialized" not in log_content + + +def test_normal_exit_with_bar(scroll_compositor_binary: str, tmp_path: Path) -> None: + config_content = """ +workspace 1 +xwayland force +animations enabled no +bar { + scrollbar_command scrollbar +} +""" + fresh_compositor = None + try: + with run_compositor(scroll_compositor_binary, tmp_path, config_content) as fc: + fresh_compositor = fc + + # Let it run a bit to ensure swaybar starts + time.sleep(0.5) + + # Send exit command + try: + res = fresh_compositor.cmd("exit") + assert res and res[0]["success"], f"Exit command failed: {res}" + except EOFError: + pass + + ret = fresh_compositor.proc.wait(timeout=5) + assert ret == 0, f"Compositor exited with code {ret}" + + log_content = fresh_compositor.read_log() + print(f"Compositor log:\n{log_content}") + assert "ERROR: LeakSanitizer" not in log_content, ( + "Leak detected in compositor or helper process" + ) + except Exception as e: + if fresh_compositor: + try: + print(f"Compositor log on failure:\n{fresh_compositor.read_log()}") + except Exception as le: + print(f"Failed to read compositor log: {le}") + raise e diff --git a/tests/test_shutdown_events.py b/tests/test_shutdown_events.py new file mode 100644 index 00000000..9fb57bc5 --- /dev/null +++ b/tests/test_shutdown_events.py @@ -0,0 +1,113 @@ +import json +from pathlib import Path +import socket +import struct +from test_utils import run_compositor, wait_for_client_map, wayland_client + + +def test_shutdown_events_verification( + scroll_compositor_binary: str, tmp_path: Path +) -> None: + config_path: Path = Path(__file__).parent.parent / "config.in" + config_content: str = config_path.read_text() + + with run_compositor(scroll_compositor_binary, tmp_path, config_content) as fc: + # Connect a second socket for events subscription + event_socket: socket.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + event_socket.connect(fc.ipc.socket_path) + + # Helper functions for the custom subscription socket + def send_msg(msg_type: int, payload: str) -> None: + payload_bytes: bytes = payload.encode("utf-8") + length: int = len(payload_bytes) + header: bytes = struct.pack("<6sII", b"i3-ipc", length, msg_type) + event_socket.sendall(header + payload_bytes) + + def recv_msg() -> tuple[int, str]: + header_data: bytes = b"" + while len(header_data) < 14: + chunk: bytes = event_socket.recv(14 - len(header_data)) + if not chunk: + raise EOFError("Socket closed") + header_data += chunk + magic: bytes + length: int + msg_type: int + magic, length, msg_type = struct.unpack("<6sII", header_data) + payload_data: bytes = b"" + while len(payload_data) < length: + chunk = event_socket.recv(length - len(payload_data)) + if not chunk: + raise EOFError("Socket closed") + payload_data += chunk + return msg_type, payload_data.decode("utf-8") + + # Subscribe to workspace, window, and shutdown events + # 2 is IPC_SUBSCRIBE + send_msg(2, json.dumps(["workspace", "window", "shutdown"])) + msg_type, payload = recv_msg() + assert msg_type == 2 + assert json.loads(payload)["success"] is True + + # Start two clients to have some active views/workspaces + with wayland_client(fc, "client1"): + wait_for_client_map(fc, "client1") + with wayland_client(fc, "client2"): + wait_for_client_map(fc, "client2") + + # Drain all pending events on subscription socket before terminating + event_socket.setblocking(False) + try: + while True: + header_data: bytes = event_socket.recv(14) + if len(header_data) == 14: + magic, length, msg_type = struct.unpack( + "<6sII", header_data + ) + payload_data = b"" + event_socket.setblocking(True) + while len(payload_data) < length: + chunk = event_socket.recv(length - len(payload_data)) + if not chunk: + break + payload_data += chunk + event_socket.setblocking(False) + except BlockingIOError: + pass + + event_socket.setblocking(True) + + # Now terminate the compositor by sending the exit command + try: + fc.cmd("exit") + except (EOFError, BrokenPipeError, ConnectionResetError): + # The socket might close immediately during exit processing, which is fine + pass + + # Read all events sent during shutdown until socket EOF + shutdown_events: list[tuple[int, dict]] = [] + try: + while True: + msg_type, payload = recv_msg() + shutdown_events.append((msg_type, json.loads(payload))) + except (EOFError, BrokenPipeError, ConnectionResetError): + pass # EOF or connection error is expected when compositor exits + + print(f"Events received during shutdown: {shutdown_events}") + + shutdown_msg_type: int = (1 << 31) | 6 + # Verify that if any events are received, they are only shutdown events and nothing unexpected + unexpected_events: list[tuple[int, dict]] = [] + for mtype, p in shutdown_events: + if mtype == shutdown_msg_type: + assert p["change"] == "exit" + else: + unexpected_events.append((mtype, p)) + + try: + assert not unexpected_events, ( + f"Received unexpected events: {unexpected_events}" + ) + except AssertionError as ae: + Path("scroll-test-failure.log").write_text(fc.read_log()) + raise ae diff --git a/tests/test_shutdown_lua_callbacks.py b/tests/test_shutdown_lua_callbacks.py new file mode 100644 index 00000000..7f13375d --- /dev/null +++ b/tests/test_shutdown_lua_callbacks.py @@ -0,0 +1,84 @@ +from pathlib import Path +import re +from test_utils import run_compositor, wait_for_client_map, wayland_client + + +def test_shutdown_lua_callbacks_verification( + scroll_compositor_binary: str, tmp_path: Path +) -> None: + config_path: Path = Path(__file__).parent.parent / "config.in" + config_content: str = config_path.read_text() + + with run_compositor(scroll_compositor_binary, tmp_path, config_content) as fc: + # Register callbacks for all events, logging them to the scroll debug log + res = fc.execute_lua(""" + scroll.add_callback("view_map", function(view, data) + scroll.log("LUA_CALLBACK: view_map " .. tostring(view)) + end, nil) + scroll.add_callback("view_unmap", function(view, data) + scroll.log("LUA_CALLBACK: view_unmap " .. tostring(view)) + end, nil) + scroll.add_callback("view_focus", function(view, data) + scroll.log("LUA_CALLBACK: view_focus " .. tostring(view)) + end, nil) + scroll.add_callback("workspace_create", function(ws, data) + scroll.log("LUA_CALLBACK: workspace_create " .. tostring(ws)) + end, nil) + scroll.add_callback("workspace_focus", function(ws, data) + scroll.log("LUA_CALLBACK: workspace_focus " .. tostring(ws)) + end, nil) + """) + print(f"execute_lua result: {res}") + + # Start two clients to populate windows and workspaces + with wayland_client(fc, "client1"): + wait_for_client_map(fc, "client1") + with wayland_client(fc, "client2"): + wait_for_client_map(fc, "client2") + + # Record the log length before exit command + log_before_exit: str = fc.read_log() + log_len_before_exit: int = len(log_before_exit) + + # Now terminate the compositor via the exit command + try: + fc.cmd("exit") + except (EOFError, BrokenPipeError, ConnectionResetError): + pass + + # Wait for compositor to exit + fc.proc.wait(timeout=5) + + # Read all new log lines generated during shutdown + full_log: str = fc.read_log() + shutdown_log: str = full_log[log_len_before_exit:] + + # Extract all callback events triggered during shutdown + callback_pattern = re.compile(r"LUA_CALLBACK: (\w+)") + triggered_callbacks: list[str] = callback_pattern.findall(shutdown_log) + + print(f"Triggered callbacks during shutdown: {triggered_callbacks}") + + # During shutdown, we unmap the existing views, so 'view_unmap' callbacks are expected. + # However, we should NOT see any new focus events ('view_focus', 'workspace_focus') + # or creation events ('workspace_create') being invoked. + unexpected_callbacks: list[str] = [ + cb + for cb in triggered_callbacks + if cb + in ( + "view_map", + "view_focus", + "workspace_create", + "workspace_focus", + ) + ] + + try: + assert not unexpected_callbacks, ( + "Unexpected Lua callbacks invoked during shutdown:" + f" {unexpected_callbacks}" + ) + except AssertionError as ae: + Path("scroll-lua-callbacks-failure.log").write_text(full_log) + raise ae diff --git a/tests/test_space_aba.py b/tests/test_space_aba.py index c01e83e3..270edc76 100644 --- a/tests/test_space_aba.py +++ b/tests/test_space_aba.py @@ -1,41 +1,43 @@ -import time from conftest import ScrollInstance from test_utils import wayland_client, wait_for_client_map -def test_space_aba(fresh_compositor: ScrollInstance) -> None: - # 1. Create client1 on WS 1 - with wayland_client(fresh_compositor, "client1"): - wait_for_client_map(fresh_compositor, "client1") - - # Save space "sp1" - fresh_compositor.cmd("space_save sp1") - - # client1 is closed now. - # Wait for it to be fully destroyed. - time.sleep(0.2) - - # 2. Create client2 on WS 1. - # Hopefully it reuses client1's view struct address. - with wayland_client(fresh_compositor, "client2"): - wait_for_client_map(fresh_compositor, "client2") - - # Switch to WS 2 - fresh_compositor.cmd("workspace 2") - - # Load space "sp1" on WS 2. - # If ABA bug occurs, it might find client2 (matching old client1 address) - # and move it to WS 2. - fresh_compositor.cmd("space_load sp1 load") - - # Check if client2 is visible on WS 2. - # If it was moved to WS 2, it should be focused because space_load focuses restored containers. - focused_title = fresh_compositor.execute_lua(""" - local view = scroll.focused_view() - return view and scroll.view_get_title(view) - """) - print(f"Focused title after space_load: {focused_title}") - - assert focused_title != "client2", ( - "ABA bug: client2 was incorrectly moved to WS 2!" - ) +def test_space_aba(scroll_compositor: ScrollInstance) -> None: + inst = scroll_compositor + try: + # 1. Create client1 on WS 1 + with wayland_client(inst, "client1"): + wait_for_client_map(inst, "client1") + + # Save space "sp1" + inst.cmd("space_save sp1") + + # client1 is closed now. + inst.wait_for_idle() + + # 2. Create client2 on WS 1. + # Hopefully it reuses client1's view struct address. + with wayland_client(inst, "client2"): + wait_for_client_map(inst, "client2") + + # Switch to WS 2 + inst.cmd("workspace 2") + + # Load space "sp1" on WS 2. + # If ABA bug occurs, it might find client2 (matching old client1 address) + # and move it to WS 2. + inst.cmd("space_load sp1 load") + + # Check if client2 is visible on WS 2. + # If it was moved to WS 2, it should be focused because space_load focuses restored containers. + focused_title = inst.execute_lua(""" + local view = scroll.focused_view() + return view and scroll.view_get_title(view) + """) + print(f"Focused title after space_load: {focused_title}") + + assert focused_title != "client2", ( + "ABA bug: client2 was incorrectly moved to WS 2!" + ) + finally: + inst.cmd("space delete sp1") diff --git a/tests/test_space_crash.py b/tests/test_space_crash.py index 6e35559e..b68f0e87 100644 --- a/tests/test_space_crash.py +++ b/tests/test_space_crash.py @@ -2,82 +2,72 @@ from test_utils import wayland_client, wait_for_client_map -def test_space_restore_uaf_crash(fresh_compositor: ScrollInstance) -> None: - # 1. Open Window 1 on Workspace 1 - with wayland_client(fresh_compositor, "Window 1"): - wait_for_client_map(fresh_compositor, "Window 1") - - # Make Window 1 floating - res = fresh_compositor.cmd("floating enable") - assert res and res[0]["success"], f"floating enable failed: {res}" - - # Save layout "space1" on Workspace 1 - res = fresh_compositor.cmd("space save space1") - assert res and res[0]["success"], f"space save failed: {res}" - - # 2. Switch to Workspace 2 - res = fresh_compositor.cmd("workspace 2") - assert res and res[0]["success"], f"workspace 2 failed: {res}" - - # Move Window 1 to Workspace 2 - # (It should still be floating? Yes, moving floating window to workspace works) - # Actually, we can just move it. - # Wait, if we are on Workspace 2, we can't easily move it here unless we focus it. - # But we switched to Workspace 2, so focus is on Workspace 2 (empty). - # We should go back to Workspace 1, move it to Workspace 2, then go to Workspace 2. - res = fresh_compositor.cmd("workspace 1") - assert res and res[0]["success"], f"workspace 1 failed: {res}" - - res = fresh_compositor.cmd("move container to workspace 2") - assert res and res[0]["success"], f"move to ws 2 failed: {res}" - - res = fresh_compositor.cmd("workspace 2") - assert res and res[0]["success"], f"workspace 2 failed: {res}" - - # Now Window 1 is floating on Workspace 2. - # We must make it tiled on Workspace 2. - res = fresh_compositor.cmd("floating disable") - assert res and res[0]["success"], f"floating disable failed: {res}" - - # Set mode to vertical to stack next window - res = fresh_compositor.cmd("set_mode v") - assert res and res[0]["success"], f"set_mode v failed: {res}" - - # Open Window 2 on Workspace 2 - with wayland_client(fresh_compositor, "Window 2"): - wait_for_client_map(fresh_compositor, "Window 2") - - # Now we have on Workspace 2: Split (V) -> [Window 1, Window 2] - - # Move Window 2 to Workspace 3 (so Window 1 is only child of V-split) - res = fresh_compositor.cmd("move container to workspace 3") - assert res and res[0]["success"], f"move to ws 3 failed: {res}" - - # Now Window 1 is the only child of V-split on Workspace 2. - # Workspace 2 has no other windows. - - # 3. Go to Workspace 1 - res = fresh_compositor.cmd("workspace 1") +def test_space_restore_uaf_crash(scroll_compositor: ScrollInstance) -> None: + inst = scroll_compositor + try: + # 1. Open Window 1 on Workspace 1 + with wayland_client(inst, "Window 1"): + wait_for_client_map(inst, "Window 1") + + # Make Window 1 floating + res = inst.cmd("floating enable") + assert res and res[0]["success"], f"floating enable failed: {res}" + + # Save layout "space1" on Workspace 1 + res = inst.cmd("space save space1") + assert res and res[0]["success"], f"space save failed: {res}" + + # 2. Switch to Workspace 2 + res = inst.cmd("workspace 2") + assert res and res[0]["success"], f"workspace 2 failed: {res}" + + # Move Window 1 to Workspace 2 + res = inst.cmd("workspace 1") assert res and res[0]["success"], f"workspace 1 failed: {res}" - # 4. Restore layout "space1" - # This should try to restore Window 1 as floating on Workspace 1. - # It will detach Window 1 from Workspace 2, reaping V-split and destroying Workspace 2. - # Then it will call arrange_container with dangling Workspace 2 pointer. - try: - res = fresh_compositor.cmd("space restore space1") - assert res and res[0]["success"], f"space restore failed: {res}" - except Exception as e: - print(f"Compositor log:\n{fresh_compositor.read_log()}") - raise e - - # Check if compositor process is still alive - log_content = fresh_compositor.read_log() - print(f"Compositor log:\n{log_content}") - if ( - fresh_compositor.proc.poll() is not None - or "node_table not initialized" in log_content - ): - pass - - assert fresh_compositor.proc.poll() is None, "Compositor crashed" + res = inst.cmd("move container to workspace 2") + assert res and res[0]["success"], f"move to ws 2 failed: {res}" + + res = inst.cmd("workspace 2") + assert res and res[0]["success"], f"workspace 2 failed: {res}" + + # Now Window 1 is floating on Workspace 2. + # We must make it tiled on Workspace 2. + res = inst.cmd("floating disable") + assert res and res[0]["success"], f"floating disable failed: {res}" + + # Set mode to vertical to stack next window + res = inst.cmd("set_mode v") + assert res and res[0]["success"], f"set_mode v failed: {res}" + + # Open Window 2 on Workspace 2 + with wayland_client(inst, "Window 2"): + wait_for_client_map(inst, "Window 2") + + # Now we have on Workspace 2: Split (V) -> [Window 1, Window 2] + + # Move Window 2 to Workspace 3 (so Window 1 is only child of V-split) + res = inst.cmd("move container to workspace 3") + assert res and res[0]["success"], f"move to ws 3 failed: {res}" + + # Now Window 1 is the only child of V-split on Workspace 2. + # Workspace 2 has no other windows. + + # 3. Go to Workspace 1 + res = inst.cmd("workspace 1") + assert res and res[0]["success"], f"workspace 1 failed: {res}" + + # 4. Restore layout "space1" + try: + res = inst.cmd("space restore space1") + assert res and res[0]["success"], f"space restore failed: {res}" + except Exception as e: + print(f"Compositor log:\n{inst.read_log()}") + raise e + + # Check if compositor process is still alive + log_content = inst.read_log() + print(f"Compositor log:\n{log_content}") + assert inst.proc.poll() is None, "Compositor crashed" + finally: + inst.cmd("space delete space1") diff --git a/tests/test_utils.py b/tests/test_utils.py index 27df53ed..bcbc4638 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -152,6 +152,80 @@ def execute_lua(self, code: str) -> Any: def getenv(self, var: str) -> str | None: return self.execute_lua(f'return os.getenv("{var}")') + def wait_for_idle(self, timeout: float = 5.0) -> None: + start = time.time() + while time.time() - start < timeout: + pending = self.execute_lua( + "return scroll.pending_transactions() or scroll.animating()" + ) + if not pending: + return + time.sleep(0.005) + raise TimeoutError("Timeout waiting for compositor to become idle") + + def wait_for_transactions(self, timeout: float = 5.0) -> None: + start = time.time() + while time.time() - start < timeout: + if not self.execute_lua("return scroll.pending_transactions()"): + return + time.sleep(0.005) + raise TimeoutError("Timeout waiting for transactions") + + def set_manual_stepping(self, enabled: bool) -> None: + self.execute_lua( + f"scroll.animation_set_manual_stepping({str(enabled).lower()})" + ) + + def reset(self) -> None: + # 1. Kill all views to clean up leftover windows + try: + self.cmd("kill all") + except Exception: + pass + + # 2. Clean up extra outputs + try: + tree = self.get_tree() + outputs: list[str] = [] + for child in tree.get("nodes", []): + if child.get("type") == "output" and child.get("name") != "__i3": + outputs.append(child["name"]) + + if "HEADLESS-1" in outputs: + for out in outputs: + if out != "HEADLESS-1" and out.startswith("HEADLESS-"): + self.cmd(f"output {out} unplug") + self.wait_for_idle() + except Exception: + pass + + # 3. Reload config to reset defaults + try: + self.cmd("reload") + self.wait_for_idle() + except Exception: + pass + + # 4. Reset workspaces (recreate workspace 1) + try: + self.cmd("workspace __temp") + self.cmd("workspace 1") + self.wait_for_idle() + except Exception: + pass + + # 5. Reset manual stepping (animations will be reset by config reload) + try: + self.execute_lua( + "if scroll and scroll.animation_set_manual_stepping then " + "scroll.animation_set_manual_stepping(false) end" + ) + except Exception: + pass + + def animation_step(self, ms: int) -> None: + self.execute_lua(f"scroll.animation_step({ms})") + @contextmanager def assert_logs_match( self, pattern: str, timeout: float = 5.0 @@ -171,6 +245,24 @@ def assert_logs_match( ) time.sleep(0.1) + def wait_for_log_pattern( + self, pattern: str, timeout: float = 5.0, from_start: bool = False + ) -> None: + compiled_pattern = re.compile(pattern) + start_time = time.time() + initial_log_len = 0 if from_start else len(self.read_log()) + while True: + current_log: str = self.read_log() + log_to_search = current_log if from_start else current_log[initial_log_len:] + if compiled_pattern.search(log_to_search): + return + if time.time() - start_time > timeout: + raise AssertionError( + f"Pattern '{pattern}' not found in log output within" + f" {timeout}s.\nLog searched was:\n{log_to_search}" + ) + time.sleep(0.1) + @contextmanager def run_compositor( @@ -181,7 +273,7 @@ def run_compositor( config_path: Path = temp_dir / "config" if config_content is None: - config_content = "workspace 1\nxwayland force\n" + config_content = "workspace 1\nxwayland force\nanimations enabled no\n" config_path.write_text(config_content) env = os.environ.copy() @@ -269,7 +361,7 @@ def wayland_client( def wait_for_client_map(compositor: ScrollInstance, title: str) -> int: tries: int = 0 - while tries < 50: + while tries < 200: view_id = compositor.execute_lua(f""" local view = scroll.focused_view() if view and scroll.view_get_title(view) == "{title}" then diff --git a/tests/test_workspace_split_uaf.py b/tests/test_workspace_split_uaf.py index cb034517..0bfa6cc5 100644 --- a/tests/test_workspace_split_uaf.py +++ b/tests/test_workspace_split_uaf.py @@ -1,59 +1,55 @@ -import time from conftest import ScrollInstance from test_utils import wayland_client, wait_for_client_map -def test_workspace_split_uaf_crash(fresh_compositor: ScrollInstance) -> None: +def test_workspace_split_uaf_crash(scroll_compositor: ScrollInstance) -> None: try: # 1. Open Window 1 on Workspace 1 (active on HEADLESS-1) - with wayland_client(fresh_compositor, "Window 1"): - wait_for_client_map(fresh_compositor, "Window 1") + with wayland_client(scroll_compositor, "Window 1"): + wait_for_client_map(scroll_compositor, "Window 1") # 2. Split workspace 1. This creates workspace 2 as sibling. # Workspace 1 has Window 1, Workspace 2 is empty. - res = fresh_compositor.cmd("workspace split") + res = scroll_compositor.cmd("workspace split") assert res and res[0]["success"], f"workspace split failed: {res}" # 3. Create a second output HEADLESS-2. # It should get a default workspace (probably 3). - res = fresh_compositor.cmd("create_output") + res = scroll_compositor.cmd("create_output") assert res and res[0]["success"], f"create_output failed: {res}" - - time.sleep(0.5) + scroll_compositor.wait_for_idle() # 4. Unplug HEADLESS-1. # Workspaces 1 and 2 should be evacuated. # Workspace 1 (non-empty) is moved to HEADLESS-2. # Workspace 2 (empty) is destroyed. - res = fresh_compositor.cmd("output HEADLESS-1 unplug") + res = scroll_compositor.cmd("output HEADLESS-1 unplug") assert res and res[0]["success"], f"unplug failed: {res}" - - time.sleep(0.5) + scroll_compositor.wait_for_idle() # 5. Focus Workspace 3 on HEADLESS-2 (so Workspace 1 becomes inactive) - res = fresh_compositor.cmd("workspace 3") + res = scroll_compositor.cmd("workspace 3") assert res and res[0]["success"], f"workspace 3 failed: {res}" - - time.sleep(0.5) + scroll_compositor.wait_for_idle() # 6. Move Window 1 from Workspace 1 to Workspace 3. # Since Window 1 is moved out of Workspace 1, and Workspace 1 is inactive, # it should trigger workspace_consider_destroy(Workspace 1). # Workspace 1 is empty, and it is split (sibling was Workspace 2). # It will try to access Workspace 2 (which is destroyed) -> UAF. - res = fresh_compositor.cmd( + res = scroll_compositor.cmd( '[title="Window 1"] move container to workspace 3' ) assert res and res[0]["success"], f"move container failed: {res}" # Check if compositor process is still alive - log_content = fresh_compositor.read_log() + log_content = scroll_compositor.read_log() print(f"Compositor log:\n{log_content}") - assert fresh_compositor.proc.poll() is None, "Compositor crashed" + assert scroll_compositor.proc.poll() is None, "Compositor crashed" except Exception as e: print(f"Test failed with exception: {e}") try: - print(f"Compositor log:\n{fresh_compositor.read_log()}") + print(f"Compositor log:\n{scroll_compositor.read_log()}") except Exception as log_err: print(f"Failed to read compositor log: {log_err}") raise e