diff --git a/CHANGELOG.md b/CHANGELOG.md index 8608594..ba9ba6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 0.8.2 - 2026-05-16 + +- Fixed routed audio recovery after PipeWire or WirePlumber relinks playback + streams while system audio routing is enabled. +- Fixed output recovery when speakers, headphones, or displays are recreated + under the same PipeWire name. + ## 0.8.1 - 2026-05-14 - Fix first-run Flatpak window sizing and state restoration. diff --git a/data/io.github.bhack.mini-eq.metainfo.xml b/data/io.github.bhack.mini-eq.metainfo.xml index 60ad359..72c5e34 100644 --- a/data/io.github.bhack.mini-eq.metainfo.xml +++ b/data/io.github.bhack.mini-eq.metainfo.xml @@ -33,11 +33,11 @@ - https://raw.githubusercontent.com/bhack/mini-eq/v0.8.1/docs/screenshots/mini-eq.png + https://raw.githubusercontent.com/bhack/mini-eq/v0.8.2/docs/screenshots/mini-eq.png Adjust sound output with equalizer controls - https://raw.githubusercontent.com/bhack/mini-eq/v0.8.1/docs/screenshots/mini-eq-dark.png + https://raw.githubusercontent.com/bhack/mini-eq/v0.8.2/docs/screenshots/mini-eq-dark.png Use the equalizer with dark style @@ -45,6 +45,14 @@ https://github.com/bhack/mini-eq/issues https://github.com/bhack/mini-eq + + +
    +
  • Fixed routed audio recovery after PipeWire or WirePlumber relinks playback streams while system audio routing is enabled.
  • +
  • Fixed output recovery when speakers, headphones, or displays are recreated under the same PipeWire name.
  • +
+
+
    diff --git a/pyproject.toml b/pyproject.toml index 2f0bb01..825c445 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mini-eq" -version = "0.8.1" +version = "0.8.2" description = "Compact PipeWire system-wide parametric equalizer for Linux desktops." readme = "README.md" requires-python = ">=3.11" diff --git a/src/mini_eq/pipewire_stream_router.py b/src/mini_eq/pipewire_stream_router.py index 35e0110..cb1fd3b 100644 --- a/src/mini_eq/pipewire_stream_router.py +++ b/src/mini_eq/pipewire_stream_router.py @@ -143,6 +143,9 @@ def _processing_path_node_ids(self) -> set[int]: def _link_touches_processing_path(self, link: PipeWireLink) -> bool: return bool(self._processing_path_node_ids() & {link.output_node_id, link.input_node_id}) + def _link_touches_routed_stream(self, link: PipeWireLink) -> bool: + return bool(self.routed_stream_ids & {link.output_node_id, link.input_node_id}) + def handle_link_state_changed(self, state: str | None) -> None: if state == LINK_STATE_ACTIVE: self.schedule_refresh(route_applied=True) @@ -318,6 +321,8 @@ def handle_object_added(self, _manager, node) -> None: if self._link_touches_processing_path(link): self.track_processing_link_state(link) self.schedule_refresh(route_applied=True) + elif self._link_touches_routed_stream(link): + self.schedule_refresh() def handle_object_removed(self, _manager, node) -> None: try: @@ -325,7 +330,10 @@ def handle_object_removed(self, _manager, node) -> None: except Exception: return + should_refresh = self._link_touches_processing_path(link) or self._link_touches_routed_stream(link) self.untrack_processing_link_state(link) + if should_refresh: + self.schedule_refresh() def untrack_processing_link_states(self) -> None: for handler_id in list(self.link_state_handler_ids.values()): diff --git a/src/mini_eq/routing.py b/src/mini_eq/routing.py index 627c51a..66e2c29 100644 --- a/src/mini_eq/routing.py +++ b/src/mini_eq/routing.py @@ -70,6 +70,7 @@ def __init__(self, output_sink: str | None) -> None: self.filter_node_id: int | None = None self.output_event_source_id = 0 self.pending_followed_output_sink: str | None = None + self.pending_current_output_sink_refresh = False self.output_object_added_handler_id = 0 self.output_object_removed_handler_id = 0 self.output_metadata_changed_handler_id = 0 @@ -148,6 +149,20 @@ def get_sink(self, sink_name: str) -> PipeWireNode | None: return self.output_backend.audio_sink_by_name(sink_name) + def filter_output_already_targets_sink(self, sink: PipeWireNode) -> bool: + if not self.running or self.filter_node_id is None or not sink.object_serial: + return False + + try: + filter_output = self.output_backend.output_stream_by_name(self.filter_output_name) + if filter_output is None: + return False + target = self.output_backend.stream_target(filter_output.bound_id) + except Exception: + return False + + return target.target_object == sink.object_serial + def output_preset_keys(self) -> tuple[str, ...]: return self.output_preset_target().keys @@ -271,9 +286,33 @@ def on_error(exc: Exception) -> None: return analyzer.set_enabled(enabled) def switch_output_sink(self, sink_name: str, explicit: bool) -> None: - if not sink_name or sink_name == self.output_sink: + if not sink_name: + if explicit: + self.follow_default_output = False + return + + if sink_name == self.output_sink: if explicit: self.follow_default_output = False + + output_sink = self.get_sink(sink_name) + if output_sink is None: + return + + self.refresh_output_route_param_monitor() + if self.stream_router is not None: + self.stream_router.set_output_sink_name(sink_name) + if self.output_analyzer is not None: + output_sink_description = output_sink.node_description if output_sink is not None else None + self.output_analyzer.set_output_sink_name(sink_name, output_sink_description) + + if self.filter_output_already_targets_sink(output_sink): + return + + if self.retarget_filter_output(): + return + + self.restart_engine() return if not self.is_valid_output_sink(sink_name): @@ -356,6 +395,8 @@ def handle_output_object_added(self, _manager, proxy) -> None: return if node.is_audio_sink: + if node.node_name == getattr(self, "output_sink", ""): + self.pending_current_output_sink_refresh = True self.schedule_output_event_refresh() def handle_output_object_removed(self, _manager, _proxy) -> None: @@ -423,11 +464,23 @@ def on_output_event_idle(self) -> bool: self.invalidate_output_preset_target() pending_followed_output_sink = getattr(self, "pending_followed_output_sink", None) + pending_current_output_sink_refresh = getattr(self, "pending_current_output_sink_refresh", False) self.pending_followed_output_sink = None + self.pending_current_output_sink_refresh = False + followed_output_refreshed = False if pending_followed_output_sink is not None: - self.refresh_followed_output_sink_from_event(pending_followed_output_sink) + followed_output_refreshed = self.refresh_followed_output_sink_from_event(pending_followed_output_sink) else: - self.refresh_followed_output_sink(snapshot=True) + followed_output_refreshed = self.refresh_followed_output_sink(snapshot=True) + if ( + pending_current_output_sink_refresh + and not followed_output_refreshed + and self.get_sink(getattr(self, "output_sink", "")) is not None + ): + try: + self.switch_output_sink(self.output_sink, explicit=False) + except Exception as exc: + self.emit_status(f"output refresh warning: {exc}") self.refresh_output_route_param_monitor() if self.outputs_changed_callback is not None: diff --git a/tests/test_check_flatpak_runtime.py b/tests/test_check_flatpak_runtime.py index 3f132a2..5cef226 100644 --- a/tests/test_check_flatpak_runtime.py +++ b/tests/test_check_flatpak_runtime.py @@ -5,6 +5,28 @@ from tools import check_flatpak_runtime +def node_item(item_id: int, name: str) -> dict: + return { + "id": item_id, + "type": "PipeWire:Interface:Node", + "info": {"props": {"node.name": name, "object.serial": str(item_id + 1000)}}, + } + + +def link_item(item_id: int, output_node: int, input_node: int, state: str) -> dict: + return { + "id": item_id, + "type": "PipeWire:Interface:Link", + "info": { + "state": state, + "props": { + "link.output.node": str(output_node), + "link.input.node": str(input_node), + }, + }, + } + + @pytest.mark.parametrize( "app_ref", [ @@ -37,3 +59,37 @@ def test_flatpak_runtime_smoke_includes_extra_flatpak_run_args(monkeypatch: pyte "io.github.bhack.mini-eq//master", "--check-deps", ] + + +def test_flatpak_runtime_recognizes_active_processing_path(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + check_flatpak_runtime, + "read_pw_dump", + lambda: [ + node_item(10, "mini_eq_sink"), + node_item(20, "mini_eq_sink_output"), + node_item(30, "ci_null_sink"), + node_item(40, "browser"), + link_item(90, 40, 10, "active"), + link_item(91, 20, 30, "active"), + ], + ) + + assert check_flatpak_runtime.processing_path_has_active_links() is True + + +def test_flatpak_runtime_rejects_inactive_processing_path(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + check_flatpak_runtime, + "read_pw_dump", + lambda: [ + node_item(10, "mini_eq_sink"), + node_item(20, "mini_eq_sink_output"), + node_item(30, "ci_null_sink"), + node_item(40, "browser"), + link_item(90, 40, 10, "active"), + link_item(91, 20, 30, "paused"), + ], + ) + + assert check_flatpak_runtime.processing_path_has_active_links() is False diff --git a/tests/test_check_headless_pipewire_runtime.py b/tests/test_check_headless_pipewire_runtime.py index cc720ad..c61949e 100644 --- a/tests/test_check_headless_pipewire_runtime.py +++ b/tests/test_check_headless_pipewire_runtime.py @@ -25,6 +25,26 @@ def link_item(item_id: int, output_node: int, input_node: int, state: str) -> di } +class AlwaysPendingContext: + def __init__(self) -> None: + self.iterations = 0 + + def pending(self) -> bool: + return True + + def iteration(self, may_block: bool) -> None: + assert may_block is False + self.iterations += 1 + + +def test_drain_main_context_limits_continuous_pending_events() -> None: + context = AlwaysPendingContext() + + headless.drain_main_context(context, max_iterations=3) + + assert context.iterations == 3 + + def test_headless_runtime_recognizes_active_processing_path(monkeypatch) -> None: monkeypatch.setattr( headless.live, @@ -71,3 +91,25 @@ def test_headless_runtime_rejects_stale_virtual_route(monkeypatch) -> None: monkeypatch.setattr(headless.live, "metadata_targets", lambda: {42: ("old-serial", "Spa:Id")}) assert headless.route_to_current_virtual(42, "mini_eq_sink") is None + + +def test_dynamic_sink_properties_create_hotplug_audio_sink() -> None: + properties = headless.dynamic_sink_properties("ci_hotplug_sink") + + assert 'node.name = "ci_hotplug_sink"' in properties + assert 'media.class = "Audio/Sink"' in properties + assert "object.linger = true" in properties + assert "factory.name = support.null-audio-sink" in properties + assert "session.suspend-timeout-seconds = 1" in properties + + +def test_alsa_null_sink_properties_create_alsa_pcm_audio_sink() -> None: + properties = headless.alsa_null_sink_properties("ci_alsa_null_sink") + + assert 'node.name = "ci_alsa_null_sink"' in properties + assert 'media.class = "Audio/Sink"' in properties + assert "object.linger = true" in properties + assert "factory.name = api.alsa.pcm.sink" in properties + assert 'api.alsa.path = "null"' in properties + assert 'audio.format = "S16LE"' in properties + assert "session.suspend-timeout-seconds = 1" in properties diff --git a/tests/test_github_workflows.py b/tests/test_github_workflows.py index d042538..34b1ed8 100644 --- a/tests/test_github_workflows.py +++ b/tests/test_github_workflows.py @@ -110,6 +110,7 @@ def test_headless_pipewire_runtime_smoke_is_optional_ci_gate_without_nested_gnom assert "python3-pyatspi" not in job assert 'timeout="${MINI_EQ_HEADLESS_PIPEWIRE_TIMEOUT:-90}"' in script assert 'audio_duration="${MINI_EQ_HEADLESS_PIPEWIRE_AUDIO_DURATION:-180}"' in script + assert 'idle_gap="${MINI_EQ_HEADLESS_PIPEWIRE_IDLE_GAP:-8}"' in script def test_live_ui_runtime_smoke_uses_host_gir_build_environment() -> None: @@ -129,3 +130,4 @@ def test_flatpak_runtime_smoke_tolerates_pipewire_startup_race() -> None: assert "{ pw-dump 2>/dev/null || true; }" in script assert "first(.[] | select(" in script assert "| head -n 1" not in script + assert 'idle_gap="${MINI_EQ_FLATPAK_SMOKE_IDLE_GAP:-8}"' in script diff --git a/tests/test_mini_eq_pipewire_stream_router.py b/tests/test_mini_eq_pipewire_stream_router.py index af96dcc..af6b375 100644 --- a/tests/test_mini_eq_pipewire_stream_router.py +++ b/tests/test_mini_eq_pipewire_stream_router.py @@ -536,6 +536,61 @@ def test_pipewire_router_reapplies_controls_when_processing_link_becomes_active( assert applied == ["apply", "apply"] +def test_pipewire_router_reroutes_tracked_stream_when_relinked_away( + monkeypatch: pytest.MonkeyPatch, +) -> None: + spotify = make_node(1, pw_backend.STREAM_OUTPUT_AUDIO, "spotify", "Spotify") + speakers = make_node(22, pw_backend.AUDIO_SINK, "speakers") + backend = FakePipeWireBackend([spotify], sinks=[speakers]) + router = pw_router.PipeWireStreamRouter("mini_eq_sink", "mini_eq_sink_output", lambda _message: None, backend) + scheduled_callbacks: list[object] = [] + + monkeypatch.setattr( + pw_router.GLib, + "idle_add", + lambda callback: scheduled_callbacks.append(callback) or 321, + ) + + router.enabled = True + router.accept_stream_events = True + router.routed_stream_ids = {spotify.bound_id} + router.handle_object_added(None, make_link(92, output_node_id=spotify.bound_id, input_node_id=speakers.bound_id)) + + assert len(scheduled_callbacks) == 1 + scheduled_callbacks[0]() + + assert backend.moves == [(spotify.bound_id, "mini_eq_sink")] + + +def test_pipewire_router_reroutes_tracked_stream_when_current_link_disappears( + monkeypatch: pytest.MonkeyPatch, +) -> None: + spotify = make_node(1, pw_backend.STREAM_OUTPUT_AUDIO, "spotify", "Spotify") + virtual_sink = make_node(11, pw_backend.AUDIO_SINK, "mini_eq_sink") + backend = FakePipeWireBackend([spotify], sinks=[virtual_sink]) + router = pw_router.PipeWireStreamRouter("mini_eq_sink", "mini_eq_sink_output", lambda _message: None, backend) + scheduled_callbacks: list[object] = [] + + monkeypatch.setattr( + pw_router.GLib, + "idle_add", + lambda callback: scheduled_callbacks.append(callback) or 321, + ) + + router.enabled = True + router.accept_stream_events = True + router.routed_stream_ids = {spotify.bound_id} + router.handle_object_removed( + None, + make_link(92, output_node_id=spotify.bound_id, input_node_id=virtual_sink.bound_id), + ) + + assert len(scheduled_callbacks) == 1 + scheduled_callbacks[0]() + + assert backend.moves == [(spotify.bound_id, "mini_eq_sink")] + + def test_pipewire_router_tracks_internal_output_links(monkeypatch: pytest.MonkeyPatch) -> None: internal_output = make_node(90, pw_backend.STREAM_OUTPUT_AUDIO, "mini_eq_sink_output") backend = FakePipeWireBackend([internal_output], sinks=[make_node(22, pw_backend.AUDIO_SINK, "speakers")]) diff --git a/tests/test_mini_eq_routing.py b/tests/test_mini_eq_routing.py index 339f8f7..6b22b8f 100644 --- a/tests/test_mini_eq_routing.py +++ b/tests/test_mini_eq_routing.py @@ -341,6 +341,8 @@ def test_output_object_added_schedules_refresh_only_for_audio_sinks(monkeypatch: controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) controller.accept_output_events = True controller.output_event_source_id = 0 + controller.output_sink = "speakers" + controller.pending_current_output_sink_refresh = False controller.output_backend = type("Backend", (), {"node_from_proxy": lambda _self, node: node})() scheduled_callbacks: list[object] = [] @@ -358,9 +360,36 @@ def test_output_object_added_schedules_refresh_only_for_audio_sinks(monkeypatch: routing.SystemWideEqController.handle_output_object_added(controller, None, make_node(2, "speakers")) assert controller.output_event_source_id == 123 + assert controller.pending_current_output_sink_refresh is True assert len(scheduled_callbacks) == 1 +def test_output_event_idle_retargets_recreated_explicit_output() -> None: + controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) + controller.accept_output_events = True + controller.output_event_source_id = 123 + controller.pending_followed_output_sink = None + controller.pending_current_output_sink_refresh = True + controller.follow_default_output = False + controller.output_sink = "hdmi" + controller.output_backend = FakeOutputBackend([make_node(1, "hdmi")]) + controller._output_preset_target_sink = "hdmi" + controller._output_preset_target = pw_routes.PipeWireOutputPresetTarget("hdmi", None, ("hdmi",)) + calls: list[object] = [] + controller.refresh_followed_output_sink = lambda **_kwargs: calls.append("refresh") or False + controller.switch_output_sink = lambda sink_name, explicit: calls.append(("switch", sink_name, explicit)) + controller.refresh_output_route_param_monitor = lambda: calls.append("route-monitor") + controller.outputs_changed_callback = lambda: calls.append("outputs") + + assert routing.SystemWideEqController.on_output_event_idle(controller) is False + + assert controller.output_event_source_id == 0 + assert controller.pending_current_output_sink_refresh is False + assert controller._output_preset_target_sink is None + assert controller._output_preset_target is None + assert calls == ["refresh", ("switch", "hdmi", False), "route-monitor", "outputs"] + + def test_output_event_idle_invalidates_output_preset_target_cache() -> None: controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) controller.accept_output_events = True @@ -592,6 +621,91 @@ def set_output_sink_name(self, sink_name: str) -> None: ] +def test_switch_output_sink_retargets_recreated_same_name_sink_without_restart() -> None: + controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) + calls: list[object] = [] + + class FakeBackend(FakeOutputBackend): + def move_named_output_stream_to_target(self, stream_node_name: str, target_node_name: str) -> None: + calls.append(("retarget", stream_node_name, target_node_name)) + + class FakeRouter: + def set_output_sink_name(self, sink_name: str) -> None: + calls.append(("router-target", sink_name)) + + class FakeAnalyzer: + def set_output_sink_name(self, sink_name: str, sink_description: str | None = None) -> None: + calls.append(("analyzer-target", sink_name, sink_description)) + + controller.output_backend = FakeBackend([make_node(2, "hdmi")]) + controller.output_sink = "hdmi" + controller.follow_default_output = True + controller.running = True + controller.filter_node_id = 42 + controller.filter_output_name = "mini_eq_sink_output" + controller.virtual_sink_name = "mini_eq_sink" + controller.stream_router = FakeRouter() + controller.output_analyzer = FakeAnalyzer() + controller.refresh_output_route_param_monitor = lambda: calls.append("route-param-monitor") + controller.apply_state_to_engine = lambda: calls.append("apply") + controller.emit_status = lambda message: calls.append(("status", message)) + controller.stop_engine = lambda *_args, **_kwargs: calls.append("stop") + controller.start_engine = lambda: calls.append("start") + + routing.SystemWideEqController.switch_output_sink(controller, "hdmi", explicit=False) + + assert controller.output_sink == "hdmi" + assert controller.follow_default_output is True + assert calls == [ + "route-param-monitor", + ("router-target", "hdmi"), + ("analyzer-target", "hdmi", None), + ("retarget", "mini_eq_sink_output", "hdmi"), + "apply", + ("status", "filter-chain PipeWire EQ ready: mini_eq_sink -> hdmi"), + ] + + +def test_switch_output_sink_skips_same_name_retarget_when_filter_output_already_targets_sink() -> None: + controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) + calls: list[object] = [] + sink = make_node(2, "hdmi") + filter_output = make_node(42, "mini_eq_sink_output", pw_backend.STREAM_OUTPUT_AUDIO) + + class FakeBackend(FakeOutputBackend): + def output_stream_by_name(self, node_name: str) -> pw_backend.PipeWireNode | None: + return filter_output if node_name == "mini_eq_sink_output" else None + + def stream_target(self, stream_bound_id: int) -> pw_backend.PipeWireStreamTarget: + assert stream_bound_id == filter_output.bound_id + return pw_backend.PipeWireStreamTarget(None, None, sink.object_serial, "Spa:Id") + + def move_named_output_stream_to_target(self, _stream_node_name: str, _target_node_name: str) -> None: + calls.append("unexpected-retarget") + + class FakeRouter: + def set_output_sink_name(self, sink_name: str) -> None: + calls.append(("router-target", sink_name)) + + controller.output_backend = FakeBackend([sink]) + controller.output_sink = "hdmi" + controller.follow_default_output = True + controller.running = True + controller.filter_node_id = 42 + controller.filter_output_name = "mini_eq_sink_output" + controller.virtual_sink_name = "mini_eq_sink" + controller.stream_router = FakeRouter() + controller.output_analyzer = None + controller.refresh_output_route_param_monitor = lambda: calls.append("route-param-monitor") + controller.apply_state_to_engine = lambda: calls.append("apply") + controller.emit_status = lambda message: calls.append(("status", message)) + controller.restart_engine = lambda: calls.append("restart") + + routing.SystemWideEqController.switch_output_sink(controller, "hdmi", explicit=False) + + assert calls == ["route-param-monitor", ("router-target", "hdmi")] + + def test_explicit_output_change_schedules_coalesced_output_refresh(monkeypatch: pytest.MonkeyPatch) -> None: controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) scheduled_callbacks: list[object] = [] diff --git a/tests/test_release_preflight.py b/tests/test_release_preflight.py index c6e105a..d268680 100644 --- a/tests/test_release_preflight.py +++ b/tests/test_release_preflight.py @@ -88,6 +88,7 @@ def test_release_preflight_runs_headless_pipewire_runtime_smoke(monkeypatch) -> monkeypatch.setenv("MINI_EQ_HEADLESS_PIPEWIRE_TIMEOUT", "12") monkeypatch.setenv("MINI_EQ_HEADLESS_PIPEWIRE_CYCLES", "4") monkeypatch.setenv("MINI_EQ_HEADLESS_PIPEWIRE_AUDIO_DURATION", "34") + monkeypatch.setenv("MINI_EQ_HEADLESS_PIPEWIRE_IDLE_GAP", "5") monkeypatch.setattr(release_preflight, "run", lambda command, **_kwargs: commands.append(command)) release_preflight.run_headless_pipewire_runtime_smoke(Path("/python")) @@ -102,6 +103,8 @@ def test_release_preflight_runs_headless_pipewire_runtime_smoke(monkeypatch) -> "4", "--audio-duration", "34", + "--idle-gap", + "5", ] ] @@ -132,6 +135,7 @@ def test_release_preflight_uses_hosted_headless_pipewire_defaults(monkeypatch) - monkeypatch.delenv("MINI_EQ_HEADLESS_PIPEWIRE_TIMEOUT", raising=False) monkeypatch.delenv("MINI_EQ_HEADLESS_PIPEWIRE_CYCLES", raising=False) monkeypatch.delenv("MINI_EQ_HEADLESS_PIPEWIRE_AUDIO_DURATION", raising=False) + monkeypatch.delenv("MINI_EQ_HEADLESS_PIPEWIRE_IDLE_GAP", raising=False) monkeypatch.setattr(release_preflight, "run", lambda command, **_kwargs: commands.append(command)) release_preflight.run_headless_pipewire_runtime_smoke(Path("/python")) @@ -146,6 +150,8 @@ def test_release_preflight_uses_hosted_headless_pipewire_defaults(monkeypatch) - "2", "--audio-duration", "180", + "--idle-gap", + "8", ] ] diff --git a/tools/check_flatpak_runtime.py b/tools/check_flatpak_runtime.py index 7e60d52..66ff4b6 100644 --- a/tools/check_flatpak_runtime.py +++ b/tools/check_flatpak_runtime.py @@ -8,6 +8,7 @@ import re import shlex import shutil +import signal import subprocess import sys import time @@ -42,6 +43,7 @@ SMOKE_MEDIA_ROLE = "MiniEQSmoke" SMOKE_NODE_NAME = "mini-eq-flatpak-smoke" VIRTUAL_SINK_NAME = "mini_eq_sink" +FILTER_OUTPUT_NAME = f"{VIRTUAL_SINK_NAME}_output" PIPEWIRE_MANAGER_ACCESS = "flatpak-manager" TARGET_OBJECT_RE = re.compile( r"update: id:(?P\d+) key:'target\.object' value:'(?P[^']*)' type:'(?P[^']*)'" @@ -129,6 +131,10 @@ def node_items() -> list[dict[str, Any]]: return [item for item in read_pw_dump() if item.get("type") == "PipeWire:Interface:Node"] +def link_items() -> list[dict[str, Any]]: + return [item for item in read_pw_dump() if item.get("type") == "PipeWire:Interface:Link"] + + def client_items() -> list[dict[str, Any]]: return [item for item in read_pw_dump() if item.get("type") == "PipeWire:Interface:Client"] @@ -164,6 +170,25 @@ def bound_id(node: dict[str, Any]) -> int: return node_id +def link_endpoint_ids(link: dict[str, Any]) -> set[int]: + endpoints: set[int] = set() + props = item_props(link) + for key in ("link.output.node", "link.input.node"): + try: + endpoints.add(int(props.get(key))) + except (TypeError, ValueError): + pass + return endpoints + + +def link_state(link: dict[str, Any]) -> str | None: + info_state = link.get("info", {}).get("state") + if isinstance(info_state, str): + return info_state + prop_state = item_props(link).get("link.state") + return str(prop_state) if prop_state is not None else None + + def metadata_targets() -> dict[int, tuple[str, str]]: result = subprocess.run(["pw-metadata", "-n", "default"], check=True, text=True, stdout=subprocess.PIPE) targets: dict[int, tuple[str, str]] = {} @@ -195,6 +220,46 @@ def wait_for(label: str, predicate: Callable[[], Any], timeout_seconds: float) - raise RuntimeError(f"Timed out waiting for {label}{detail}") +def route_to_current_virtual(smoke_id: int, virtual_serial: str) -> bool: + return metadata_targets().get(smoke_id) == (virtual_serial, "Spa:Id") + + +def force_stream_target(smoke_id: int, target_sink_name: str, timeout_seconds: float) -> str: + target_sink = wait_for( + f"PipeWire sink {target_sink_name}", + lambda: node_by_name(target_sink_name), + timeout_seconds, + ) + target_id = bound_id(target_sink) + target_serial = object_serial(target_sink) + for key, value in (("target.node", str(target_id)), ("target.object", target_serial)): + subprocess.run( + ["pw-metadata", "-n", "default", str(smoke_id), key, value, "Spa:Id"], + check=True, + text=True, + stdout=subprocess.DEVNULL, + ) + return target_serial + + +def processing_path_has_active_links() -> bool: + virtual_sink = node_by_name(VIRTUAL_SINK_NAME) + filter_output = node_by_name(FILTER_OUTPUT_NAME) + if virtual_sink is None or filter_output is None: + return False + + required_ids = {bound_id(virtual_sink): False, bound_id(filter_output): False} + for link in link_items(): + if link_state(link) != "active": + continue + endpoints = link_endpoint_ids(link) + for required_id in tuple(required_ids): + if required_id in endpoints: + required_ids[required_id] = True + + return all(required_ids.values()) + + def mini_eq_has_manager_access() -> bool: for client in client_items(): props = item_props(client) @@ -264,6 +329,52 @@ def stop_process(process: subprocess.Popen[str], label: str, timeout_seconds: fl return output or "" +def stop_smoke_stream(smoke: subprocess.Popen[str], timeout_seconds: float) -> None: + stop_process(smoke, "pw-cat smoke stream", timeout_seconds) + wait_for("smoke stream to disappear", lambda: smoke_stream_node() is None, timeout_seconds) + + +def wait_for_smoke_stream(smoke: subprocess.Popen[str], timeout_seconds: float) -> int: + def live_smoke_stream_node() -> dict[str, Any] | None: + if smoke.poll() is not None: + output = stop_process(smoke, "pw-cat smoke stream") + detail = f": {output.strip()}" if output.strip() else "" + raise RuntimeError(f"pw-cat exited before its PipeWire stream appeared{detail}") + return smoke_stream_node() + + return bound_id(wait_for("silent PipeWire smoke stream", live_smoke_stream_node, timeout_seconds)) + + +def smoke_stream_still_present(smoke_id: int) -> bool: + smoke_node = smoke_stream_node() + return smoke_node is not None and bound_id(smoke_node) == smoke_id + + +def wait_for_idle_gap(label: str, idle_gap_seconds: float) -> None: + deadline = time.monotonic() + idle_gap_seconds + wait_for(label, lambda: time.monotonic() >= deadline, idle_gap_seconds + 1.0) + + +def pause_smoke_stream_for_idle( + smoke: subprocess.Popen[str], + smoke_id: int, + idle_gap_seconds: float, + timeout_seconds: float, +) -> None: + if smoke.poll() is not None: + raise RuntimeError("pw-cat exited before it could be paused") + + smoke.send_signal(signal.SIGSTOP) + try: + wait_for( + "paused smoke stream to remain registered", lambda: smoke_stream_still_present(smoke_id), timeout_seconds + ) + wait_for_idle_gap("idle gap with paused smoke stream", idle_gap_seconds) + finally: + if smoke.poll() is None: + smoke.send_signal(signal.SIGCONT) + + def assert_no_existing_virtual_sink() -> None: if node_by_name(VIRTUAL_SINK_NAME) is not None: raise RuntimeError( @@ -276,29 +387,29 @@ def run_runtime_smoke( duration_seconds: float, timeout_seconds: float, smoke_target: str | None, + idle_gap_seconds: float, ) -> None: assert_no_existing_virtual_sink() deps = run(flatpak_run_command(app_ref, "--check-deps")) print(deps.stdout.rstrip(), flush=True) - # Keep pw-cat alive across stream discovery, app startup, routing, app runtime, and restore waits. - smoke_audio_duration = max(duration_seconds + timeout_seconds * 4.0 + 15.0, 60.0) + # Keep the Flatpak alive across every post-start transition this smoke can wait for. + post_start_transition_count = 9 if smoke_target is not None else 7 + app_duration = max( + duration_seconds, + timeout_seconds * post_start_transition_count + idle_gap_seconds * 2.0 + 10.0, + ) + + # Keep pw-cat alive across stream discovery, app startup, routing, idle phases, and restore waits. + smoke_audio_duration = max(app_duration + timeout_seconds * 4.0 + idle_gap_seconds * 2.0 + 15.0, 60.0) smoke_audio = create_silent_wav(smoke_audio_duration) smoke = start_smoke_stream(smoke_target, smoke_audio) app: subprocess.Popen[str] | None = None + relink_recovery_checked = False try: - - def live_smoke_stream_node() -> dict[str, Any] | None: - if smoke.poll() is not None: - output = stop_process(smoke, "pw-cat smoke stream") - detail = f": {output.strip()}" if output.strip() else "" - raise RuntimeError(f"pw-cat exited before its PipeWire stream appeared{detail}") - return smoke_stream_node() - - smoke_node = wait_for("silent PipeWire smoke stream", live_smoke_stream_node, timeout_seconds) - smoke_id = bound_id(smoke_node) + smoke_id = wait_for_smoke_stream(smoke, timeout_seconds) original_target = metadata_targets().get(smoke_id) command = flatpak_run_command( @@ -306,7 +417,7 @@ def live_smoke_stream_node() -> dict[str, Any] | None: "--headless", "--auto-route", "--duration", - str(duration_seconds), + str(app_duration), ) print(f"$ {format_command(command)}", flush=True) app = subprocess.Popen(command, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) @@ -337,12 +448,52 @@ def live_virtual_sink() -> dict[str, Any] | None: def smoke_stream_targets_virtual_sink() -> bool: note_manager_access() require_mini_eq_running("the smoke stream was routed") - target = metadata_targets().get(smoke_id) - return target == (virtual_serial, "Spa:Id") + return route_to_current_virtual(smoke_id, virtual_serial) wait_for("smoke stream routed through Mini EQ", smoke_stream_targets_virtual_sink, timeout_seconds) + wait_for("Mini EQ processing path links to become active", processing_path_has_active_links, timeout_seconds) + + print("## Flatpak idle stream recreation", flush=True) + stop_smoke_stream(smoke, timeout_seconds) + wait_for_idle_gap("idle gap after smoke stream cleanup", idle_gap_seconds) + smoke = start_smoke_stream(smoke_target, smoke_audio) + smoke_id = wait_for_smoke_stream(smoke, timeout_seconds) + original_target = None + wait_for("recreated smoke stream routed through Mini EQ", smoke_stream_targets_virtual_sink, timeout_seconds) + wait_for( + "Mini EQ processing path links to become active after stream recreation", + processing_path_has_active_links, + timeout_seconds, + ) + + print("## Flatpak paused stream resume", flush=True) + pause_smoke_stream_for_idle(smoke, smoke_id, idle_gap_seconds, timeout_seconds) + wait_for( + "resumed smoke stream to remain registered", lambda: smoke_stream_still_present(smoke_id), timeout_seconds + ) + wait_for( + "resumed smoke stream still routed through Mini EQ", smoke_stream_targets_virtual_sink, timeout_seconds + ) + wait_for( + "Mini EQ processing path links to become active after paused resume", + processing_path_has_active_links, + timeout_seconds, + ) - output, _stderr = app.communicate(timeout=max(duration_seconds + timeout_seconds, timeout_seconds)) + if smoke_target is not None: + print("## Flatpak tracked stream relink recovery", flush=True) + force_stream_target(smoke_id, smoke_target, timeout_seconds) + wait_for( + "relinked smoke stream routed back through Mini EQ", smoke_stream_targets_virtual_sink, timeout_seconds + ) + wait_for( + "Mini EQ processing path links to become active after tracked stream relink", + processing_path_has_active_links, + timeout_seconds, + ) + relink_recovery_checked = True + + output, _stderr = app.communicate(timeout=max(app_duration + timeout_seconds, timeout_seconds)) print(output.rstrip(), flush=True) if app.returncode != 0: raise RuntimeError(f"Mini EQ Flatpak exited with status {app.returncode}") @@ -364,7 +515,11 @@ def smoke_stream_restored() -> bool: else: print("Mini EQ PipeWire manager access client was not observed; routing behavior was verified.", flush=True) - print("Flatpak runtime smoke passed: stream routing and restore behavior verified.") + relink_summary = "tracked stream relink recovery, " if relink_recovery_checked else "" + print( + "Flatpak runtime smoke passed: stream routing, idle stream recreation, " + f"paused stream resume, {relink_summary}active processing links, and restore behavior verified." + ) finally: if app is not None: stop_process(app, "Mini EQ Flatpak") @@ -386,7 +541,7 @@ def parse_args(argv: list[str]) -> argparse.Namespace: "--duration", type=float, default=8.0, - help="How long to keep the headless Mini EQ app running during the routing check.", + help="Minimum seconds to keep the headless Mini EQ app running; idle phases may extend this.", ) parser.add_argument( "--timeout", @@ -398,7 +553,13 @@ def parse_args(argv: list[str]) -> argparse.Namespace: "--smoke-target", type=pipewire_node_target, default=None, - help="Optional PipeWire node target for the silent smoke stream.", + help="Optional PipeWire node target for the silent smoke stream; enables tracked relink recovery coverage.", + ) + parser.add_argument( + "--idle-gap", + type=float, + default=8.0, + help=("Seconds to keep Mini EQ routed during the streamless recreation and paused persistent-stream phases."), ) return parser.parse_args(argv) @@ -408,7 +569,7 @@ def main(argv: list[str] | None = None) -> int: try: require_tools("flatpak", "pw-cat", "pw-dump", "pw-metadata") - run_runtime_smoke(args.app_ref, args.duration, args.timeout, args.smoke_target) + run_runtime_smoke(args.app_ref, args.duration, args.timeout, args.smoke_target, args.idle_gap) except subprocess.CalledProcessError as exc: if exc.stdout: sys.stderr.write(exc.stdout) diff --git a/tools/check_headless_pipewire_runtime.py b/tools/check_headless_pipewire_runtime.py index 24ad6f6..5879f55 100755 --- a/tools/check_headless_pipewire_runtime.py +++ b/tools/check_headless_pipewire_runtime.py @@ -4,6 +4,7 @@ import argparse import os import shutil +import signal import subprocess import sys import tempfile @@ -19,6 +20,9 @@ HELPER_SKIP_EXIT_CODE = live.HELPER_SKIP_EXIT_CODE REPO_ROOT = Path(__file__).resolve().parents[1] +HOTPLUG_SINK_NAME = "ci_hotplug_sink" +ALSA_NULL_SINK_NAME = "ci_alsa_null_sink" +MAX_CONTEXT_DRAIN_ITERATIONS = 250 def format_command(command: list[str | Path]) -> str: @@ -35,6 +39,13 @@ def ensure_source_path() -> None: sys.path.insert(0, src_path) +def drain_main_context(context: Any, *, max_iterations: int = MAX_CONTEXT_DRAIN_ITERATIONS) -> None: + for _ in range(max_iterations): + if not context.pending(): + return + context.iteration(False) + + def dispatch_until( label: str, predicate: Callable[[], Any], @@ -48,8 +59,7 @@ def dispatch_until( last_error: Exception | None = None while time.monotonic() < deadline: - while context.pending(): - context.iteration(False) + drain_main_context(context) try: value = predicate() @@ -61,8 +71,7 @@ def dispatch_until( time.sleep(interval_seconds) - while context.pending(): - context.iteration(False) + drain_main_context(context) detail = f": {last_error}" if last_error is not None else "" raise RuntimeError(f"timed out waiting for {label}{detail}") @@ -117,6 +126,135 @@ def wait_for_processing_path_active( ) +def dynamic_sink_properties(sink_name: str) -> str: + return ( + "{ " + "factory.name = support.null-audio-sink " + f'node.name = "{sink_name}" ' + 'node.description = "CI Hotplug Sink" ' + 'media.class = "Audio/Sink" ' + "object.linger = true " + 'audio.position = "FL,FR" ' + "session.suspend-timeout-seconds = 1 " + "adapter.auto-port-config = { " + "mode = dsp " + "monitor = true " + "position = preserve " + "} " + "}" + ) + + +def alsa_null_sink_properties(sink_name: str) -> str: + return ( + "{ " + "factory.name = api.alsa.pcm.sink " + f'node.name = "{sink_name}" ' + 'node.description = "CI ALSA Null Sink" ' + 'media.class = "Audio/Sink" ' + 'api.alsa.path = "null" ' + 'audio.format = "S16LE" ' + "audio.rate = 48000 " + 'audio.position = "FL,FR" ' + "object.linger = true " + "session.suspend-timeout-seconds = 1 " + "adapter.auto-port-config = { " + "mode = dsp " + "monitor = true " + "position = preserve " + "} " + "}" + ) + + +def create_dynamic_sink(sink_name: str, timeout_seconds: float) -> dict[str, Any]: + if live.node_by_name(sink_name) is not None: + destroy_dynamic_sink(sink_name, timeout_seconds) + + command = ["pw-cli", "create-node", "adapter", dynamic_sink_properties(sink_name)] + print(f"$ {format_command(command)}", flush=True) + subprocess.run(command, check=True, text=True, stdout=subprocess.DEVNULL) + return dispatch_until(sink_name, lambda: live.node_by_name(sink_name), timeout_seconds) + + +def create_alsa_null_sink(sink_name: str, timeout_seconds: float) -> dict[str, Any] | None: + if live.node_by_name(sink_name) is not None: + destroy_dynamic_sink(sink_name, timeout_seconds) + + command = ["pw-cli", "create-node", "adapter", alsa_null_sink_properties(sink_name)] + print(f"$ {format_command(command)}", flush=True) + result = subprocess.run(command, check=False, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + if result.returncode != 0: + output = result.stdout.strip() + detail = f": {output}" if output else "" + print(f"## skipping ALSA null output phase; api.alsa.pcm.sink is unavailable{detail}", flush=True) + return None + + return dispatch_until(sink_name, lambda: live.node_by_name(sink_name), timeout_seconds) + + +def destroy_dynamic_sink(sink_name: str, timeout_seconds: float) -> None: + sink = live.node_by_name(sink_name) + if sink is None: + return + + sink_id = live.node_id(sink) + command = ["pw-cli", "destroy", str(sink_id)] + print(f"$ {format_command(command)}", flush=True) + subprocess.run(command, check=True, text=True, stdout=subprocess.DEVNULL) + dispatch_until(f"{sink_name} to disappear", lambda: live.node_by_name(sink_name) is None, timeout_seconds) + + +def switch_default_output_and_wait( + controller, + sink_name: str, + filter_output_name: str, + timeout_seconds: float, +) -> str: + sink = dispatch_until( + sink_name, + lambda: live.node_by_name(sink_name), + timeout_seconds, + ) + sink_serial = live.object_serial(sink) + live.set_configured_default_sink_name(sink_name, timeout_seconds) + try: + dispatch_until( + f"Mini EQ controller followed {sink_name}", + lambda: controller.output_sink == sink_name, + timeout_seconds, + ) + except RuntimeError: + print( + f"Controller default-follow state after {sink_name} default move: " + f"{describe_controller_default_follow_state(controller)}", + flush=True, + ) + raise + dispatch_until( + f"{filter_output_name} target.object metadata to point at {sink_serial}", + lambda: node_targets_serial(filter_output_name, sink_serial), + timeout_seconds, + ) + return sink_serial + + +def wait_for_stream_routed_and_processing( + smoke_id: int, + virtual_sink_name: str, + filter_output_name: str, + timeout_seconds: float, + label: str, +) -> str: + virtual_serial = dispatch_until( + f"synthetic stream routed to current Mini EQ virtual sink {label}", + lambda: route_to_current_virtual(smoke_id, virtual_sink_name), + timeout_seconds, + ) + wait_for_processing_path_active(virtual_sink_name, filter_output_name, timeout_seconds) + return virtual_serial + + def bad_monitor_source_nodes() -> list[dict[str, Any]]: return [ live.item_props(node) @@ -149,6 +287,81 @@ def ready() -> bool: dispatch_until("Mini EQ controller ready and routed", ready, timeout_seconds) +def wait_for_smoke_stream(smoke: subprocess.Popen[str], timeout_seconds: float) -> int: + smoke_node = dispatch_until( + "synthetic PipeWire playback stream", + lambda: live.smoke_stream_node() if smoke.poll() is None else None, + timeout_seconds, + ) + return live.node_id(smoke_node) + + +def wait_for_smoke_routed(smoke_id: int, virtual_sink_name: str, timeout_seconds: float) -> str: + return dispatch_until( + "synthetic stream routed to current Mini EQ virtual sink", + lambda: route_to_current_virtual(smoke_id, virtual_sink_name), + timeout_seconds, + ) + + +def force_stream_target(smoke_id: int, target_sink_name: str, timeout_seconds: float) -> str: + target_sink = dispatch_until( + f"PipeWire sink {target_sink_name}", + lambda: live.node_by_name(target_sink_name), + timeout_seconds, + ) + target_id = live.node_id(target_sink) + target_serial = live.object_serial(target_sink) + for key, value in (("target.node", str(target_id)), ("target.object", target_serial)): + subprocess.run( + ["pw-metadata", "-n", "default", str(smoke_id), key, value, "Spa:Id"], + check=True, + text=True, + stdout=subprocess.DEVNULL, + ) + return target_serial + + +def stop_smoke_stream(smoke: subprocess.Popen[str], timeout_seconds: float) -> None: + live.terminate_process(smoke, "pw-cat synthetic stream") + dispatch_until("synthetic stream to disappear", lambda: live.smoke_stream_node() is None, timeout_seconds) + + +def smoke_stream_still_present(smoke_id: int) -> bool: + smoke_node = live.smoke_stream_node() + return smoke_node is not None and live.node_id(smoke_node) == smoke_id + + +def wait_for_idle_gap(label: str, idle_gap_seconds: float) -> None: + idle_deadline = time.monotonic() + idle_gap_seconds + dispatch_until(label, lambda: time.monotonic() >= idle_deadline, idle_gap_seconds + 1.0) + + +def pause_smoke_stream_for_idle( + smoke: subprocess.Popen[str], + smoke_id: int, + idle_gap_seconds: float, + timeout_seconds: float, + during_idle: Callable[[], None] | None = None, +) -> None: + if smoke.poll() is not None: + raise RuntimeError("synthetic stream exited before it could be paused") + + smoke.send_signal(signal.SIGSTOP) + try: + dispatch_until( + "paused synthetic stream to remain registered", + lambda: smoke_stream_still_present(smoke_id), + timeout_seconds, + ) + wait_for_idle_gap("idle gap with paused synthetic stream", idle_gap_seconds) + if during_idle is not None: + during_idle() + finally: + if smoke.poll() is None: + smoke.send_signal(signal.SIGCONT) + + def describe_controller_default_follow_state(controller) -> str: try: cached_defaults = controller.output_backend.defaults() @@ -181,7 +394,163 @@ def describe_controller_default_follow_state(controller) -> str: ) -def run_controller_flow(*, tmp_dir: Path, timeout_seconds: float, cycles: int, audio_duration: float) -> None: +def run_dynamic_hotplug_recovery_phase( + *, + controller, + smoke: subprocess.Popen[str], + smoke_id: int, + virtual_sink_name: str, + filter_output_name: str, + idle_gap_seconds: float, + timeout_seconds: float, +) -> str: + controller.set_analyzer_enabled(False) + print("## headless dynamic output hotplug recovery with monitor off", flush=True) + + hotplug_sink = create_dynamic_sink(HOTPLUG_SINK_NAME, timeout_seconds) + first_hotplug_serial = live.object_serial(hotplug_sink) + switch_default_output_and_wait(controller, HOTPLUG_SINK_NAME, filter_output_name, timeout_seconds) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after dynamic output move", + ) + + def remove_hotplug_and_select_primary() -> None: + destroy_dynamic_sink(HOTPLUG_SINK_NAME, timeout_seconds) + switch_default_output_and_wait(controller, live.PRIMARY_SINK_NAME, filter_output_name, timeout_seconds) + + pause_smoke_stream_for_idle( + smoke, + smoke_id, + idle_gap_seconds, + timeout_seconds, + during_idle=remove_hotplug_and_select_primary, + ) + dispatch_until( + "resumed synthetic stream to remain registered after dynamic output removal", + lambda: smoke_stream_still_present(smoke_id), + timeout_seconds, + ) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after dynamic output removal with monitor off", + ) + + hotplug_sink = create_dynamic_sink(HOTPLUG_SINK_NAME, timeout_seconds) + second_hotplug_serial = live.object_serial(hotplug_sink) + if second_hotplug_serial == first_hotplug_serial: + raise RuntimeError(f"{HOTPLUG_SINK_NAME} was recreated without a new object.serial") + + switch_default_output_and_wait(controller, HOTPLUG_SINK_NAME, filter_output_name, timeout_seconds) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after dynamic output reappeared with new serial", + ) + switch_default_output_and_wait(controller, live.PRIMARY_SINK_NAME, filter_output_name, timeout_seconds) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after restoring primary from dynamic output", + ) + destroy_dynamic_sink(HOTPLUG_SINK_NAME, timeout_seconds) + return virtual_serial + + +def run_alsa_null_output_phase( + *, + controller, + smoke: subprocess.Popen[str], + smoke_id: int, + virtual_sink_name: str, + filter_output_name: str, + idle_gap_seconds: float, + timeout_seconds: float, +) -> tuple[str, bool]: + controller.set_analyzer_enabled(False) + print("## headless ALSA-backed null output recovery with monitor off", flush=True) + + alsa_sink = create_alsa_null_sink(ALSA_NULL_SINK_NAME, timeout_seconds) + if alsa_sink is None: + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after skipped ALSA-backed null output phase", + ) + return virtual_serial, False + + try: + switch_default_output_and_wait(controller, ALSA_NULL_SINK_NAME, filter_output_name, timeout_seconds) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after ALSA-backed null output move", + ) + + pause_smoke_stream_for_idle(smoke, smoke_id, idle_gap_seconds, timeout_seconds) + dispatch_until( + "resumed synthetic stream to remain registered after ALSA-backed output idle", + lambda: smoke_stream_still_present(smoke_id), + timeout_seconds, + ) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after ALSA-backed null output idle", + ) + + def remove_alsa_and_select_primary() -> None: + destroy_dynamic_sink(ALSA_NULL_SINK_NAME, timeout_seconds) + switch_default_output_and_wait(controller, live.PRIMARY_SINK_NAME, filter_output_name, timeout_seconds) + + pause_smoke_stream_for_idle( + smoke, + smoke_id, + idle_gap_seconds, + timeout_seconds, + during_idle=remove_alsa_and_select_primary, + ) + dispatch_until( + "resumed synthetic stream to remain registered after ALSA-backed output removal", + lambda: smoke_stream_still_present(smoke_id), + timeout_seconds, + ) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after ALSA-backed null output removal", + ) + return virtual_serial, True + finally: + destroy_dynamic_sink(ALSA_NULL_SINK_NAME, timeout_seconds) + + +def run_controller_flow( + *, + tmp_dir: Path, + timeout_seconds: float, + cycles: int, + audio_duration: float, + idle_gap_seconds: float, +) -> None: ensure_source_path() from mini_eq.routing import SystemWideEqController @@ -190,17 +559,12 @@ def run_controller_flow(*, tmp_dir: Path, timeout_seconds: float, cycles: int, a live.verify_pipewire_gobject_probe(timeout_seconds) audio_file = live.create_sine_wav(tmp_dir / "mini-eq-headless-pipewire-smoke.wav", audio_duration) - smoke = live.start_smoke_stream(audio_file) + smoke: subprocess.Popen[str] | None = live.start_smoke_stream(audio_file) controller = None statuses: list[str] = [] try: - smoke_node = dispatch_until( - "synthetic PipeWire playback stream", - lambda: live.smoke_stream_node() if smoke.poll() is None else None, - timeout_seconds, - ) - smoke_id = live.node_id(smoke_node) + smoke_id = wait_for_smoke_stream(smoke, timeout_seconds) controller = SystemWideEqController(None) controller.set_status_callback(statuses.append) @@ -208,12 +572,121 @@ def run_controller_flow(*, tmp_dir: Path, timeout_seconds: float, cycles: int, a virtual_sink_name = controller.virtual_sink_name filter_output_name = controller.filter_output_name - virtual_serial = dispatch_until( - "synthetic stream routed to current Mini EQ virtual sink", - lambda: route_to_current_virtual(smoke_id, virtual_sink_name), + virtual_serial = wait_for_smoke_routed(smoke_id, virtual_sink_name, timeout_seconds) + wait_for_processing_path_active(virtual_sink_name, filter_output_name, timeout_seconds) + + print("## headless idle stream recreation", flush=True) + stop_smoke_stream(smoke, timeout_seconds) + smoke = None + wait_for_idle_gap("idle gap after synthetic stream cleanup", idle_gap_seconds) + smoke = live.start_smoke_stream(audio_file) + smoke_id = wait_for_smoke_stream(smoke, timeout_seconds) + virtual_serial = wait_for_smoke_routed(smoke_id, virtual_sink_name, timeout_seconds) + wait_for_processing_path_active(virtual_sink_name, filter_output_name, timeout_seconds) + + print("## headless paused stream resume", flush=True) + pause_smoke_stream_for_idle(smoke, smoke_id, idle_gap_seconds, timeout_seconds) + dispatch_until( + "resumed synthetic stream to remain registered", + lambda: smoke_stream_still_present(smoke_id), timeout_seconds, ) - wait_for_processing_path_active(virtual_sink_name, filter_output_name, timeout_seconds) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after paused resume", + ) + + print("## headless tracked stream relink recovery", flush=True) + force_stream_target(smoke_id, live.PRIMARY_SINK_NAME, timeout_seconds) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after tracked stream relink", + ) + + controller.set_analyzer_enabled(False) + print("## headless paused output switch recovery with monitor off", flush=True) + + def switch_to_alt_output() -> None: + switch_default_output_and_wait( + controller, + live.ALT_SINK_NAME, + filter_output_name, + timeout_seconds, + ) + + pause_smoke_stream_for_idle( + smoke, + smoke_id, + idle_gap_seconds, + timeout_seconds, + during_idle=switch_to_alt_output, + ) + dispatch_until( + "resumed synthetic stream to remain registered after paused output move", + lambda: smoke_stream_still_present(smoke_id), + timeout_seconds, + ) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after paused output move with monitor off", + ) + + def switch_to_primary_output() -> None: + switch_default_output_and_wait( + controller, + live.PRIMARY_SINK_NAME, + filter_output_name, + timeout_seconds, + ) + + pause_smoke_stream_for_idle( + smoke, + smoke_id, + idle_gap_seconds, + timeout_seconds, + during_idle=switch_to_primary_output, + ) + dispatch_until( + "resumed synthetic stream to remain registered after paused output restore", + lambda: smoke_stream_still_present(smoke_id), + timeout_seconds, + ) + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, + timeout_seconds, + "after paused output restore with monitor off", + ) + + virtual_serial = run_dynamic_hotplug_recovery_phase( + controller=controller, + smoke=smoke, + smoke_id=smoke_id, + virtual_sink_name=virtual_sink_name, + filter_output_name=filter_output_name, + idle_gap_seconds=idle_gap_seconds, + timeout_seconds=timeout_seconds, + ) + + virtual_serial, alsa_null_phase_verified = run_alsa_null_output_phase( + controller=controller, + smoke=smoke, + smoke_id=smoke_id, + virtual_sink_name=virtual_sink_name, + filter_output_name=filter_output_name, + idle_gap_seconds=idle_gap_seconds, + timeout_seconds=timeout_seconds, + ) for cycle in range(cycles): print(f"## headless route toggle cycle {cycle + 1}/{cycles}", flush=True) @@ -225,68 +698,41 @@ def run_controller_flow(*, tmp_dir: Path, timeout_seconds: float, cycles: int, a ) controller.route_system_audio(True, announce=False) - virtual_serial = dispatch_until( - "synthetic stream rerouted to current Mini EQ virtual sink", - lambda: route_to_current_virtual(smoke_id, virtual_sink_name), + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, timeout_seconds, + "after route toggle", ) - wait_for_processing_path_active(virtual_sink_name, filter_output_name, timeout_seconds) - alt_sink = dispatch_until( + switch_default_output_and_wait( + controller, live.ALT_SINK_NAME, - lambda: live.node_by_name(live.ALT_SINK_NAME), + filter_output_name, timeout_seconds, ) - alt_serial = live.object_serial(alt_sink) - live.set_configured_default_sink_name(live.ALT_SINK_NAME, timeout_seconds) - try: - dispatch_until( - f"Mini EQ controller followed {live.ALT_SINK_NAME}", - lambda: controller.output_sink == live.ALT_SINK_NAME, - timeout_seconds, - ) - except RuntimeError: - print( - "Controller default-follow state after alt default move: " - f"{describe_controller_default_follow_state(controller)}", - flush=True, - ) - raise - dispatch_until( - f"{filter_output_name} target.object metadata to point at {alt_serial}", - lambda: node_targets_serial(filter_output_name, alt_serial), - timeout_seconds, - ) - virtual_serial = dispatch_until( - "synthetic stream stayed routed to current Mini EQ virtual sink after output move", - lambda: route_to_current_virtual(smoke_id, virtual_sink_name), + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, timeout_seconds, + "after output move", ) - wait_for_processing_path_active(virtual_sink_name, filter_output_name, timeout_seconds) - primary_sink = dispatch_until( + switch_default_output_and_wait( + controller, live.PRIMARY_SINK_NAME, - lambda: live.node_by_name(live.PRIMARY_SINK_NAME), + filter_output_name, timeout_seconds, ) - primary_serial = live.object_serial(primary_sink) - live.set_configured_default_sink_name(live.PRIMARY_SINK_NAME, timeout_seconds) - dispatch_until( - f"Mini EQ controller followed {live.PRIMARY_SINK_NAME}", - lambda: controller.output_sink == live.PRIMARY_SINK_NAME, + virtual_serial = wait_for_stream_routed_and_processing( + smoke_id, + virtual_sink_name, + filter_output_name, timeout_seconds, + "after output restore", ) - dispatch_until( - f"{filter_output_name} target.object metadata to point at {primary_serial}", - lambda: node_targets_serial(filter_output_name, primary_serial), - timeout_seconds, - ) - virtual_serial = dispatch_until( - "synthetic stream stayed routed to current Mini EQ virtual sink after output restore", - lambda: route_to_current_virtual(smoke_id, virtual_sink_name), - timeout_seconds, - ) - wait_for_processing_path_active(virtual_sink_name, filter_output_name, timeout_seconds) controller.set_analyzer_enabled(False) for cycle in range(cycles): @@ -315,13 +761,20 @@ def run_controller_flow(*, tmp_dir: Path, timeout_seconds: float, cycles: int, a ) wait_for_processing_path_active(virtual_sink_name, filter_output_name, timeout_seconds) - live.terminate_process(smoke, "pw-cat synthetic stream") + stop_smoke_stream(smoke, timeout_seconds) smoke = None - dispatch_until("synthetic stream to disappear", lambda: live.smoke_stream_node() is None, timeout_seconds) + + optional_phase_labels = [] + if alsa_null_phase_verified: + optional_phase_labels.append("ALSA-backed null output recovery") + optional_phases = f"{', '.join(optional_phase_labels)}, " if optional_phase_labels else "" print( "Headless PipeWire runtime smoke passed: synthetic stream routing, route toggles, " - "default-output moves, active processing links, monitor toggles, and stream cleanup verified." + "idle stream recreation, paused stream resume, tracked stream relink recovery, " + "paused default-output moves with monitor off, dynamic output hotplug recovery, " + f"{optional_phases}active processing links, " + "monitor toggles, and stream cleanup verified." ) if statuses: print("Controller status trace:", flush=True) @@ -339,13 +792,14 @@ def start_pipewire_session( pipewire, wireplumber = live.start_pipewire_processes(tmp_dir) live.wait_for_sink(live.PRIMARY_SINK_NAME, timeout_seconds) live.wait_for_sink(live.ALT_SINK_NAME, timeout_seconds) - live.wait_for("WirePlumber default output metadata", live.default_output_metadata_is_ready, timeout_seconds) + live.wait_for("WirePlumber default metadata", live.default_metadata_is_ready, timeout_seconds) + live.set_configured_default_sink_name(live.PRIMARY_SINK_NAME, timeout_seconds) return pipewire, wireplumber def run_helper(_args: argparse.Namespace) -> int: try: - for tool in ("pipewire", "wireplumber", "wpctl", "pw-cat", "pw-dump", "pw-metadata"): + for tool in ("pipewire", "wireplumber", "wpctl", "pw-cat", "pw-cli", "pw-dump", "pw-metadata"): require_tool(tool) except RuntimeError as exc: print(str(exc), file=sys.stderr) @@ -354,6 +808,7 @@ def run_helper(_args: argparse.Namespace) -> int: timeout_seconds = float(os.environ["MINI_EQ_HEADLESS_PIPEWIRE_TIMEOUT"]) cycles = int(os.environ["MINI_EQ_HEADLESS_PIPEWIRE_CYCLES"]) audio_duration = float(os.environ["MINI_EQ_HEADLESS_PIPEWIRE_AUDIO_DURATION"]) + idle_gap_seconds = float(os.environ["MINI_EQ_HEADLESS_PIPEWIRE_IDLE_GAP"]) pipewire: subprocess.Popen[str] | None = None wireplumber: subprocess.Popen[str] | None = None @@ -381,6 +836,7 @@ def run_helper(_args: argparse.Namespace) -> int: timeout_seconds=timeout_seconds, cycles=cycles, audio_duration=audio_duration, + idle_gap_seconds=idle_gap_seconds, ) finally: live.terminate_process(wireplumber, "WirePlumber") @@ -401,6 +857,7 @@ def run_parent(args: argparse.Namespace) -> int: env["MINI_EQ_HEADLESS_PIPEWIRE_TIMEOUT"] = str(args.timeout) env["MINI_EQ_HEADLESS_PIPEWIRE_CYCLES"] = str(args.cycles) env["MINI_EQ_HEADLESS_PIPEWIRE_AUDIO_DURATION"] = str(args.audio_duration) + env["MINI_EQ_HEADLESS_PIPEWIRE_IDLE_GAP"] = str(args.idle_gap) env["PYTHONUNBUFFERED"] = "1" env.pop("DISPLAY", None) env.pop("WAYLAND_DISPLAY", None) @@ -433,6 +890,12 @@ def parse_args(argv: list[str]) -> argparse.Namespace: default=120.0, help="Duration of the generated sine-wave playback stream.", ) + parser.add_argument( + "--idle-gap", + type=float, + default=8.0, + help=("Seconds to keep Mini EQ routed during the streamless recreation and paused persistent-stream phases."), + ) return parser.parse_args(argv) diff --git a/tools/check_live_ui_runtime.py b/tools/check_live_ui_runtime.py index f4cb059..4eada15 100755 --- a/tools/check_live_ui_runtime.py +++ b/tools/check_live_ui_runtime.py @@ -229,16 +229,22 @@ def set_configured_default_sink_name(sink_name: str, timeout_seconds: float) -> lambda: node_by_name(sink_name), timeout_seconds, ) - subprocess.run( - [ - "wpctl", - "set-default", - str(node_id(sink)), - ], - check=True, - text=True, - stdout=subprocess.DEVNULL, - ) + + def set_default() -> bool: + result = subprocess.run( + [ + "wpctl", + "set-default", + str(node_id(sink)), + ], + check=False, + text=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + return result.returncode == 0 + + wait_for(f"WirePlumber to accept {sink_name} as default sink", set_default, timeout_seconds) wait_for( f"PipeWire configured default sink metadata to become {sink_name}", lambda: configured_default_sink_name() == sink_name, diff --git a/tools/release_preflight.py b/tools/release_preflight.py index d992592..1fe5d21 100755 --- a/tools/release_preflight.py +++ b/tools/release_preflight.py @@ -485,6 +485,7 @@ def run_headless_pipewire_runtime_smoke(python: Path) -> None: minimum=1, maximum=3600, ) + idle_gap = bounded_int_env("MINI_EQ_HEADLESS_PIPEWIRE_IDLE_GAP", 8, minimum=0, maximum=600) run( [ python, @@ -495,6 +496,8 @@ def run_headless_pipewire_runtime_smoke(python: Path) -> None: cycles, "--audio-duration", audio_duration, + "--idle-gap", + idle_gap, ] ) diff --git a/tools/run_flatpak_runtime_smoke_ci.sh b/tools/run_flatpak_runtime_smoke_ci.sh index a3c955e..0c3a055 100755 --- a/tools/run_flatpak_runtime_smoke_ci.sh +++ b/tools/run_flatpak_runtime_smoke_ci.sh @@ -4,6 +4,7 @@ set -euo pipefail manifest="${MINI_EQ_FLATPAK_MANIFEST:-io.github.bhack.mini-eq.yaml}" duration="${MINI_EQ_FLATPAK_SMOKE_DURATION:-8}" timeout="${MINI_EQ_FLATPAK_SMOKE_TIMEOUT:-20}" +idle_gap="${MINI_EQ_FLATPAK_SMOKE_IDLE_GAP:-8}" build_flatpak="${MINI_EQ_FLATPAK_BUILD:-1}" install_remote="${MINI_EQ_FLATPAK_INSTALL_REMOTE:-0}" install_ref="${MINI_EQ_FLATPAK_INSTALL_REF:-io.github.bhack.mini-eq}" @@ -87,6 +88,7 @@ export MINI_EQ_FLATPAK_MANIFEST="$manifest" export MINI_EQ_FLATPAK_APP_REF="$app_ref" export MINI_EQ_FLATPAK_SMOKE_DURATION="$duration" export MINI_EQ_FLATPAK_SMOKE_TIMEOUT="$timeout" +export MINI_EQ_FLATPAK_SMOKE_IDLE_GAP="$idle_gap" export MINI_EQ_FLATPAK_BUILD="$build_flatpak" export MINI_EQ_FLATPAK_INSTALL_REMOTE="$install_remote" export MINI_EQ_FLATPAK_INSTALL_REF="$install_ref" @@ -172,5 +174,6 @@ python3 tools/check_flatpak_runtime.py \ --app-ref "$MINI_EQ_FLATPAK_APP_REF" \ --duration "$MINI_EQ_FLATPAK_SMOKE_DURATION" \ --timeout "$MINI_EQ_FLATPAK_SMOKE_TIMEOUT" \ + --idle-gap "$MINI_EQ_FLATPAK_SMOKE_IDLE_GAP" \ --smoke-target ci_null_sink SH diff --git a/tools/run_headless_pipewire_runtime_smoke_ci.sh b/tools/run_headless_pipewire_runtime_smoke_ci.sh index dfa8f17..4429255 100755 --- a/tools/run_headless_pipewire_runtime_smoke_ci.sh +++ b/tools/run_headless_pipewire_runtime_smoke_ci.sh @@ -5,6 +5,7 @@ venv="${MINI_EQ_HEADLESS_PIPEWIRE_VENV:-${RUNNER_TEMP:-/tmp}/mini-eq-headless-pi timeout="${MINI_EQ_HEADLESS_PIPEWIRE_TIMEOUT:-90}" cycles="${MINI_EQ_HEADLESS_PIPEWIRE_CYCLES:-3}" audio_duration="${MINI_EQ_HEADLESS_PIPEWIRE_AUDIO_DURATION:-180}" +idle_gap="${MINI_EQ_HEADLESS_PIPEWIRE_IDLE_GAP:-8}" repo_root="$(git rev-parse --show-toplevel)" cd "$repo_root" @@ -33,4 +34,5 @@ PY "$venv/bin/python" tools/check_headless_pipewire_runtime.py \ --timeout "$timeout" \ --cycles "$cycles" \ - --audio-duration "$audio_duration" + --audio-duration "$audio_duration" \ + --idle-gap "$idle_gap"