From 772310fed91c8217c399c32f8648e8d7e0246f9e Mon Sep 17 00:00:00 2001 From: Sourav Pati Date: Thu, 21 May 2026 10:34:08 -0700 Subject: [PATCH 1/2] test: add scalable fleet integration coverage Adds NATS-backed large-fleet integration coverage for discovery, schema compaction and drill-down, heterogeneous fleets, invoke_many, ESTOP aliasing, broadcast, where, fire_at, and correlation subscriptions. Also documents release test coverage and addresses subscription and predicate behavior needed by the comprehensive scenarios. --- docs/discovery-release-test-summary.md | 66 +++ docs/discovery.md | 42 +- .../device_connect_agent_tools/tools.py | 56 ++- .../tests/test_subscribe.py | 7 + .../device_connect_edge/device.py | 51 ++- .../tests/test_device_where.py | 67 ++++ packages/device-connect-server/README.md | 13 +- tests/fixtures/devices.py | 32 +- tests/fixtures/infrastructure.py | 59 +-- tests/tests/test_tools_broadcast.py | 87 +++- tests/tests/test_tools_invoke.py | 98 ++++- .../tests/test_tools_large_fleet_broadcast.py | 297 ++++++++++++++ .../tests/test_tools_large_fleet_discovery.py | 360 +++++++++++++++++ .../test_tools_large_fleet_heterogeneous.py | 310 ++++++++++++++ tests/tests/test_tools_large_fleet_invoke.py | 377 ++++++++++++++++++ tests/tests/test_tools_selector.py | 13 +- 16 files changed, 1891 insertions(+), 44 deletions(-) create mode 100644 docs/discovery-release-test-summary.md create mode 100644 packages/device-connect-edge/tests/test_device_where.py create mode 100644 tests/tests/test_tools_large_fleet_broadcast.py create mode 100644 tests/tests/test_tools_large_fleet_discovery.py create mode 100644 tests/tests/test_tools_large_fleet_heterogeneous.py create mode 100644 tests/tests/test_tools_large_fleet_invoke.py diff --git a/docs/discovery-release-test-summary.md b/docs/discovery-release-test-summary.md new file mode 100644 index 0000000..f569f7e --- /dev/null +++ b/docs/discovery-release-test-summary.md @@ -0,0 +1,66 @@ +# Discovery and Operations Release Test Summary + +Date: 2026-05-21 + +This document summarizes the test coverage behind the discovery and operations +release. It is written as source material for release notes and public release +documentation. + +## Summary + +The release has strong automated coverage for selector-based discovery, label +vocabulary, pagination, compact and expanded result shapes, synchronous +invocation, async broadcast, subscriptions, and large simulated fleets. The +largest real integration runs use 200 NATS-backed simulated devices by default. + +## Evidence + +| Evidence | Scope | Result | +|---|---|---| +| Agent-tools unit tests | `discover`, `discover_labels`, `invoke`, `invoke_many`, `broadcast`, `subscribe`, `await_replies`, error envelopes, truncation, safety warnings | `94 passed in 0.42s` | +| Edge predicate unit tests | Edge-side `where` dependency warning, bounded predicate evaluation, CEL compile/eval behavior | `19 passed in 0.23s` | +| Broadcast/subscription integration tests | Broadcast replies, `where`, bindings, `fire_at`, `on_late`, safety warning, live and snapshot event subscriptions | `12 passed, 12 skipped in 49.53s` on NATS run | +| Large-fleet integration suite | 200-device discovery, pagination, truncation, heterogeneous fleets, ESTOP alias, `invoke_many`, broadcast, `where`, `fire_at`, correlation subscribe | `18 passed in 37.05s` | +| Integration inventory | Hierarchical tools, selector discovery, invoke, broadcast, subscribe, large-fleet tests across NATS/Zenoh where applicable | `144 collected` | + +## Coverage Matrix + +| Area | What Was Tested | Representative Tests | Confidence | +|---|---|---|---| +| Hierarchical discovery compatibility | `describe_fleet`, `list_devices`, `get_device_functions`, small-fleet expansion, pagination, missing device, status/function counts, deprecation warnings | `test_tools_hierarchical.py`, `packages/.../test_discover.py` | Medium. Compatibility path is covered on small fleets; large-fleet stress is on the selector APIs. | +| Selector discovery | Device, function, and event scopes; bare names; globs; `*`; `key:value`; OR within key; AND across keys; case sensitivity; invalid selectors | `test_tools_selector.py`, `packages/.../test_discover.py` | High. Core selector behavior is covered in unit and integration tests. | +| Labels and truncation | Multi-axis `discover_labels`, per-key pagination, `label_histogram`, `more`, multivalued labels, compact broad rows, expanded drill-down schemas | `test_long_tail_label_histogram_reports_more`, `test_per_key_label_drill_down_bypasses_truncation`, `TestLongTailTruncation` | High for device/function labels; event-label scale remains thinner. | +| Function schemas | Schemas returned for narrow function result sets; broad function queries compact above threshold; drill-down returns `parameters` | `test_large_function_set_stays_compact_and_supports_drill_down`, `test_heterogeneous_discovery_outputs_expected_matrix` | High. Verifies progressive narrowing before returning schemas. | +| Heterogeneous fleets | Sensors, cameras, and robots with different functions/labels; exact category/function histograms; exact `(device_id, function)` matrix | `test_heterogeneous_discovery_outputs_expected_matrix` | High for mixed function discovery. | +| `invoke` | Single-target success, no-match, ambiguous-match, invalid scope, event-scope rejection, JSON-RPC/connection errors, bounded ambiguity preview | `test_tools_invoke.py`, `packages/.../test_invoke.py`, `test_invoke_ambiguity_stays_bounded_in_large_fleet` | High. Exactly-one semantics and common error cases are covered. | +| `invoke_many` | Fan-out success, zero candidates, function-only selector, partial failures, per-target timeout forwarding, concurrency cap, heterogeneous target isolation | `test_invoke_many_partial_failure_accounting_at_scale`, `test_heterogeneous_invoke_many_targets_only_matching_functions` | High. Sync fan-out behavior is covered at small and large scale. | +| ESTOP alias | `function(estop)` discovery and `invoke_many` fan-out target only functions named `estop`, excluding unrelated `safety:critical` decoys | `test_invoke_many_estop_alias_targets_only_estop_functions_at_scale` | Medium-high. API selector/fan-out behavior is covered; this is not a physical safety certification. | +| `broadcast` | Correlation envelope, candidate count, target envelope, zero matches, invalid scope, invalid predicate, publish failure, safety warning, large fan-out replies | `test_broadcast_large_fan_out_returns_correlation_and_target_count`, `test_heterogeneous_broadcast_replies_only_from_matching_functions` | High for async fan-out core. | +| `where` and bindings | CEL predicate filtering, namespaced `bindings`, edge self-election at scale, startup warning for missing predicate support, bounded fail-closed evaluation | `test_broadcast_where_self_election_narrows_large_candidates`, `test_broadcast_where_with_bindings`, `test_device_where.py` | High for current predicate behavior. | +| `fire_at` | Scheduled fan-out, `actually_fired_at` spread, late `on_late=skip` behavior | `test_broadcast_fire_at_synchronizes_large_fan_out`, `test_broadcast_fire_at_late_with_skip_drops` | Medium-high. Large-fleet happy path and small-fleet late/skip behavior are covered. | +| Subscriptions and replies | `subscribe("correlation:")`, live top-level `event(...)`, snapshot `device(...).event(...)`, iterator protocol, race-safe buffer drain, `await_replies` partial/complete collection | `test_subscribe_correlation_drains_large_reply_stream`, `test_subscribe_top_level_event_selector_includes_late_joiners`, `test_subscribe_device_event_selector_is_snapshot`, `test_subscribe.py` | Medium-high. Correlation replies and event subscription semantics are explicitly covered. | + +## Large-Scale Testing Approach + +Routine PR and CI validation should continue to use 200 real NATS-backed +simulated devices. That size exercises registry discovery, selectors, +pagination, truncation, fan-out, and correlation replies while keeping routine +validation fast and reliable. + +10K-device confidence should come from a synthetic registry/load test in a +nightly or release workflow. That test should seed or emulate the registry at +10K devices and measure selector, label, pagination, and operation-resolution +behavior without launching 10K live edge runtimes. Real 10K runtime validation +should wait until server-side selector push-down exists, because the current +implementation still loads the full registry roster before matching. + +## Release Notes + +| Topic | Release Note | +|---|---| +| Event subscriptions | `subscribe("event(...)")` is live by event name and includes late-joining devices that emit those event names. `subscribe("device(...).event(...)")` snapshots the resolved device set at subscription time. | +| Predicate support | `where` predicates require predicate support at the dispatcher and participating edges. Missing edge support logs a startup warning and fails closed for `where` broadcasts. Predicate evaluation is bounded and fail-closed on timeout. | +| Discovery scale | The current implementation is validated with 200 real simulated devices in PR/CI. Server-side selector push-down remains future work for larger production-scale fleets. | +| CLI migration | Local mDNS scanning is `devctl mdns-scan` or `devctl scan`. Selector-based fleet discovery uses `devctl discover`. | +| Extension metadata | Protocol/vendor-specific schema extensions are treated as opaque by Device Connect and should be tested by the owning protocol integration. | +| ESTOP | The `function(estop)` selector pattern is covered for API discovery and fan-out. This is not a safety certification of physical devices, policy enforcement, or fail-safe behavior. | diff --git a/docs/discovery.md b/docs/discovery.md index 31be7eb..dc7b8ea 100644 --- a/docs/discovery.md +++ b/docs/discovery.md @@ -139,7 +139,7 @@ is whether the caller waits for replies and how they arrive. | `invoke(selector, params)` | exactly one (device, function) tuple | sync, single result | | `invoke_many(selector, params, timeout=)` | any number of (device, function) tuples | sync, aggregated | | `broadcast(selector, params, where=, bindings=, fire_at=, on_late=)` | any number of (device, function) tuples | async; correlation-tagged replies stream as events | -| `subscribe(selector)` | events, or `"correlation:"` for broadcast replies | live stream (`Subscription` handle) | +| `subscribe(selector)` | events, or `"correlation:"` for broadcast replies | stream (`Subscription` handle) | | `await_replies(correlation_id, timeout=, until=)` | replies for one broadcast | sync helper that subscribes, collects, returns | `invoke_many` runs every target's call in parallel and returns when each @@ -152,6 +152,15 @@ failures do not abort siblings; the response carries both `results` and subject keyed by that id. Subscribe with `subscribe("correlation:")` or block with `await_replies(correlation_id, timeout=...)`. +`subscribe("event(...)")` is fleet-live by event name: it resolves the +matching event names once, then subscribes with a device wildcard so +late-joining devices that emit those event names are included. In +contrast, `subscribe("device(...).event(...)")` is device-set snapshot: +it resolves the matching devices at subscription time and subscribes to +those concrete device/event subjects. Use top-level `event(...)` for +long-running fleet monitors and the device-anchored form when you want a +predictable fixed target set. + ### Edge-side `where` predicate `broadcast` accepts an optional `where` expression that runs at each @@ -178,11 +187,16 @@ predicate evaluator is an optional install: ``` pip install device-connect-agent-tools[predicate] +pip install device-connect-edge[predicate] ``` Without the extra, calling `broadcast(..., where=...)` returns an -`invalid_predicate` error immediately at the dispatcher; calls without a -`where` work unchanged. +`invalid_predicate` error immediately at the dispatcher. Edges that do +not have the predicate extra log a startup warning and fail closed for +`where` broadcasts, skipping execution rather than running an +unevaluated predicate. Predicate evaluation is also wall-clock bounded +at the edge; timeouts are logged with the broadcast correlation id and +fail closed. Calls without a `where` work unchanged. ### Synchronized fan-out (`fire_at` + `on_late`) @@ -211,8 +225,9 @@ broadcast would produce. The three response shapes below — one for `discover`, two for `discover_labels` — are the source of truth for callers. Fields not -listed are reserved for forward-compatible extensions; do not rely on -field order. +listed are reserved for forward-compatible, protocol-specific, or +vendor-specific extensions. Treat those extension fields as opaque unless +they are documented by Device Connect; do not rely on field order. ### `discover` @@ -479,13 +494,28 @@ with subscribe("device(location:lab-A/*).event(modality:motion)") as sub: handle(event) ``` +Use `subscribe("event(modality:motion)")` instead for a long-running +fleet-wide monitor that should also receive matching events from devices +that join after the subscription starts. + ## CLI The same selector syntax drives the operator CLIs. Every CLI command maps to the matching Python tool call. +### CLI migration note + +This release splits local mDNS scanning from selector discovery. Use +`devctl mdns-scan --timeout 5` or the shorter `devctl scan --timeout 5` +to find uncommissioned devices on the local network. `devctl discover` +now owns selector-driven discovery over the registered fleet. + ``` -# Discovery (devctl) +# Local mDNS scan (devctl) +devctl mdns-scan [--timeout T] +devctl scan [--timeout T] + +# Selector discovery (devctl) devctl discover "" [--offset N] [--limit M] devctl discover-labels [--key K] [--offset N] [--limit M] diff --git a/packages/device-connect-agent-tools/device_connect_agent_tools/tools.py b/packages/device-connect-agent-tools/device_connect_agent_tools/tools.py index 8e337cf..748b0cc 100644 --- a/packages/device-connect-agent-tools/device_connect_agent_tools/tools.py +++ b/packages/device-connect-agent-tools/device_connect_agent_tools/tools.py @@ -1064,12 +1064,61 @@ def _correlation_subjects(conn: Any, correlation_id: str) -> list[str]: ] +def _event_names_for_filter(selector: str) -> tuple[list[str] | None, dict[str, Any] | None]: + """Resolve top-level ``event(...)`` to event names for live wildcard subs.""" + try: + sel = parse_selector(selector) + except SelectorParseError as e: + return None, _empty_envelope(error=_error("selector_parse_error", str(e))) + if sel.scope != Scope.EVENT_ONLY: + return None, _empty_envelope( + scope=sel.scope.value, + error=_error( + "invalid_subscribe_scope", + "top-level live event subscriptions require event(...) scope; " + f"got scope={sel.scope.value!r}", + ), + ) + + rows: list[dict] = [] + offset = 0 + while True: + page = discover(selector, offset=offset, limit=DISCOVER_HARD_LIMIT) + if "error" in page: + return None, page + rows.extend(page["results"]) + if page["next_offset"] is None: + break + offset = page["next_offset"] + + names = sorted({ + row.get("name") for row in rows + if row.get("name") + }) + return names, None + + def _event_subjects_for_selector(selector: str) -> tuple[list[str] | None, dict[str, Any] | None]: """Resolve an event-scoped selector to per-device subjects. Returns ``(subjects, None)`` on success or ``(None, error_envelope)`` if the selector failed to parse or used a non-event scope. """ + try: + sel = parse_selector(selector) + except SelectorParseError as e: + return None, _empty_envelope(error=_error("selector_parse_error", str(e))) + + if sel.scope == Scope.EVENT_ONLY: + names, error = _event_names_for_filter(selector) + if error is not None: + return None, error + conn = get_connection() + return [ + f"device-connect.{conn.zone}.*.event.{name}" + for name in (names or []) + ], None + rows: list[dict] = [] offset = 0 while True: @@ -1113,8 +1162,11 @@ def subscribe(selector: str) -> Subscription: Args: selector: One of: - ``"correlation:"`` for broadcast replies of a prior call. - - An event-scoped selector (``event()`` or - ``device(...).event()``) for live event streams. + - ``event(...)`` for live event streams. Matching event names are + resolved once, then subscribed with a device wildcard so devices + that join later and emit those event names are included. + - ``device(...).event(...)`` for a snapshot event stream over the + devices resolved when the subscription is created. Returns: A :class:`Subscription` handle. Iterate with ``sub.iter(timeout)`` diff --git a/packages/device-connect-agent-tools/tests/test_subscribe.py b/packages/device-connect-agent-tools/tests/test_subscribe.py index a8b4be4..38d2b1e 100644 --- a/packages/device-connect-agent-tools/tests/test_subscribe.py +++ b/packages/device-connect-agent-tools/tests/test_subscribe.py @@ -104,6 +104,13 @@ def test_event_selector_subscribes_per_device(self, fake_conn): assert subj.endswith(".event.object_detected") sub.close() + def test_top_level_event_selector_subscribes_by_event_name_wildcard(self, fake_conn): + sub = tools_mod.subscribe("event(object_detected)") + assert fake_conn.subscribed_subjects == [ + "device-connect.default.*.event.object_detected" + ] + sub.close() + def test_event_selector_zero_matches_returns_idle(self, fake_conn): sub = tools_mod.subscribe("event(no_such_event)") assert fake_conn.subscribed_subjects == [] diff --git a/packages/device-connect-edge/device_connect_edge/device.py b/packages/device-connect-edge/device_connect_edge/device.py index c776d1a..6609efc 100644 --- a/packages/device-connect-edge/device_connect_edge/device.py +++ b/packages/device-connect-edge/device_connect_edge/device.py @@ -62,6 +62,7 @@ async def capture_image(self, resolution: str = "1080p") -> dict: import logging import os import re +import threading import time import uuid from pathlib import Path @@ -1317,7 +1318,9 @@ def _evaluate_where( "status": status_dict, "bindings": bindings or {}, } - return bool(predicate.evaluate(context)) + return self._evaluate_where_with_timeout( + predicate, context, correlation_id, + ) except Exception as e: self._logger.warning( "Broadcast %s: where predicate failed (skipping): %s", @@ -1326,6 +1329,50 @@ def _evaluate_where( return False + def _evaluate_where_with_timeout( + self, + predicate: Any, + context: Dict[str, Any], + correlation_id: str, + timeout_s: float = 0.05, + ) -> bool: + """Evaluate a predicate with a short wall-clock deadline.""" + result: Dict[str, Any] = {} + + def _run() -> None: + try: + result["value"] = bool(predicate.evaluate(context)) + except Exception as e: + result["error"] = e + + thread = threading.Thread(target=_run, daemon=True) + thread.start() + thread.join(timeout_s) + if thread.is_alive(): + self._logger.warning( + "Broadcast %s: where predicate timed out after %.3fs (skipping)", + correlation_id, timeout_s, + ) + return False + if "error" in result: + raise result["error"] + return bool(result.get("value", False)) + + + def _warn_if_predicate_extra_missing(self) -> None: + """Log once at startup when edge-side CEL predicates are unavailable.""" + try: + from device_connect_edge.predicate import compile_where + compile_where("true") + except Exception as e: + self._logger.warning( + "Edge-side where predicates are unavailable on this device: %s. " + "Install the predicate extra on every edge that should evaluate " + "broadcast where clauses: pip install 'device-connect-edge[predicate]'", + e, + ) + + async def _event_dispatch_loop(self) -> None: """Send queued events, retrying on failure.""" @@ -1552,6 +1599,8 @@ async def run(self) -> None: # Set up DeviceDriver capabilities (router, registry, subscriptions) await self._setup_agentic_driver() + self._warn_if_predicate_extra_missing() + # Start device routines (@periodic decorated methods) await self._driver._start_routines() diff --git a/packages/device-connect-edge/tests/test_device_where.py b/packages/device-connect-edge/tests/test_device_where.py new file mode 100644 index 0000000..e79673a --- /dev/null +++ b/packages/device-connect-edge/tests/test_device_where.py @@ -0,0 +1,67 @@ +# Copyright (c) 2024-2026, Arm Limited and Contributors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for edge-side broadcast ``where`` runtime behavior.""" + +import time +from unittest.mock import Mock + +from device_connect_edge import DeviceRuntime + + +class _SlowPredicate: + def evaluate(self, context): + time.sleep(0.2) + return True + + +class _RaisingPredicate: + def evaluate(self, context): + raise RuntimeError("bad predicate") + + +def test_where_eval_timeout_fails_closed_and_warns(): + runtime = DeviceRuntime(device_id="where-timeout-test") + runtime._logger = Mock() + + result = runtime._evaluate_where_with_timeout( + _SlowPredicate(), {}, "corr-timeout", timeout_s=0.01, + ) + + assert result is False + runtime._logger.warning.assert_called_once() + assert "where predicate timed out" in ( + runtime._logger.warning.call_args.args[0] + ) + + +def test_where_eval_error_propagates_to_caller(): + runtime = DeviceRuntime(device_id="where-error-test") + + try: + runtime._evaluate_where_with_timeout( + _RaisingPredicate(), {}, "corr-error", timeout_s=0.1, + ) + except RuntimeError as exc: + assert "bad predicate" in str(exc) + else: + raise AssertionError("expected predicate exception to propagate") + + +def test_missing_predicate_extra_warns_at_startup(monkeypatch): + import device_connect_edge.predicate as predicate_mod + + def _missing_extra(expression): + raise predicate_mod.PredicateCompileError("cel-python missing") + + runtime = DeviceRuntime(device_id="where-missing-extra-test") + runtime._logger = Mock() + monkeypatch.setattr(predicate_mod, "compile_where", _missing_extra) + + runtime._warn_if_predicate_extra_missing() + + runtime._logger.warning.assert_called_once() + assert "Edge-side where predicates are unavailable" in ( + runtime._logger.warning.call_args.args[0] + ) diff --git a/packages/device-connect-server/README.md b/packages/device-connect-server/README.md index 0a4d424..f7b1def 100644 --- a/packages/device-connect-server/README.md +++ b/packages/device-connect-server/README.md @@ -267,7 +267,10 @@ Each tenant gets a distributable credential bundle (zip) in `security_infra/tena devctl list # list registered devices devctl list --compact # compact output devctl register --id myDevice --keepalive # register a test device -devctl discover --timeout 5 # find uncommissioned devices (mDNS) +devctl mdns-scan --timeout 5 # find uncommissioned devices (mDNS) +devctl scan --timeout 5 # alias for mdns-scan +devctl discover "device(category:camera)" # discover registered devices by selector +devctl discover-labels # list selector label keys and values devctl commission cam-001 --pin 1234-5678 # commission a device devctl interactive # REPL for device operations @@ -281,6 +284,14 @@ statectl locks # list held locks statectl stats # key counts by namespace ``` +### CLI migration note + +This release renames the local-network mDNS scan from +`devctl discover --timeout ...` to `devctl mdns-scan --timeout ...`; +`devctl scan --timeout ...` is the short alias. The `discover` verb now belongs +to selector discovery over the registered fleet, for example +`devctl discover "device(category:camera)"`. + ## Device Commissioning Flow New devices must be provisioned and commissioned before joining the mesh. The mechanism depends on your messaging backend: diff --git a/tests/fixtures/devices.py b/tests/fixtures/devices.py index 3fcae8e..85aa0d9 100644 --- a/tests/fixtures/devices.py +++ b/tests/fixtures/devices.py @@ -10,7 +10,7 @@ import asyncio import logging import uuid -from typing import List, Optional, Tuple +from typing import Callable, List, Optional, Tuple from device_connect_edge import DeviceRuntime @@ -106,6 +106,36 @@ async def spawn_sensor( ) return await self._spawn(driver, device_id, **kwargs) + async def spawn_sensor_fleet( + self, + prefix: str, + count: int, + *, + failure_rate: float = 0.0, + location: str = "scale-room", + location_for: Callable[[int], str] | None = None, + initial_temp: float = 22.0, + initial_humidity: float = 45.0, + registration_timeout: float = 20.0, + ) -> list[Tuple[DeviceRuntime, TestSensorDriver]]: + """Spawn many sensors concurrently for scale integration tests.""" + spawned = await asyncio.gather(*[ + self.spawn_sensor( + f"{prefix}-{i:04d}", + failure_rate=failure_rate, + location=location_for(i) if location_for else location, + initial_temp=initial_temp + (i % 10) / 10, + initial_humidity=initial_humidity, + wait_for_registration=False, + ) + for i in range(count) + ]) + await asyncio.gather(*[ + self._wait_for_registration(device, registration_timeout) + for device, _ in spawned + ]) + return list(spawned) + async def _wait_for_registration(self, device: DeviceRuntime, timeout: float) -> None: start = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start < timeout: diff --git a/tests/fixtures/infrastructure.py b/tests/fixtures/infrastructure.py index be8007f..b32aadd 100644 --- a/tests/fixtures/infrastructure.py +++ b/tests/fixtures/infrastructure.py @@ -219,33 +219,46 @@ async def wait_for_all_services( logger.info("All infrastructure services healthy") -async def clear_device_registry(etcd_host: str = "localhost", etcd_port: int = 2379) -> int: +async def clear_device_registry( + etcd_host: str = "localhost", + etcd_port: int = 2379, + tenant: str = "default", +) -> int: """Clear all devices from the registry via etcd HTTP API.""" import base64 import aiohttp base_url = f"http://{etcd_host}:{etcd_port}" - prefix = "/device-connect/devices/" - key_start = base64.b64encode(prefix.encode()).decode() - prefix_end = prefix[:-1] + chr(ord(prefix[-1]) + 1) - key_end = base64.b64encode(prefix_end.encode()).decode() - + prefixes = [ + f"/device-connect/{tenant}/devices/", + # Historical pre-tenant path; clear it too for compatibility with + # older local infrastructure or tests that left stale entries behind. + "/device-connect/devices/", + ] + + cleared = 0 async with aiohttp.ClientSession() as session: - async with session.post( - f"{base_url}/v3/kv/range", - json={"key": key_start, "range_end": key_end, "count_only": True}, - timeout=aiohttp.ClientTimeout(total=10), - ) as resp: - if resp.status != 200: - return 0 - data = await resp.json() - count = int(data.get("count", 0)) - - if count > 0: - await session.post( - f"{base_url}/v3/kv/deleterange", - json={"key": key_start, "range_end": key_end}, + for prefix in prefixes: + key_start = base64.b64encode(prefix.encode()).decode() + prefix_end = prefix[:-1] + chr(ord(prefix[-1]) + 1) + key_end = base64.b64encode(prefix_end.encode()).decode() + + async with session.post( + f"{base_url}/v3/kv/range", + json={"key": key_start, "range_end": key_end, "count_only": True}, timeout=aiohttp.ClientTimeout(total=10), - ) - logger.info(f"Cleared {count} devices from registry") - return count + ) as resp: + if resp.status != 200: + continue + data = await resp.json() + count = int(data.get("count", 0)) + + if count > 0: + await session.post( + f"{base_url}/v3/kv/deleterange", + json={"key": key_start, "range_end": key_end}, + timeout=aiohttp.ClientTimeout(total=10), + ) + cleared += count + logger.info("Cleared %d devices from registry prefix %s", count, prefix) + return cleared diff --git a/tests/tests/test_tools_broadcast.py b/tests/tests/test_tools_broadcast.py index 5d3885b..cf21d0b 100644 --- a/tests/tests/test_tools_broadcast.py +++ b/tests/tests/test_tools_broadcast.py @@ -328,6 +328,91 @@ async def test_subscribe_event_selector_live_stream(device_spawner, messaging_ur await asyncio.to_thread(disconnect) +@pytest.mark.asyncio +@pytest.mark.integration +async def test_subscribe_top_level_event_selector_includes_late_joiners( + device_spawner, messaging_url +): + """Top-level event subscriptions wildcard devices for long-running agents.""" + first_device, first_driver = await device_spawner.spawn_camera( + "itest-evlive-cam-1", location="lab-A", + ) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import disconnect, subscribe + + await _wait_for_devices(messaging_url, {"itest-evlive-cam-1"}) + try: + with subscribe("event(object_detected)") as sub: + await asyncio.sleep(SETTLE_TIME) + second_device, second_driver = await device_spawner.spawn_camera( + "itest-evlive-cam-2", location="lab-A", + ) + await _wait_for_devices( + messaging_url, {"itest-evlive-cam-1", "itest-evlive-cam-2"} + ) + + await first_driver.trigger_event( + "object_detected", {"label": "first", "confidence": 0.9} + ) + await second_driver.trigger_event( + "object_detected", {"label": "late", "confidence": 0.91} + ) + msgs = await asyncio.to_thread( + list, sub.iter(timeout=2.0, poll_interval=0.05), + ) + labels = { + (m.get("params") or {}).get("label") or m.get("label") + for m in msgs + } + assert {"first", "late"} <= labels, msgs + finally: + await asyncio.to_thread(disconnect) + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_subscribe_device_event_selector_is_snapshot( + device_spawner, messaging_url +): + """Device-anchored event subscriptions keep the initial resolved device set.""" + first_device, first_driver = await device_spawner.spawn_camera( + "itest-evsnap-cam-1", location="lab-A", + ) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import disconnect, subscribe + + await _wait_for_devices(messaging_url, {"itest-evsnap-cam-1"}) + try: + with subscribe("device(itest-evsnap-*).event(object_detected)") as sub: + await asyncio.sleep(SETTLE_TIME) + second_device, second_driver = await device_spawner.spawn_camera( + "itest-evsnap-cam-2", location="lab-A", + ) + await _wait_for_devices( + messaging_url, {"itest-evsnap-cam-1", "itest-evsnap-cam-2"} + ) + + await first_driver.trigger_event( + "object_detected", {"label": "first", "confidence": 0.9} + ) + await second_driver.trigger_event( + "object_detected", {"label": "late", "confidence": 0.91} + ) + msgs = await asyncio.to_thread( + list, sub.iter(timeout=2.0, poll_interval=0.05), + ) + labels = { + (m.get("params") or {}).get("label") or m.get("label") + for m in msgs + } + assert "first" in labels, msgs + assert "late" not in labels, msgs + finally: + await asyncio.to_thread(disconnect) + + @pytest.mark.asyncio @pytest.mark.integration async def test_subscribe_correlation_form(device_spawner, messaging_url): @@ -358,7 +443,7 @@ def collect(): await asyncio.to_thread(disconnect) -# -- PR 29 review #1: safety:critical advisory WARN ------------------ +# -- Safety-critical advisory warning regression coverage ------------- @pytest.mark.asyncio diff --git a/tests/tests/test_tools_invoke.py b/tests/tests/test_tools_invoke.py index df9878b..e1c902d 100644 --- a/tests/tests/test_tools_invoke.py +++ b/tests/tests/test_tools_invoke.py @@ -11,7 +11,9 @@ """ import asyncio +import os import time +import uuid import pytest @@ -19,13 +21,13 @@ DISCOVERY_TIMEOUT = 5.0 -async def _wait_for_devices(messaging_url, expected_ids): +async def _wait_for_devices(messaging_url, expected_ids, timeout=DISCOVERY_TIMEOUT): """Connect and poll until all expected ``device_ids`` are visible.""" from device_connect_agent_tools import connect from device_connect_agent_tools.connection import get_connection await asyncio.to_thread(connect, nats_url=messaging_url) - deadline = time.monotonic() + DISCOVERY_TIMEOUT + deadline = time.monotonic() + timeout while True: conn = get_connection() devices = await asyncio.to_thread(conn.list_devices) @@ -65,6 +67,98 @@ async def test_invoke_sensor_reading(device_spawner, messaging_url): await asyncio.to_thread(disconnect) +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.slow +@pytest.mark.timeout(180) +@pytest.mark.parametrize("messaging_backend", ["nats"], indirect=True) +async def test_scalable_fleet_discovery_and_invoke_many( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """Discover and invoke hundreds of Docker-backed simulated devices.""" + if messaging_backend != "nats": + pytest.skip("scale test uses registry-backed NATS discovery") + + fleet_size = int(os.getenv("DC_SCALE_FLEET_SIZE", "200")) + prefix = f"itest-scale-{uuid.uuid4().hex[:8]}" + location = f"{prefix}-room" + expected_ids = {f"{prefix}-{i:04d}" for i in range(fleet_size)} + + await device_spawner.spawn_sensor_fleet( + prefix, + fleet_size, + location=location, + initial_temp=21.0, + registration_timeout=30.0, + ) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import ( + disconnect, + discover, + discover_labels, + invoke_many, + ) + + devices = await _wait_for_devices( + messaging_url, expected_ids, timeout=45.0, + ) + visible_ids = {d.get("device_id") for d in devices} + assert expected_ids <= visible_ids + + try: + labels = await asyncio.to_thread(discover_labels, "device.location") + assert labels["axis"] == "device" + assert labels["key"] == "location" + assert labels["values"][location] == fleet_size + + page_size = 50 if fleet_size >= 100 else max(1, fleet_size // 2) + + first_page = await asyncio.to_thread( + discover, f"device(location:{location})", 0, page_size, + ) + assert first_page["scope"] == "device_only" + assert first_page["matched"] == fleet_size + assert first_page["returned"] == page_size + assert first_page["next_offset"] == page_size + + second_page = await asyncio.to_thread( + discover, + f"device(location:{location})", + first_page["next_offset"], + page_size, + ) + first_ids = {row["device_id"] for row in first_page["results"]} + second_ids = {row["device_id"] for row in second_page["results"]} + assert first_ids + assert second_ids + assert first_ids.isdisjoint(second_ids) + + functions = await asyncio.to_thread( + discover, f"device(location:{location}).function(get_reading)", 0, 25, + ) + assert functions["scope"] == "device_function" + assert functions["matched"] == fleet_size + assert functions["returned"] == min(25, fleet_size) + + result = await asyncio.to_thread( + invoke_many, + f"device(location:{location}).function(get_reading)", + {"unit": "celsius"}, + 10.0, + 64, + "Scale integration test fan-out", + ) + assert result["candidates"] == fleet_size + assert result["matched"] == fleet_size + assert result["succeeded"] == fleet_size + assert result["failed"] == 0 + assert {row["device_id"] for row in result["results"]} == expected_ids + assert all(row["result"]["unit"] == "celsius" for row in result["results"]) + finally: + await asyncio.to_thread(disconnect) + + @pytest.mark.asyncio @pytest.mark.integration async def test_invoke_robot_dispatch(device_spawner, event_capture, messaging_url): diff --git a/tests/tests/test_tools_large_fleet_broadcast.py b/tests/tests/test_tools_large_fleet_broadcast.py new file mode 100644 index 0000000..7bb98fe --- /dev/null +++ b/tests/tests/test_tools_large_fleet_broadcast.py @@ -0,0 +1,297 @@ +# Copyright (c) 2024-2026, Arm Limited and Contributors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Slow NATS-backed large-fleet tests for broadcast reply fan-out.""" + +import asyncio +import os +import time +import uuid + +import pytest + +SETTLE_TIME = 0.3 +DISCOVERY_TIMEOUT = 45.0 +DEFAULT_FLEET_SIZE = 200 + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.integration, + pytest.mark.slow, + pytest.mark.timeout(240), + pytest.mark.parametrize("messaging_backend", ["nats"], indirect=True), +] + + +def _fleet_size() -> int: + return max(1, int(os.getenv("DC_SCALE_FLEET_SIZE", str(DEFAULT_FLEET_SIZE)))) + + +def _reply_timeout(fleet_size: int) -> float: + return max(15.0, min(60.0, fleet_size * 0.2)) + + +def _spread_threshold(fleet_size: int) -> float: + return max(1.0, min(2.5, fleet_size * 0.01)) + + +async def _wait_for_devices(messaging_url, expected_ids, timeout=DISCOVERY_TIMEOUT): + """Connect and poll until all expected ``device_ids`` are visible.""" + from device_connect_agent_tools import connect + from device_connect_agent_tools.connection import get_connection + + await asyncio.to_thread(connect, nats_url=messaging_url) + deadline = time.monotonic() + timeout + while True: + conn = get_connection() + devices = await asyncio.to_thread(conn.list_devices) + ids = {d.get("device_id") for d in devices} + if expected_ids.issubset(ids) or time.monotonic() > deadline: + return devices + await asyncio.sleep(0.25) + + +async def _spawn_sensor_fleet( + device_spawner, + *, + prefix: str, + fleet_size: int, + location: str = "scale-room", + location_for=None, +) -> set[str]: + expected_ids = {f"{prefix}-{i:04d}" for i in range(fleet_size)} + await device_spawner.spawn_sensor_fleet( + prefix, + fleet_size, + location=location, + location_for=location_for, + initial_temp=21.0, + registration_timeout=30.0, + ) + await asyncio.sleep(SETTLE_TIME) + return expected_ids + + +def _collect_subscription_replies(correlation_id: str, expected: int, timeout: float): + from device_connect_agent_tools import subscribe + + deadline = time.monotonic() + timeout + gathered = [] + with subscribe(f"correlation:{correlation_id}") as sub: + while len(gathered) < expected and time.monotonic() < deadline: + gathered.extend(sub.read()) + if len(gathered) >= expected: + break + time.sleep(0.02) + return gathered + + +async def test_broadcast_large_fan_out_returns_correlation_and_target_count( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """broadcast() returns a correlation id and target count at fleet scale.""" + if messaging_backend != "nats": + pytest.skip("large-fleet broadcast tests use registry-backed NATS discovery") + + fleet_size = _fleet_size() + prefix = f"itest-bclarge-{uuid.uuid4().hex[:8]}" + location = f"{prefix}-room" + expected_ids = await _spawn_sensor_fleet( + device_spawner, + prefix=prefix, + fleet_size=fleet_size, + location=location, + ) + + from device_connect_agent_tools import await_replies, broadcast, disconnect + + devices = await _wait_for_devices(messaging_url, expected_ids) + assert expected_ids <= {d.get("device_id") for d in devices} + + try: + result = await asyncio.to_thread( + broadcast, + f"device(location:{location}).function(get_reading)", + {"unit": "celsius"}, + ) + assert result["correlation_id"].startswith("br-") + assert result["candidates"] == fleet_size + assert result["function"] == "get_reading" + + replies = await asyncio.to_thread( + await_replies, + result["correlation_id"], + timeout=_reply_timeout(fleet_size), + until=fleet_size, + poll_interval=0.02, + ) + assert len(replies) == fleet_size + assert {reply["device_id"] for reply in replies} == expected_ids + assert all(reply["success"] is True for reply in replies) + assert all(reply["result"]["unit"] == "celsius" for reply in replies) + finally: + await asyncio.to_thread(disconnect) + + +async def test_broadcast_where_self_election_narrows_large_candidates( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """A broad broadcast can be narrowed by edge-side where self-election.""" + if messaging_backend != "nats": + pytest.skip("large-fleet broadcast tests use registry-backed NATS discovery") + pytest.importorskip("celpy") + + fleet_size = _fleet_size() + prefix = f"itest-bcwhere-{uuid.uuid4().hex[:8]}" + selected_location = f"{prefix}-selected" + other_location = f"{prefix}-other" + + def location_for(index): + return selected_location if index % 4 == 0 else other_location + + expected_ids = await _spawn_sensor_fleet( + device_spawner, + prefix=prefix, + fleet_size=fleet_size, + location_for=location_for, + ) + selected_ids = { + f"{prefix}-{i:04d}" for i in range(fleet_size) if i % 4 == 0 + } + assert selected_ids + + from device_connect_agent_tools import await_replies, broadcast, disconnect + + devices = await _wait_for_devices(messaging_url, expected_ids) + assert expected_ids <= {d.get("device_id") for d in devices} + + try: + result = await asyncio.to_thread( + broadcast, + f"device({prefix}-*).function(get_reading)", + {"unit": "celsius"}, + "labels.location == bindings.target_location", + {"target_location": selected_location}, + ) + assert result["candidates"] == fleet_size + + replies = await asyncio.to_thread( + await_replies, + result["correlation_id"], + timeout=_reply_timeout(fleet_size), + until=len(selected_ids), + poll_interval=0.02, + ) + assert len(replies) == len(selected_ids) + assert {reply["device_id"] for reply in replies} == selected_ids + assert all(reply["success"] is True for reply in replies) + finally: + await asyncio.to_thread(disconnect) + + +async def test_broadcast_fire_at_synchronizes_large_fan_out( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """fire_at keeps large fan-out reply fire times reasonably grouped.""" + if messaging_backend != "nats": + pytest.skip("large-fleet broadcast tests use registry-backed NATS discovery") + + fleet_size = _fleet_size() + prefix = f"itest-bcfire-{uuid.uuid4().hex[:8]}" + location = f"{prefix}-room" + expected_ids = await _spawn_sensor_fleet( + device_spawner, + prefix=prefix, + fleet_size=fleet_size, + location=location, + ) + + from device_connect_agent_tools import await_replies, broadcast, disconnect + + devices = await _wait_for_devices(messaging_url, expected_ids) + assert expected_ids <= {d.get("device_id") for d in devices} + + try: + scheduled = time.time() + 1.5 + result = await asyncio.to_thread( + broadcast, + f"device(location:{location}).function(get_reading)", + {"unit": "celsius"}, + None, + None, + scheduled, + "fire", + ) + assert result["candidates"] == fleet_size + + replies = await asyncio.to_thread( + await_replies, + result["correlation_id"], + timeout=_reply_timeout(fleet_size), + until=fleet_size, + poll_interval=0.02, + ) + assert len(replies) == fleet_size + assert {reply["device_id"] for reply in replies} == expected_ids + + fire_times = [reply["actually_fired_at"] for reply in replies] + assert min(fire_times) >= scheduled - 0.05 + spread = max(fire_times) - min(fire_times) + assert spread < _spread_threshold(fleet_size), ( + f"fire_at spread too wide for {fleet_size} targets: {spread:.3f}s" + ) + finally: + await asyncio.to_thread(disconnect) + + +async def test_subscribe_correlation_drains_large_reply_stream( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """subscribe('correlation:') drains a large broadcast reply stream.""" + if messaging_backend != "nats": + pytest.skip("large-fleet broadcast tests use registry-backed NATS discovery") + + fleet_size = _fleet_size() + prefix = f"itest-bcsub-{uuid.uuid4().hex[:8]}" + location = f"{prefix}-room" + expected_ids = await _spawn_sensor_fleet( + device_spawner, + prefix=prefix, + fleet_size=fleet_size, + location=location, + ) + + from device_connect_agent_tools import broadcast, disconnect + + devices = await _wait_for_devices(messaging_url, expected_ids) + assert expected_ids <= {d.get("device_id") for d in devices} + + try: + scheduled = time.time() + 1.5 + result = await asyncio.to_thread( + broadcast, + f"device(location:{location}).function(get_reading)", + {"unit": "celsius"}, + None, + None, + scheduled, + "fire", + ) + assert result["candidates"] == fleet_size + + replies = await asyncio.to_thread( + _collect_subscription_replies, + result["correlation_id"], + fleet_size, + _reply_timeout(fleet_size), + ) + assert len(replies) == fleet_size + assert {reply["device_id"] for reply in replies} == expected_ids + assert all( + reply["correlation_id"] == result["correlation_id"] + for reply in replies + ) + assert all(reply["success"] is True for reply in replies) + finally: + await asyncio.to_thread(disconnect) diff --git a/tests/tests/test_tools_large_fleet_discovery.py b/tests/tests/test_tools_large_fleet_discovery.py new file mode 100644 index 0000000..cf3a478 --- /dev/null +++ b/tests/tests/test_tools_large_fleet_discovery.py @@ -0,0 +1,360 @@ +# Copyright (c) 2024-2026, Arm Limited and Contributors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Slow NATS-only large-fleet discovery integration tests.""" + +import asyncio +import os +import time +import uuid + +import pytest + +SETTLE_TIME = 0.3 +DISCOVERY_TIMEOUT = 60.0 +DEFAULT_SCALE_FLEET_SIZE = 200 + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.integration, + pytest.mark.slow, + pytest.mark.timeout(240), + pytest.mark.parametrize("messaging_backend", ["nats"], indirect=True), +] + + +def _scale_fleet_size() -> int: + return max(1, int(os.getenv("DC_SCALE_FLEET_SIZE", str(DEFAULT_SCALE_FLEET_SIZE)))) + + +async def _wait_for_devices(messaging_url, expected_ids, timeout=DISCOVERY_TIMEOUT): + """Connect and poll until all expected device ids are visible.""" + from device_connect_agent_tools import connect + from device_connect_agent_tools.connection import get_connection + + await asyncio.to_thread(connect, nats_url=messaging_url) + deadline = time.monotonic() + timeout + while True: + conn = get_connection() + conn.invalidate_cache() + devices = await asyncio.to_thread(conn.list_devices) + ids = {d.get("device_id") for d in devices} + if expected_ids <= ids or time.monotonic() > deadline: + return devices + await asyncio.sleep(0.25) + + +def _assert_device_row_compact(row): + assert "function_count" in row + assert "function_names" in row + assert "functions" not in row + assert "events" not in row + assert "capabilities" not in row + + +def _assert_device_row_expanded(row, expected_function): + assert "function_count" in row + assert "function_names" in row + assert "functions" in row + function = next( + (fn for fn in row["functions"] if fn["name"] == expected_function), + None, + ) + assert function is not None + assert "parameters" in function + + +async def test_large_device_set_summary_does_not_expand_full_schemas( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """A large matched device selector stays compact but keeps counts and labels.""" + fleet_size = max(_scale_fleet_size(), 6) + prefix = f"itest-lfs-{uuid.uuid4().hex[:8]}" + location = f"{prefix}-room" + expected_ids = {f"{prefix}-{i:04d}" for i in range(fleet_size)} + + await device_spawner.spawn_sensor_fleet( + prefix, + fleet_size, + location=location, + registration_timeout=30.0, + ) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import disconnect, discover + + await _wait_for_devices(messaging_url, expected_ids) + try: + page_size = min(25, fleet_size - 1) + result = await asyncio.to_thread( + discover, f"device(location:{location})", 0, page_size + ) + + assert result["scope"] == "device_only" + assert result["matched"] == fleet_size + assert result["returned"] == page_size + assert result["next_offset"] == page_size + assert result["label_histogram"]["location"]["values"][location] == fleet_size + assert result["label_histogram"]["category"]["values"]["sensor"] == fleet_size + for row in result["results"]: + _assert_device_row_compact(row) + finally: + await asyncio.to_thread(disconnect) + + +async def test_small_matched_subset_expands_inside_large_global_fleet( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """Expansion is based on matched selector cardinality, not global size.""" + total_size = max(_scale_fleet_size(), 8) + sensor_count = total_size - 2 + prefix = f"itest-lfm-{uuid.uuid4().hex[:8]}" + large_location = f"{prefix}-bulk" + small_location = f"{prefix}-inspection" + sensor_ids = {f"{prefix}-{i:04d}" for i in range(sensor_count)} + camera_ids = {f"{prefix}-cam-{i}" for i in range(2)} + + await device_spawner.spawn_sensor_fleet( + prefix, + sensor_count, + location=large_location, + registration_timeout=30.0, + ) + for device_id in sorted(camera_ids): + await device_spawner.spawn_camera(device_id, location=small_location) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import disconnect, discover + + await _wait_for_devices(messaging_url, sensor_ids | camera_ids) + try: + large = await asyncio.to_thread( + discover, f"device(location:{large_location})", 0, 10 + ) + small = await asyncio.to_thread( + discover, f"device(location:{small_location})", 0, 10 + ) + + assert large["matched"] == sensor_count + assert large["returned"] == 10 + for row in large["results"]: + _assert_device_row_compact(row) + + assert small["matched"] == 2 + assert small["returned"] == 2 + assert {row["device_id"] for row in small["results"]} == camera_ids + for row in small["results"]: + _assert_device_row_expanded(row, "capture_image") + finally: + await asyncio.to_thread(disconnect) + + +async def test_long_tail_label_histogram_reports_more( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """The multi-axis vocabulary truncates long-tail values and reports more.""" + from device_connect_agent_tools import disconnect, discover_labels + from device_connect_agent_tools import tools as tools_mod + + top_n = max(1, min(tools_mod.LABEL_VALUES_TOP_N, 200)) + fleet_size = max(_scale_fleet_size(), top_n + 1) + prefix = f"itest-lfl-{uuid.uuid4().hex[:8]}" + expected_ids = {f"{prefix}-{i:04d}" for i in range(fleet_size)} + + await device_spawner.spawn_sensor_fleet( + prefix, + fleet_size, + location_for=lambda i: f"{prefix}-zone-{i:04d}", + registration_timeout=30.0, + ) + await asyncio.sleep(SETTLE_TIME) + + await _wait_for_devices(messaging_url, expected_ids) + try: + result = await asyncio.to_thread(discover_labels) + location = result["device_keys"]["location"] + + assert result["total_devices"] >= fleet_size + assert len(location["values"]) == top_n + assert location["more"] >= 1 + finally: + await asyncio.to_thread(disconnect) + + +async def test_per_key_label_drill_down_bypasses_truncation( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """The per-key vocabulary form paginates every value and omits more.""" + from device_connect_agent_tools import disconnect, discover_labels + from device_connect_agent_tools import tools as tools_mod + + top_n = max(1, min(tools_mod.LABEL_VALUES_TOP_N, 200)) + fleet_size = max(_scale_fleet_size(), top_n + 1) + prefix = f"itest-lfp-{uuid.uuid4().hex[:8]}" + expected_ids = {f"{prefix}-{i:04d}" for i in range(fleet_size)} + expected_locations = {f"{prefix}-zone-{i:04d}" for i in range(fleet_size)} + + await device_spawner.spawn_sensor_fleet( + prefix, + fleet_size, + location_for=lambda i: f"{prefix}-zone-{i:04d}", + registration_timeout=30.0, + ) + await asyncio.sleep(SETTLE_TIME) + + await _wait_for_devices(messaging_url, expected_ids) + try: + seen = {} + offset = 0 + limit = 7 + while True: + page = await asyncio.to_thread( + discover_labels, "device.location", offset, limit + ) + assert page["axis"] == "device" + assert page["key"] == "location" + assert page["matched"] >= fleet_size + assert page["axis_total"] >= fleet_size + assert "more" not in page + seen.update(page["values"]) + if page["next_offset"] is None: + break + offset = page["next_offset"] + + assert expected_locations <= set(seen) + assert all(seen[location] == 1 for location in expected_locations) + finally: + await asyncio.to_thread(disconnect) + + +async def test_device_label_or_and_filtering_over_large_fleet( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """Device label OR within a key and AND across keys scale together.""" + total_size = max(_scale_fleet_size(), 12) + sensor_count = total_size - 5 + prefix = f"itest-lfo-{uuid.uuid4().hex[:8]}" + loc_a = f"{prefix}-alpha" + loc_b = f"{prefix}-bravo" + sensor_ids = {f"{prefix}-{i:04d}" for i in range(sensor_count)} + sensor_a_ids = {f"{prefix}-{i:04d}" for i in range(0, sensor_count, 2)} + camera_ids = {f"{prefix}-cam-{i}" for i in range(3)} + robot_ids = {f"{prefix}-robot-{i}" for i in range(2)} + + await device_spawner.spawn_sensor_fleet( + prefix, + sensor_count, + location_for=lambda i: loc_a if i % 2 == 0 else loc_b, + registration_timeout=30.0, + ) + await device_spawner.spawn_camera(f"{prefix}-cam-0", location=loc_a) + await device_spawner.spawn_camera(f"{prefix}-cam-1", location=loc_a) + await device_spawner.spawn_camera(f"{prefix}-cam-2", location=loc_b) + await device_spawner.spawn_robot(f"{prefix}-robot-0", location=loc_b) + await device_spawner.spawn_robot(f"{prefix}-robot-1", location=loc_b) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import disconnect, discover + + await _wait_for_devices(messaging_url, sensor_ids | camera_ids | robot_ids) + try: + category_or = await asyncio.to_thread( + discover, + f"device(category:[sensor,camera],location:[{loc_a},{loc_b}])", + 0, + total_size, + ) + category_or_ids = {row["device_id"] for row in category_or["results"]} + assert category_or["matched"] == sensor_count + len(camera_ids) + assert category_or_ids == sensor_ids | camera_ids + assert category_or_ids.isdisjoint(robot_ids) + + sensor_at_loc_a = await asyncio.to_thread( + discover, f"device(category:sensor,location:{loc_a})", 0, sensor_count + ) + assert sensor_at_loc_a["matched"] == len(sensor_a_ids) + assert {row["device_id"] for row in sensor_at_loc_a["results"]} == sensor_a_ids + + mobile_at_loc_b = await asyncio.to_thread( + discover, f"device(category:[camera,robot],location:{loc_b})", 0, 10 + ) + assert mobile_at_loc_b["matched"] == 3 + assert {row["device_id"] for row in mobile_at_loc_b["results"]} == { + f"{prefix}-cam-2", + f"{prefix}-robot-0", + f"{prefix}-robot-1", + } + finally: + await asyncio.to_thread(disconnect) + + +async def test_function_label_selection_over_heterogeneous_fleet( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """Function-label selectors return the expected heterogeneous matrix.""" + total_size = max(_scale_fleet_size(), 12) + sensor_count = total_size - 4 + prefix = f"itest-lff-{uuid.uuid4().hex[:8]}" + location = f"{prefix}-floor" + sensor_ids = {f"{prefix}-{i:04d}" for i in range(sensor_count)} + camera_ids = {f"{prefix}-cam-{i}" for i in range(2)} + robot_ids = {f"{prefix}-robot-{i}" for i in range(2)} + + await device_spawner.spawn_sensor_fleet( + prefix, + sensor_count, + location=location, + registration_timeout=30.0, + ) + for device_id in sorted(camera_ids): + await device_spawner.spawn_camera(device_id, location=location) + for device_id in sorted(robot_ids): + await device_spawner.spawn_robot(device_id, location=location) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import disconnect, discover + + await _wait_for_devices(messaging_url, sensor_ids | camera_ids | robot_ids) + try: + read_selector = f"device(location:{location}).function(direction:read)" + read_functions = await asyncio.to_thread( + discover, read_selector, 0, total_size + 10 + ) + read_pairs = { + (row["device_id"], row["name"]) for row in read_functions["results"] + } + assert read_functions["matched"] == sensor_count + len(robot_ids) + assert read_pairs == { + *((device_id, "get_reading") for device_id in sensor_ids), + *((device_id, "get_status") for device_id in robot_ids), + } + assert not any( + row["device_id"] in camera_ids for row in read_functions["results"] + ) + + camera_writes = await asyncio.to_thread( + discover, + f"device(category:camera,location:{location}).function(direction:write)", + 0, + 10, + ) + assert camera_writes["matched"] == len(camera_ids) + assert { + (row["device_id"], row["name"]) for row in camera_writes["results"] + } == {(device_id, "capture_image") for device_id in camera_ids} + + critical_sensor_selector = ( + f"device(category:sensor,location:{location})" + ".function(direction:write,safety:critical)" + ) + critical_sensor_writes = await asyncio.to_thread( + discover, critical_sensor_selector, 0, sensor_count + 1 + ) + assert critical_sensor_writes["matched"] == sensor_count + assert { + (row["device_id"], row["name"]) + for row in critical_sensor_writes["results"] + } == {(device_id, "set_threshold") for device_id in sensor_ids} + finally: + await asyncio.to_thread(disconnect) diff --git a/tests/tests/test_tools_large_fleet_heterogeneous.py b/tests/tests/test_tools_large_fleet_heterogeneous.py new file mode 100644 index 0000000..9307a37 --- /dev/null +++ b/tests/tests/test_tools_large_fleet_heterogeneous.py @@ -0,0 +1,310 @@ +# Copyright (c) 2024-2026, Arm Limited and Contributors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Large-fleet integration tests for heterogeneous discovery and operations.""" + +import asyncio +import os +import time +import uuid + +import pytest + +SETTLE_TIME = 0.3 +DISCOVERY_TIMEOUT = 60.0 +DEFAULT_SCALE_FLEET_SIZE = 200 +CAMERA_COUNT = 3 +ROBOT_COUNT = 2 + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.integration, + pytest.mark.slow, + pytest.mark.timeout(300), + pytest.mark.parametrize("messaging_backend", ["nats"], indirect=True), +] + + +def _scale_fleet_size() -> int: + return max(12, int(os.getenv("DC_SCALE_FLEET_SIZE", str(DEFAULT_SCALE_FLEET_SIZE)))) + + +def _reply_timeout(fleet_size: int) -> float: + return max(15.0, min(60.0, fleet_size * 0.2)) + + +async def _wait_for_devices(messaging_url, expected_ids, timeout=DISCOVERY_TIMEOUT): + """Connect and poll until all expected device ids are visible.""" + from device_connect_agent_tools import connect + from device_connect_agent_tools.connection import get_connection + + await asyncio.to_thread(connect, nats_url=messaging_url) + deadline = time.monotonic() + timeout + while True: + conn = get_connection() + conn.invalidate_cache() + devices = await asyncio.to_thread(conn.list_devices) + ids = {d.get("device_id") for d in devices} + if expected_ids <= ids or time.monotonic() > deadline: + return devices + await asyncio.sleep(0.25) + + +def _assert_compact_function_rows(rows): + assert rows + for row in rows: + assert set(row) <= {"device_id", "name", "labels"} + assert "parameters" not in row + assert "description" not in row + + +def _assert_expanded_function_rows(rows): + assert rows + for row in rows: + assert "device_id" in row + assert "name" in row + assert "parameters" in row + assert "description" in row + + +async def _spawn_heterogeneous_fleet(device_spawner, prefix: str): + fleet_size = _scale_fleet_size() + sensor_count = fleet_size - CAMERA_COUNT - ROBOT_COUNT + location = f"{prefix}-mixed-room" + + sensor_ids = {f"{prefix}-sensor-{i:04d}" for i in range(sensor_count)} + camera_ids = {f"{prefix}-cam-{i:04d}" for i in range(CAMERA_COUNT)} + robot_ids = {f"{prefix}-robot-{i:04d}" for i in range(ROBOT_COUNT)} + + await device_spawner.spawn_sensor_fleet( + f"{prefix}-sensor", + sensor_count, + location=location, + initial_temp=21.0, + registration_timeout=45.0, + ) + await asyncio.gather(*[ + device_spawner.spawn_camera(device_id, location=location) + for device_id in sorted(camera_ids) + ]) + await asyncio.gather(*[ + device_spawner.spawn_robot(device_id, location=location) + for device_id in sorted(robot_ids) + ]) + await asyncio.sleep(SETTLE_TIME) + + return { + "fleet_size": fleet_size, + "sensor_count": sensor_count, + "camera_count": CAMERA_COUNT, + "robot_count": ROBOT_COUNT, + "location": location, + "sensor_ids": sensor_ids, + "camera_ids": camera_ids, + "robot_ids": robot_ids, + "all_ids": sensor_ids | camera_ids | robot_ids, + } + + +async def test_heterogeneous_discovery_outputs_expected_matrix( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """Mixed device/function discovery returns the expected output contract.""" + if messaging_backend != "nats": + pytest.skip("large heterogeneous discovery uses registry-backed NATS discovery") + + from device_connect_agent_tools import disconnect, discover, discover_labels + from device_connect_agent_tools.tools import DC_FUNCTION_THRESHOLD + + prefix = f"itest-lfh-disc-{uuid.uuid4().hex[:8]}" + fleet = await _spawn_heterogeneous_fleet(device_spawner, prefix) + await _wait_for_devices(messaging_url, fleet["all_ids"]) + + try: + labels = await asyncio.to_thread(discover_labels) + device_categories = labels["device_keys"]["category"]["values"] + function_direction = labels["function_keys"]["direction"]["values"] + function_modality = labels["function_keys"]["modality"]["values"] + function_safety = labels["function_keys"]["safety"]["values"] + + assert device_categories["sensor"] == fleet["sensor_count"] + assert device_categories["camera"] == fleet["camera_count"] + assert device_categories["robot"] == fleet["robot_count"] + assert labels["device_keys"]["location"]["values"][fleet["location"]] == ( + fleet["fleet_size"] + ) + assert function_direction["read"] == fleet["sensor_count"] + fleet["robot_count"] + assert function_direction["write"] == ( + fleet["sensor_count"] * 2 + fleet["camera_count"] + fleet["robot_count"] + ) + assert function_modality["thermal"] == fleet["sensor_count"] + assert function_modality["rgb"] == fleet["camera_count"] + assert function_safety["critical"] == ( + fleet["sensor_count"] + fleet["robot_count"] + ) + + cases = [ + { + "name": "read functions exclude cameras", + "selector": f"device(location:{fleet['location']}).function(direction:read)", + "expected_pairs": { + *((device_id, "get_reading") for device_id in fleet["sensor_ids"]), + *((device_id, "get_status") for device_id in fleet["robot_ids"]), + }, + "histogram": { + ("direction", "read"): fleet["sensor_count"] + fleet["robot_count"], + ("modality", "thermal"): fleet["sensor_count"], + }, + }, + { + "name": "camera writes stay isolated", + "selector": ( + f"device(category:camera,location:{fleet['location']})" + ".function(direction:write)" + ), + "expected_pairs": { + (device_id, "capture_image") for device_id in fleet["camera_ids"] + }, + "histogram": { + ("direction", "write"): fleet["camera_count"], + ("modality", "rgb"): fleet["camera_count"], + }, + }, + { + "name": "critical sensor writes exclude robot dispatch", + "selector": ( + f"device(category:sensor,location:{fleet['location']})" + ".function(direction:write,safety:critical)" + ), + "expected_pairs": { + (device_id, "set_threshold") for device_id in fleet["sensor_ids"] + }, + "histogram": { + ("direction", "write"): fleet["sensor_count"], + ("safety", "critical"): fleet["sensor_count"], + }, + }, + ] + + for case in cases: + result = await asyncio.to_thread( + discover, + case["selector"], + 0, + fleet["fleet_size"] * 3, + ) + pairs = {(row["device_id"], row["name"]) for row in result["results"]} + assert result["scope"] == "device_function", case["name"] + assert result["matched"] == len(case["expected_pairs"]), case["name"] + assert result["returned"] == len(case["expected_pairs"]), case["name"] + assert pairs == case["expected_pairs"], case["name"] + + for (key, value), count in case["histogram"].items(): + assert result["label_histogram"][key]["values"][value] == count + + if result["matched"] <= DC_FUNCTION_THRESHOLD: + _assert_expanded_function_rows(result["results"]) + else: + _assert_compact_function_rows(result["results"]) + + broad = await asyncio.to_thread( + discover, + f"device(location:{fleet['location']}).function(*)", + 0, + 40, + ) + assert broad["matched"] == ( + fleet["sensor_count"] * 3 + fleet["camera_count"] + fleet["robot_count"] * 2 + ) + assert broad["returned"] == 40 + assert broad["next_offset"] == 40 + _assert_compact_function_rows(broad["results"]) + assert broad["label_histogram"]["direction"]["values"]["read"] == ( + fleet["sensor_count"] + fleet["robot_count"] + ) + assert broad["label_histogram"]["direction"]["values"]["write"] == ( + fleet["sensor_count"] * 2 + fleet["camera_count"] + fleet["robot_count"] + ) + assert broad["label_histogram"]["modality"]["values"]["thermal"] == ( + fleet["sensor_count"] + ) + assert broad["label_histogram"]["modality"]["values"]["rgb"] == ( + fleet["camera_count"] + ) + assert broad["label_histogram"]["safety"]["values"]["critical"] == ( + fleet["sensor_count"] + fleet["robot_count"] + ) + finally: + await asyncio.to_thread(disconnect) + + +async def test_heterogeneous_invoke_many_targets_only_matching_functions( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """invoke_many ignores unrelated devices in a mixed fleet.""" + if messaging_backend != "nats": + pytest.skip("large heterogeneous invoke_many uses registry-backed NATS discovery") + + from device_connect_agent_tools import disconnect, invoke_many + + prefix = f"itest-lfh-inv-{uuid.uuid4().hex[:8]}" + fleet = await _spawn_heterogeneous_fleet(device_spawner, prefix) + await _wait_for_devices(messaging_url, fleet["all_ids"]) + + try: + result = await asyncio.to_thread( + invoke_many, + f"device(location:{fleet['location']}).function(get_reading)", + {"unit": "celsius"}, + 10.0, + 64, + "heterogeneous invoke_many selector isolation test", + ) + assert result["candidates"] == fleet["sensor_count"] + assert result["matched"] == fleet["sensor_count"] + assert result["succeeded"] == fleet["sensor_count"] + assert result["failed"] == 0 + assert {row["device_id"] for row in result["results"]} == fleet["sensor_ids"] + assert all(row["function"] == "get_reading" for row in result["results"]) + assert all(row["result"]["unit"] == "celsius" for row in result["results"]) + finally: + await asyncio.to_thread(disconnect) + + +async def test_heterogeneous_broadcast_replies_only_from_matching_functions( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """broadcast resolves a mixed fleet to the matching function subset.""" + if messaging_backend != "nats": + pytest.skip("large heterogeneous broadcast uses registry-backed NATS discovery") + + from device_connect_agent_tools import await_replies, broadcast, disconnect + + prefix = f"itest-lfh-bc-{uuid.uuid4().hex[:8]}" + fleet = await _spawn_heterogeneous_fleet(device_spawner, prefix) + await _wait_for_devices(messaging_url, fleet["all_ids"]) + + try: + result = await asyncio.to_thread( + broadcast, + f"device(location:{fleet['location']}).function(get_reading)", + {"unit": "celsius"}, + ) + assert result["correlation_id"].startswith("br-") + assert result["candidates"] == fleet["sensor_count"] + assert result["function"] == "get_reading" + + replies = await asyncio.to_thread( + await_replies, + result["correlation_id"], + timeout=_reply_timeout(fleet["sensor_count"]), + until=fleet["sensor_count"], + poll_interval=0.02, + ) + assert len(replies) == fleet["sensor_count"] + assert {reply["device_id"] for reply in replies} == fleet["sensor_ids"] + assert all(reply["success"] is True for reply in replies) + assert all(reply["result"]["unit"] == "celsius" for reply in replies) + finally: + await asyncio.to_thread(disconnect) diff --git a/tests/tests/test_tools_large_fleet_invoke.py b/tests/tests/test_tools_large_fleet_invoke.py new file mode 100644 index 0000000..236b9ed --- /dev/null +++ b/tests/tests/test_tools_large_fleet_invoke.py @@ -0,0 +1,377 @@ +# Copyright (c) 2024-2026, Arm Limited and Contributors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Slow NATS-only large-fleet selector/invoke integration tests.""" + +import asyncio +import os +import time +import uuid + +import pytest +from device_connect_edge.drivers import DeviceDriver, rpc +from device_connect_edge.types import DeviceIdentity, DeviceStatus + +SETTLE_TIME = 0.3 +DEFAULT_SCALE_FLEET_SIZE = 200 +DISCOVERY_TIMEOUT = 60.0 + + +def _scale_fleet_size(*, minimum: int = 1) -> int: + return max(minimum, int(os.getenv("DC_SCALE_FLEET_SIZE", str(DEFAULT_SCALE_FLEET_SIZE)))) + + +async def _wait_for_devices(messaging_url, expected_ids, timeout=DISCOVERY_TIMEOUT): + """Connect and poll until all expected ``device_ids`` are visible.""" + from device_connect_agent_tools import connect + from device_connect_agent_tools.connection import get_connection + + await asyncio.to_thread(connect, nats_url=messaging_url) + deadline = time.monotonic() + timeout + while True: + conn = get_connection() + devices = await asyncio.to_thread(conn.list_devices) + ids = {d.get("device_id") for d in devices} + if expected_ids.issubset(ids) or time.monotonic() > deadline: + return devices + await asyncio.sleep(0.25) + + +def _assert_compact_function_rows(rows): + assert rows + for row in rows: + assert set(row) <= {"device_id", "name", "labels"} + assert "parameters" not in row + assert "description" not in row + + +def _assert_expanded_function_rows(rows): + assert rows + for row in rows: + assert "device_id" in row + assert "name" in row + assert "parameters" in row + assert "description" in row + + +class EmergencyStopDriver(DeviceDriver): + """Dedicated test driver for the documented ``function(estop)`` pattern.""" + + device_type = "test_estop_device" + labels = {"category": "estop_target"} + + def __init__(self, location: str): + super().__init__() + self._location = location + self.stopped = False + + @property + def identity(self) -> DeviceIdentity: + return DeviceIdentity( + device_type="estop_target", + manufacturer="TestCorp", + model="TestStop-1000", + firmware_version="1.0.0-test", + arch="x86_64", + ) + + @property + def status(self) -> DeviceStatus: + return DeviceStatus(location=self._location, availability="available") + + @rpc(labels={"direction": "write", "safety": "critical"}) + async def estop(self, reason: str = "operator_request") -> dict: + """Emergency-stop alias used by fleet-wide safety selectors.""" + self.stopped = True + return { + "status": "stopped", + "reason": reason, + "device_id": getattr(self, "_device_id", "unknown"), + } + + async def connect(self) -> None: + pass + + async def disconnect(self) -> None: + pass + + +async def _spawn_estop_fleet( + device_spawner, + prefix: str, + count: int, + *, + location: str, + registration_timeout: float = 45.0, +): + spawned = await asyncio.gather(*[ + device_spawner._spawn( + EmergencyStopDriver(location), + f"{prefix}-{i:04d}", + wait_for_registration=False, + ) + for i in range(count) + ]) + await asyncio.gather(*[ + device_spawner._wait_for_registration(device, registration_timeout) + for device, _ in spawned + ]) + return spawned + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.slow +@pytest.mark.timeout(240) +@pytest.mark.parametrize("messaging_backend", ["nats"], indirect=True) +async def test_large_function_set_stays_compact_and_supports_drill_down( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """Function discovery compacts broad results but expands narrow drill-downs.""" + if messaging_backend != "nats": + pytest.skip("large-fleet selector test uses registry-backed NATS discovery") + + from device_connect_agent_tools import disconnect, discover + from device_connect_agent_tools.tools import DC_FUNCTION_THRESHOLD + + total_size = _scale_fleet_size(minimum=(DC_FUNCTION_THRESHOLD // 3) + 3) + sensor_count = total_size - 2 + prefix = f"itest-lfi-fn-{uuid.uuid4().hex[:8]}" + sensor_location = f"{prefix}-sensor-room" + camera_location = f"{prefix}-camera-room" + sensor_ids = {f"{prefix}-sensor-{i:04d}" for i in range(sensor_count)} + camera_ids = {f"{prefix}-cam-{i:04d}" for i in range(2)} + + await device_spawner.spawn_sensor_fleet( + f"{prefix}-sensor", + sensor_count, + location=sensor_location, + initial_temp=20.0, + registration_timeout=45.0, + ) + await asyncio.gather(*[ + device_spawner.spawn_camera(device_id, location=camera_location) + for device_id in camera_ids + ]) + await asyncio.sleep(SETTLE_TIME) + + await _wait_for_devices(messaging_url, sensor_ids | camera_ids) + try: + broad = await asyncio.to_thread( + discover, f"device(location:{sensor_location}).function(*)", 0, 50, + ) + assert broad["scope"] == "device_function" + assert broad["matched"] == sensor_count * 3 + expected_returned = min(50, sensor_count * 3) + assert broad["returned"] == expected_returned + if sensor_count * 3 > 50: + assert broad["next_offset"] == 50 + _assert_compact_function_rows(broad["results"]) + assert broad["label_histogram"]["direction"]["values"]["read"] == sensor_count + assert broad["label_histogram"]["direction"]["values"]["write"] == sensor_count * 2 + + sensor_drill_down = await asyncio.to_thread( + discover, f"device({prefix}-sensor-0000).function(set_location)", + ) + assert sensor_drill_down["matched"] == 1 + _assert_expanded_function_rows(sensor_drill_down["results"]) + assert sensor_drill_down["results"][0]["name"] == "set_location" + assert "location" in sensor_drill_down["results"][0]["parameters"]["properties"] + + camera_drill_down = await asyncio.to_thread( + discover, f"device(location:{camera_location}).function(capture_image)", + ) + assert camera_drill_down["matched"] == 2 + _assert_expanded_function_rows(camera_drill_down["results"]) + assert {row["name"] for row in camera_drill_down["results"]} == {"capture_image"} + finally: + await asyncio.to_thread(disconnect) + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.slow +@pytest.mark.timeout(300) +@pytest.mark.parametrize("messaging_backend", ["nats"], indirect=True) +async def test_invoke_many_estop_alias_targets_only_estop_functions_at_scale( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """``function(estop)`` fans out only to devices exposing the ESTOP alias.""" + if messaging_backend != "nats": + pytest.skip("large-fleet ESTOP test uses registry-backed NATS discovery") + + from device_connect_agent_tools.tools import DC_FUNCTION_THRESHOLD + + estop_count = _scale_fleet_size(minimum=DC_FUNCTION_THRESHOLD + 1) + decoy_count = 3 + prefix = f"itest-lfi-estop-{uuid.uuid4().hex[:8]}" + location = f"{prefix}-room" + estop_prefix = f"{prefix}-target" + decoy_prefix = f"{prefix}-sensor" + estop_ids = {f"{estop_prefix}-{i:04d}" for i in range(estop_count)} + decoy_ids = {f"{decoy_prefix}-{i:04d}" for i in range(decoy_count)} + + await _spawn_estop_fleet( + device_spawner, + estop_prefix, + estop_count, + location=location, + ) + await device_spawner.spawn_sensor_fleet( + decoy_prefix, + decoy_count, + location=location, + registration_timeout=45.0, + ) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import disconnect, discover, invoke_many + + await _wait_for_devices(messaging_url, estop_ids | decoy_ids) + try: + discovered = await asyncio.to_thread(discover, "function(estop)", 0, 50) + assert discovered["scope"] == "function_only" + assert discovered["matched"] == estop_count + assert discovered["returned"] == min(50, estop_count) + if estop_count > 50: + assert discovered["next_offset"] == 50 + assert discovered["label_histogram"]["direction"]["values"]["write"] == ( + estop_count + ) + assert discovered["label_histogram"]["safety"]["values"]["critical"] == ( + estop_count + ) + assert not any(row["device_id"] in decoy_ids for row in discovered["results"]) + _assert_compact_function_rows(discovered["results"]) + + result = await asyncio.to_thread( + invoke_many, + "function(estop)", + {"reason": "release-qa"}, + 10.0, + 64, + "Large-fleet ESTOP alias release QA test", + ) + assert result["candidates"] == estop_count + assert result["matched"] == estop_count + assert result["succeeded"] == estop_count + assert result["failed"] == 0 + assert {row["device_id"] for row in result["results"]} == estop_ids + assert {row["function"] for row in result["results"]} == {"estop"} + assert all(row["result"]["status"] == "stopped" for row in result["results"]) + assert all(row["result"]["reason"] == "release-qa" for row in result["results"]) + finally: + await asyncio.to_thread(disconnect) + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.slow +@pytest.mark.timeout(240) +@pytest.mark.parametrize("messaging_backend", ["nats"], indirect=True) +async def test_invoke_ambiguity_stays_bounded_in_large_fleet( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """A large ambiguous invoke reports a capped candidate preview.""" + if messaging_backend != "nats": + pytest.skip("large-fleet invoke test uses registry-backed NATS discovery") + + fleet_size = _scale_fleet_size(minimum=11) + prefix = f"itest-lfi-amb-{uuid.uuid4().hex[:8]}" + location = f"{prefix}-room" + expected_ids = {f"{prefix}-{i:04d}" for i in range(fleet_size)} + + await device_spawner.spawn_sensor_fleet( + prefix, + fleet_size, + location=location, + initial_temp=22.0, + registration_timeout=45.0, + ) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import disconnect, invoke + + await _wait_for_devices(messaging_url, expected_ids) + try: + result = await asyncio.to_thread( + invoke, + f"device(location:{location}).function(get_reading)", + {"unit": "celsius"}, + "Large-fleet ambiguity bound test", + ) + assert result["success"] is False + assert result["error"]["code"] == "ambiguous_match" + assert f"matched {fleet_size} functions" in result["error"]["message"] + assert len(result["candidates"]) == 10 + assert all(set(row) == {"device_id", "function"} for row in result["candidates"]) + assert {row["function"] for row in result["candidates"]} == {"get_reading"} + finally: + await asyncio.to_thread(disconnect) + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.slow +@pytest.mark.timeout(300) +@pytest.mark.parametrize("messaging_backend", ["nats"], indirect=True) +async def test_invoke_many_partial_failure_accounting_at_scale( + messaging_backend, messaging_url, clear_registry, device_spawner +): + """Large fan-out keeps deterministic failures separate from successes.""" + if messaging_backend != "nats": + pytest.skip("large-fleet invoke_many test uses registry-backed NATS discovery") + + fleet_size = _scale_fleet_size(minimum=12) + failing_count = max(2, min(10, fleet_size // 10)) + healthy_count = fleet_size - failing_count + prefix = f"itest-lfi-pf-{uuid.uuid4().hex[:8]}" + location = f"{prefix}-room" + healthy_prefix = f"{prefix}-ok" + failing_prefix = f"{prefix}-fail" + healthy_ids = {f"{healthy_prefix}-{i:04d}" for i in range(healthy_count)} + failing_ids = {f"{failing_prefix}-{i:04d}" for i in range(failing_count)} + + await device_spawner.spawn_sensor_fleet( + healthy_prefix, + healthy_count, + location=location, + initial_temp=21.0, + registration_timeout=45.0, + ) + await device_spawner.spawn_sensor_fleet( + failing_prefix, + failing_count, + failure_rate=1.0, + location=location, + initial_temp=21.0, + registration_timeout=45.0, + ) + await asyncio.sleep(SETTLE_TIME) + + from device_connect_agent_tools import disconnect, invoke_many + + await _wait_for_devices(messaging_url, healthy_ids | failing_ids) + try: + result = await asyncio.to_thread( + invoke_many, + f"device(location:{location}).function(get_reading)", + {"unit": "celsius"}, + 10.0, + 64, + "Large-fleet partial failure accounting test", + ) + assert result["candidates"] == fleet_size + assert result["matched"] == fleet_size + assert result["succeeded"] == healthy_count + assert result["failed"] == failing_count + assert result["succeeded"] + result["failed"] == result["candidates"] + assert {row["device_id"] for row in result["results"]} == healthy_ids + assert {row["device_id"] for row in result["errors"]} == failing_ids + for row in result["errors"]: + assert row["function"] == "get_reading" + assert row["error"]["code"] + assert row["error"]["message"] + finally: + await asyncio.to_thread(disconnect) diff --git a/tests/tests/test_tools_selector.py b/tests/tests/test_tools_selector.py index 81f8a36..dcc79d1 100644 --- a/tests/tests/test_tools_selector.py +++ b/tests/tests/test_tools_selector.py @@ -570,12 +570,11 @@ async def test_discover_labels_per_key_pagination(device_spawner, messaging_url) await asyncio.to_thread(disconnect) -# -- PR 28 review follow-ups: end-to-end regression guards ------------ +# -- Selector regression guards --------------------------------------- # -# These tests cover behavior that was added or pinned in PR 28's review -# round: bracket character-class globs on the name axis, long-tail -# truncation in the multi-axis vocabulary form, case-sensitive selector -# matching, and the documented response shape contract. +# These tests cover bracket character-class globs on the name axis, +# long-tail truncation in the multi-axis vocabulary form, case-sensitive +# selector matching, and the documented response shape contract. @pytest.mark.asyncio @@ -585,8 +584,8 @@ async def test_discover_bracket_glob_name_match(device_spawner, messaging_url): axis routes through the matcher's fnmatch path and resolves against real device functions. - Regression guard: before PR 28's heuristic fix, patterns containing only - ``[``/``]`` (no ``*``/``?``) were treated as literal strings. + Regression guard: patterns containing only ``[``/``]`` (no ``*``/``?``) + should still route through glob matching instead of literal matching. """ await device_spawner.spawn_sensor("itest-sel-bgg-sensor", location="lab-A") await device_spawner.spawn_robot("itest-sel-bgg-robot", location="lab-A") From c90fd940e13ee8ee319db24327f3ddf60d7fc420 Mon Sep 17 00:00:00 2001 From: Sourav Pati Date: Tue, 26 May 2026 08:09:26 -0700 Subject: [PATCH 2/2] test: clear registry after isolated integration tests Large-fleet tests intentionally seed hundreds of registry rows. Clearing only before those tests let stale rows remain until TTL expiry and polluted later broad selector discovery tests in CI. --- tests/conftest.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 19f9612..0ea2f6f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -195,7 +195,11 @@ async def mock_orchestrator(infrastructure, messaging_backend, messaging_url): @pytest_asyncio.fixture async def clear_registry(infrastructure): - """Clear all devices from registry before test.""" + """Clear all devices from registry before and after a registry-isolated test.""" count = await clear_device_registry() logger.info(f"Registry cleared: {count} devices removed") - yield count + try: + yield count + finally: + count = await clear_device_registry() + logger.info(f"Registry cleared after test: {count} devices removed")