66 changes: 66 additions & 0 deletions docs/discovery-release-test-summary.md
@@ -0,0 +1,66 @@
# Discovery and Operations Release Test Summary

Date: 2026-05-21

This document summarizes the test coverage behind the discovery and operations
release. It is written as source material for release notes and public release
documentation.

## Summary

The release has strong automated coverage for selector-based discovery, label
vocabulary, pagination, compact and expanded result shapes, synchronous
invocation, async broadcast, subscriptions, and large simulated fleets. The
largest real integration runs use 200 NATS-backed simulated devices by default.

## Evidence

| Evidence | Scope | Result |
|---|---|---|
| Agent-tools unit tests | `discover`, `discover_labels`, `invoke`, `invoke_many`, `broadcast`, `subscribe`, `await_replies`, error envelopes, truncation, safety warnings | `94 passed in 0.42s` |
| Edge predicate unit tests | Edge-side `where` dependency warning, bounded predicate evaluation, CEL compile/eval behavior | `19 passed in 0.23s` |
| Broadcast/subscription integration tests | Broadcast replies, `where`, bindings, `fire_at`, `on_late`, safety warning, live and snapshot event subscriptions | `12 passed, 12 skipped in 49.53s` on NATS run |
| Large-fleet integration suite | 200-device discovery, pagination, truncation, heterogeneous fleets, ESTOP alias, `invoke_many`, broadcast, `where`, `fire_at`, correlation subscribe | `18 passed in 37.05s` |
| Integration inventory | Hierarchical tools, selector discovery, invoke, broadcast, subscribe, large-fleet tests across NATS/Zenoh where applicable | `144 collected` |

## Coverage Matrix

| Area | What Was Tested | Representative Tests | Confidence |
|---|---|---|---|
| Hierarchical discovery compatibility | `describe_fleet`, `list_devices`, `get_device_functions`, small-fleet expansion, pagination, missing device, status/function counts, deprecation warnings | `test_tools_hierarchical.py`, `packages/.../test_discover.py` | Medium. Compatibility path is covered on small fleets; large-fleet stress is on the selector APIs. |
| Selector discovery | Device, function, and event scopes; bare names; globs; `*`; `key:value`; OR within key; AND across keys; case sensitivity; invalid selectors | `test_tools_selector.py`, `packages/.../test_discover.py` | High. Core selector behavior is covered in unit and integration tests. |
| Labels and truncation | Multi-axis `discover_labels`, per-key pagination, `label_histogram`, `more`, multivalued labels, compact broad rows, expanded drill-down schemas | `test_long_tail_label_histogram_reports_more`, `test_per_key_label_drill_down_bypasses_truncation`, `TestLongTailTruncation` | High for device/function labels; event-label scale remains thinner. |
| Function schemas | Schemas returned for narrow function result sets; broad function queries compact above threshold; drill-down returns `parameters` | `test_large_function_set_stays_compact_and_supports_drill_down`, `test_heterogeneous_discovery_outputs_expected_matrix` | High. Verifies progressive narrowing before returning schemas. |
| Heterogeneous fleets | Sensors, cameras, and robots with different functions/labels; exact category/function histograms; exact `(device_id, function)` matrix | `test_heterogeneous_discovery_outputs_expected_matrix` | High for mixed function discovery. |
| `invoke` | Single-target success, no-match, ambiguous-match, invalid scope, event-scope rejection, JSON-RPC/connection errors, bounded ambiguity preview | `test_tools_invoke.py`, `packages/.../test_invoke.py`, `test_invoke_ambiguity_stays_bounded_in_large_fleet` | High. Exactly-one semantics and common error cases are covered. |
| `invoke_many` | Fan-out success, zero candidates, function-only selector, partial failures, per-target timeout forwarding, concurrency cap, heterogeneous target isolation | `test_invoke_many_partial_failure_accounting_at_scale`, `test_heterogeneous_invoke_many_targets_only_matching_functions` | High. Sync fan-out behavior is covered at small and large scale. |
| ESTOP alias | `function(estop)` discovery and `invoke_many` fan-out target only functions named `estop`, excluding unrelated `safety:critical` decoys | `test_invoke_many_estop_alias_targets_only_estop_functions_at_scale` | Medium-high. API selector/fan-out behavior is covered; this is not a physical safety certification. |
| `broadcast` | Correlation envelope, candidate count, target envelope, zero matches, invalid scope, invalid predicate, publish failure, safety warning, large fan-out replies | `test_broadcast_large_fan_out_returns_correlation_and_target_count`, `test_heterogeneous_broadcast_replies_only_from_matching_functions` | High for async fan-out core. |
| `where` and bindings | CEL predicate filtering, namespaced `bindings`, edge self-election at scale, startup warning for missing predicate support, bounded fail-closed evaluation | `test_broadcast_where_self_election_narrows_large_candidates`, `test_broadcast_where_with_bindings`, `test_device_where.py` | High for current predicate behavior. |
| `fire_at` | Scheduled fan-out, `actually_fired_at` spread, late `on_late=skip` behavior | `test_broadcast_fire_at_synchronizes_large_fan_out`, `test_broadcast_fire_at_late_with_skip_drops` | Medium-high. Large-fleet happy path and small-fleet late/skip behavior are covered. |
| Subscriptions and replies | `subscribe("correlation:<id>")`, live top-level `event(...)`, snapshot `device(...).event(...)`, iterator protocol, race-safe buffer drain, `await_replies` partial/complete collection | `test_subscribe_correlation_drains_large_reply_stream`, `test_subscribe_top_level_event_selector_includes_late_joiners`, `test_subscribe_device_event_selector_is_snapshot`, `test_subscribe.py` | Medium-high. Correlation replies and event subscription semantics are explicitly covered. |
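
The selector rules named in the "Selector discovery" row (OR within a key, AND across keys, globs, case sensitivity) can be illustrated with a minimal sketch. This is not Device Connect's selector parser: `matches_labels` and the label dictionaries are hypothetical names chosen only to show the matching rule.

```python
from fnmatch import fnmatchcase


def matches_labels(device_labels: dict[str, list[str]],
                   wanted: dict[str, list[str]]) -> bool:
    """Hypothetical matcher: AND across keys, OR within a key.

    Values are compared with case-sensitive glob matching.
    """
    for key, patterns in wanted.items():
        values = device_labels.get(key, [])
        # OR within a key: any value matching any pattern satisfies the key.
        if not any(fnmatchcase(v, p) for v in values for p in patterns):
            return False  # AND across keys: one unsatisfied key rejects the row.
    return True


camera = {"location": ["lab-A/bench-1"], "modality": ["motion", "rgb"]}

# Both keys must match; within "modality", either listed value is enough.
both_keys = matches_labels(
    camera, {"location": ["lab-A/*"], "modality": ["thermal", "motion"]}
)
# Case-sensitive: "lab-a/*" does not match "lab-A/bench-1".
wrong_case = matches_labels(camera, {"location": ["lab-a/*"]})
print(both_keys, wrong_case)
```
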

## Large-Scale Testing Approach

Routine PR and CI validation should continue to use 200 real NATS-backed
simulated devices. That size exercises registry discovery, selectors,
pagination, truncation, fan-out, and correlation replies while keeping routine
validation fast and reliable.

10K-device confidence should come from a synthetic registry/load test in a
nightly or release workflow. That test should seed or emulate the registry at
10K devices and measure selector, label, pagination, and operation-resolution
behavior without launching 10K live edge runtimes. Real 10K runtime validation
should wait until server-side selector push-down exists, because the current
implementation still loads the full registry roster before matching.
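
A synthetic registry test of the kind proposed above might look roughly like the following. It is only a sketch under stated assumptions: `make_registry`, `select`, the row shape, and the `total` field are hypothetical and do not come from the Device Connect code base. The full-roster scan before matching mirrors the limitation described in this section.

```python
import time
from fnmatch import fnmatchcase


def make_registry(n: int) -> list[dict]:
    """Build n synthetic device rows; no live edge runtimes are started."""
    kinds = ["sensor", "camera", "robot"]
    return [
        {
            "device_id": f"sim-{i:05d}",
            "labels": {"category": kinds[i % 3], "location": f"lab-{i % 10}"},
        }
        for i in range(n)
    ]


def select(registry: list[dict], key: str, pattern: str,
           offset: int = 0, limit: int = 100) -> dict:
    """Scan the whole roster, then paginate the matches (hypothetical shape)."""
    matched = [
        row for row in registry
        if fnmatchcase(row["labels"].get(key, ""), pattern)
    ]
    end = offset + limit
    return {
        "results": matched[offset:end],
        "total": len(matched),
        "next_offset": end if end < len(matched) else None,
    }


registry = make_registry(10_000)
start = time.perf_counter()
page = select(registry, "category", "camera")
elapsed = time.perf_counter() - start
print(f"matched={page['total']} next_offset={page['next_offset']} in {elapsed:.4f}s")
```

In a nightly or release workflow, the elapsed time would be compared against a budget rather than printed.
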

## Release Notes

| Topic | Release Note |
|---|---|
| Event subscriptions | `subscribe("event(...)")` is live by event name and includes late-joining devices that emit those event names. `subscribe("device(...).event(...)")` snapshots the resolved device set at subscription time. |
| Predicate support | `where` predicates require predicate support at the dispatcher and participating edges. Missing edge support logs a startup warning and fails closed for `where` broadcasts. Predicate evaluation is bounded and fail-closed on timeout. |
| Discovery scale | The current implementation is validated with 200 real simulated devices in PR/CI. Server-side selector push-down remains future work for larger production-scale fleets. |
| CLI migration | Local mDNS scanning is `devctl mdns-scan` or `devctl scan`. Selector-based fleet discovery uses `devctl discover`. |
| Extension metadata | Protocol/vendor-specific schema extensions are treated as opaque by Device Connect and should be tested by the owning protocol integration. |
| ESTOP | The `function(estop)` selector pattern is covered for API discovery and fan-out. This is not a safety certification of physical devices, policy enforcement, or fail-safe behavior. |
42 changes: 36 additions & 6 deletions docs/discovery.md
@@ -139,7 +139,7 @@ is whether the caller waits for replies and how they arrive.
| `invoke(selector, params)` | exactly one (device, function) tuple | sync, single result |
| `invoke_many(selector, params, timeout=)` | any number of (device, function) tuples | sync, aggregated |
| `broadcast(selector, params, where=, bindings=, fire_at=, on_late=)` | any number of (device, function) tuples | async; correlation-tagged replies stream as events |
| `subscribe(selector)` | events, or `"correlation:<id>"` for broadcast replies | live stream (`Subscription` handle) |
| `subscribe(selector)` | events, or `"correlation:<id>"` for broadcast replies | stream (`Subscription` handle) |
| `await_replies(correlation_id, timeout=, until=)` | replies for one broadcast | sync helper that subscribes, collects, returns |

`invoke_many` runs every target's call in parallel and returns when each
@@ -152,6 +152,15 @@ failures do not abort siblings; the response carries both `results` and
subject keyed by that id. Subscribe with `subscribe("correlation:<id>")`
or block with `await_replies(correlation_id, timeout=...)`.

`subscribe("event(...)")` is fleet-live by event name: it resolves the
matching event names once, then subscribes with a device wildcard so
late-joining devices that emit those event names are included. In
contrast, `subscribe("device(...).event(...)")` is device-set snapshot:
it resolves the matching devices at subscription time and subscribes to
those concrete device/event subjects. Use top-level `event(...)` for
long-running fleet monitors and the device-anchored form when you want a
predictable fixed target set.
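
The difference between the two forms comes down to which subjects are subscribed. A minimal sketch follows; the wildcard subject layout matches the one used in this change, while the concrete per-device layout and both helper names are assumptions made for illustration.

```python
def live_event_subjects(zone: str, event_names: list[str]) -> list[str]:
    """Top-level event(...): one device-wildcard subject per event name."""
    return [
        f"device-connect.{zone}.*.event.{name}"
        for name in sorted(set(event_names))
    ]


def snapshot_event_subjects(zone: str, device_ids: list[str],
                            event_names: list[str]) -> list[str]:
    """device(...).event(...): concrete subjects for the devices known now.

    Assumption: the device id occupies the position of the wildcard.
    """
    return [
        f"device-connect.{zone}.{device_id}.event.{name}"
        for device_id in device_ids
        for name in event_names
    ]


live = live_event_subjects("default", ["object_detected"])
snapshot = snapshot_event_subjects("default", ["cam-1", "cam-2"], ["object_detected"])
print(live)
print(snapshot)
```

A device that joins later, say `cam-3`, publishes on a subject the wildcard already covers, but no entry for it exists in the snapshot list.
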

### Edge-side `where` predicate

`broadcast` accepts an optional `where` expression that runs at each
@@ -178,11 +187,16 @@ predicate evaluator is an optional install:

```
pip install device-connect-agent-tools[predicate]
pip install device-connect-edge[predicate]
```

Without the extra, calling `broadcast(..., where=...)` returns an
`invalid_predicate` error immediately at the dispatcher; calls without a
`where` work unchanged.
`invalid_predicate` error immediately at the dispatcher. Edges that do
not have the predicate extra log a startup warning and fail closed for
`where` broadcasts, skipping execution rather than running an
unevaluated predicate. Predicate evaluation is also wall-clock bounded
at the edge; timeouts are logged with the broadcast correlation id and
fail closed. Calls without a `where` work unchanged.
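
The bounded, fail-closed evaluation described above can be sketched as a standalone function. This is an illustration rather than the shipped implementation: `evaluate_with_deadline` is a hypothetical name, and the predicate is modelled as any callable that takes a context dictionary.

```python
import threading
import time
from typing import Any, Callable


def evaluate_with_deadline(evaluate: Callable[[dict], Any],
                           context: dict,
                           timeout_s: float = 0.05) -> bool:
    """Run a predicate in a worker thread; treat a timeout as False."""
    result: dict[str, Any] = {}

    def _run() -> None:
        try:
            result["value"] = bool(evaluate(context))
        except Exception as exc:  # hand the error back to the caller's thread
            result["error"] = exc

    worker = threading.Thread(target=_run, daemon=True)
    worker.start()
    worker.join(timeout_s)
    if worker.is_alive():
        # Fail closed: the device skips execution instead of guessing.
        return False
    if "error" in result:
        raise result["error"]
    return bool(result.get("value", False))


def slow_predicate(context: dict) -> bool:
    time.sleep(0.2)
    return True


fast = evaluate_with_deadline(lambda ctx: ctx["temp"] > 30, {"temp": 42}, timeout_s=1.0)
timed_out = evaluate_with_deadline(slow_predicate, {}, timeout_s=0.01)
print(fast, timed_out)
```

One caveat of this approach is that a timed-out worker thread is not cancelled. It keeps running in the background until the predicate returns, so the deadline bounds the caller's wait, not the CPU time spent.
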

### Synchronized fan-out (`fire_at` + `on_late`)

@@ -211,8 +225,9 @@ broadcast would produce.

The three response shapes below — one for `discover`, two for
`discover_labels` — are the source of truth for callers. Fields not
listed are reserved for forward-compatible extensions; do not rely on
field order.
listed are reserved for forward-compatible, protocol-specific, or
vendor-specific extensions. Treat those extension fields as opaque unless
they are documented by Device Connect; do not rely on field order.

### `discover`

@@ -479,13 +494,28 @@ with subscribe("device(location:lab-A/*).event(modality:motion)") as sub:
        handle(event)
```

Use `subscribe("event(modality:motion)")` instead for a long-running
fleet-wide monitor that should also receive matching events from devices
that join after the subscription starts.

## CLI

The same selector syntax drives the operator CLIs. Every CLI command
maps to the matching Python tool call.

### CLI migration note

This release splits local mDNS scanning from selector discovery. Use
`devctl mdns-scan --timeout 5` or the shorter `devctl scan --timeout 5`
to find uncommissioned devices on the local network. `devctl discover`
now owns selector-driven discovery over the registered fleet.

```
# Discovery (devctl)
# Local mDNS scan (devctl)
devctl mdns-scan [--timeout T]
devctl scan [--timeout T]

# Selector discovery (devctl)
devctl discover "<selector>" [--offset N] [--limit M]
devctl discover-labels [--key K] [--offset N] [--limit M]

@@ -1064,12 +1064,61 @@ def _correlation_subjects(conn: Any, correlation_id: str) -> list[str]:
    ]


def _event_names_for_filter(selector: str) -> tuple[list[str] | None, dict[str, Any] | None]:
    """Resolve top-level ``event(...)`` to event names for live wildcard subs."""
    try:
        sel = parse_selector(selector)
    except SelectorParseError as e:
        return None, _empty_envelope(error=_error("selector_parse_error", str(e)))
    if sel.scope != Scope.EVENT_ONLY:
        return None, _empty_envelope(
            scope=sel.scope.value,
            error=_error(
                "invalid_subscribe_scope",
                "top-level live event subscriptions require event(...) scope; "
                f"got scope={sel.scope.value!r}",
            ),
        )

    rows: list[dict] = []
    offset = 0
    while True:
        page = discover(selector, offset=offset, limit=DISCOVER_HARD_LIMIT)
        if "error" in page:
            return None, page
        rows.extend(page["results"])
        if page["next_offset"] is None:
            break
        offset = page["next_offset"]

    names = sorted({
        row.get("name") for row in rows
        if row.get("name")
    })
    return names, None
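
The pagination loop in `_event_names_for_filter` follows a common drain-until-`next_offset`-is-`None` pattern. The standalone sketch below shows the same pattern against a fake page source; `drain_pages` and `fake_discover` are hypothetical stand-ins and the rows are made-up data, not output from a real registry.

```python
from __future__ import annotations

from typing import Any, Callable


def drain_pages(
    fetch: Callable[[int], dict[str, Any]],
) -> tuple[list[dict] | None, dict | None]:
    """Collect every page; return (rows, None) or (None, error_page)."""
    rows: list[dict] = []
    offset = 0
    while True:
        page = fetch(offset)
        if "error" in page:
            return None, page  # surface the error envelope unchanged
        rows.extend(page["results"])
        if page["next_offset"] is None:
            return rows, None
        offset = page["next_offset"]


# Made-up event rows: two devices emit the same event name.
_ROWS = [
    {"device_id": "cam-1", "name": "object_detected"},
    {"device_id": "cam-2", "name": "object_detected"},
    {"device_id": "pir-1", "name": "motion"},
]


def fake_discover(offset: int, limit: int = 2) -> dict[str, Any]:
    """Hypothetical page source standing in for a paginated discover call."""
    end = offset + limit
    return {
        "results": _ROWS[offset:end],
        "next_offset": end if end < len(_ROWS) else None,
    }


rows, error = drain_pages(fake_discover)
# Deduplicate and sort names, as the function above does.
names = sorted({row["name"] for row in rows if row.get("name")})
print(names)
```
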


def _event_subjects_for_selector(selector: str) -> tuple[list[str] | None, dict[str, Any] | None]:
    """Resolve an event-scoped selector to per-device subjects.

    Returns ``(subjects, None)`` on success or ``(None, error_envelope)``
    if the selector failed to parse or used a non-event scope.
    """
    try:
        sel = parse_selector(selector)
    except SelectorParseError as e:
        return None, _empty_envelope(error=_error("selector_parse_error", str(e)))

    if sel.scope == Scope.EVENT_ONLY:
        names, error = _event_names_for_filter(selector)
        if error is not None:
            return None, error
        conn = get_connection()
        return [
            f"device-connect.{conn.zone}.*.event.{name}"
            for name in (names or [])
        ], None

    rows: list[dict] = []
    offset = 0
    while True:
@@ -1113,8 +1162,11 @@ def subscribe(selector: str) -> Subscription:
    Args:
        selector: One of:
            - ``"correlation:<id>"`` for broadcast replies of a prior call.
            - An event-scoped selector (``event(<name>)`` or
              ``device(...).event(<name>)``) for live event streams.
            - ``event(...)`` for live event streams. Matching event names are
              resolved once, then subscribed with a device wildcard so devices
              that join later and emit those event names are included.
            - ``device(...).event(...)`` for a snapshot event stream over the
              devices resolved when the subscription is created.

    Returns:
        A :class:`Subscription` handle. Iterate with ``sub.iter(timeout)``
7 changes: 7 additions & 0 deletions packages/device-connect-agent-tools/tests/test_subscribe.py
@@ -104,6 +104,13 @@ def test_event_selector_subscribes_per_device(self, fake_conn):
            assert subj.endswith(".event.object_detected")
        sub.close()

    def test_top_level_event_selector_subscribes_by_event_name_wildcard(self, fake_conn):
        sub = tools_mod.subscribe("event(object_detected)")
        assert fake_conn.subscribed_subjects == [
            "device-connect.default.*.event.object_detected"
        ]
        sub.close()

    def test_event_selector_zero_matches_returns_idle(self, fake_conn):
        sub = tools_mod.subscribe("event(no_such_event)")
        assert fake_conn.subscribed_subjects == []
51 changes: 50 additions & 1 deletion packages/device-connect-edge/device_connect_edge/device.py
@@ -62,6 +62,7 @@ async def capture_image(self, resolution: str = "1080p") -> dict:
import logging
import os
import re
import threading
import time
import uuid
from pathlib import Path
@@ -1317,7 +1318,9 @@ def _evaluate_where(
                "status": status_dict,
                "bindings": bindings or {},
            }
            return bool(predicate.evaluate(context))
            return self._evaluate_where_with_timeout(
                predicate, context, correlation_id,
            )
        except Exception as e:
            self._logger.warning(
                "Broadcast %s: where predicate failed (skipping): %s",
@@ -1326,6 +1329,50 @@ def _evaluate_where(
            return False


    def _evaluate_where_with_timeout(
        self,
        predicate: Any,
        context: Dict[str, Any],
        correlation_id: str,
        timeout_s: float = 0.05,
    ) -> bool:
        """Evaluate a predicate with a short wall-clock deadline."""
        result: Dict[str, Any] = {}

        def _run() -> None:
            try:
                result["value"] = bool(predicate.evaluate(context))
            except Exception as e:
                result["error"] = e

        thread = threading.Thread(target=_run, daemon=True)
        thread.start()
        thread.join(timeout_s)
        if thread.is_alive():
            self._logger.warning(
                "Broadcast %s: where predicate timed out after %.3fs (skipping)",
                correlation_id, timeout_s,
            )
            return False
        if "error" in result:
            raise result["error"]
        return bool(result.get("value", False))


    def _warn_if_predicate_extra_missing(self) -> None:
        """Log once at startup when edge-side CEL predicates are unavailable."""
        try:
            from device_connect_edge.predicate import compile_where
            compile_where("true")
        except Exception as e:
            self._logger.warning(
                "Edge-side where predicates are unavailable on this device: %s. "
                "Install the predicate extra on every edge that should evaluate "
                "broadcast where clauses: pip install 'device-connect-edge[predicate]'",
                e,
            )


    async def _event_dispatch_loop(self) -> None:
        """Send queued events, retrying on failure."""

@@ -1552,6 +1599,8 @@ async def run(self) -> None:
        # Set up DeviceDriver capabilities (router, registry, subscriptions)
        await self._setup_agentic_driver()

        self._warn_if_predicate_extra_missing()

        # Start device routines (@periodic decorated methods)
        await self._driver._start_routines()

67 changes: 67 additions & 0 deletions packages/device-connect-edge/tests/test_device_where.py
@@ -0,0 +1,67 @@
# Copyright (c) 2024-2026, Arm Limited and Contributors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

"""Unit tests for edge-side broadcast ``where`` runtime behavior."""

import time
from unittest.mock import Mock

from device_connect_edge import DeviceRuntime


class _SlowPredicate:
    def evaluate(self, context):
        time.sleep(0.2)
        return True


class _RaisingPredicate:
    def evaluate(self, context):
        raise RuntimeError("bad predicate")


def test_where_eval_timeout_fails_closed_and_warns():
    runtime = DeviceRuntime(device_id="where-timeout-test")
    runtime._logger = Mock()

    result = runtime._evaluate_where_with_timeout(
        _SlowPredicate(), {}, "corr-timeout", timeout_s=0.01,
    )

    assert result is False
    runtime._logger.warning.assert_called_once()
    assert "where predicate timed out" in (
        runtime._logger.warning.call_args.args[0]
    )


def test_where_eval_error_propagates_to_caller():
    runtime = DeviceRuntime(device_id="where-error-test")

    try:
        runtime._evaluate_where_with_timeout(
            _RaisingPredicate(), {}, "corr-error", timeout_s=0.1,
        )
    except RuntimeError as exc:
        assert "bad predicate" in str(exc)
    else:
        raise AssertionError("expected predicate exception to propagate")


def test_missing_predicate_extra_warns_at_startup(monkeypatch):
    import device_connect_edge.predicate as predicate_mod

    def _missing_extra(expression):
        raise predicate_mod.PredicateCompileError("cel-python missing")

    runtime = DeviceRuntime(device_id="where-missing-extra-test")
    runtime._logger = Mock()
    monkeypatch.setattr(predicate_mod, "compile_where", _missing_extra)

    runtime._warn_if_predicate_extra_missing()

    runtime._logger.warning.assert_called_once()
    assert "Edge-side where predicates are unavailable" in (
        runtime._logger.warning.call_args.args[0]
    )