From 6e2ad46ce99a0806e2f5dca3df864a7a4f942b0e Mon Sep 17 00:00:00 2001 From: mikeschulte Date: Mon, 13 Apr 2026 08:41:07 -0500 Subject: [PATCH] force --name, auto-append on dupe - for 'auth' into gateway --- API.md | 30 +++++++++-------- CHANGELOG.md | 43 +++++++++++++++++++++++++ README.md | 15 +++++++++ plexus/client.py | 16 +++++++++- plexus/config.py | 34 ++++++++++++++++++++ plexus/ws.py | 33 +++++++++++++++++-- scripts/setup.sh | 56 +++++++++++++++++++++++++------- tests/test_ws.py | 83 ++++++++++++++++++++++++++++++++++++++++++++++-- 8 files changed, 279 insertions(+), 31 deletions(-) diff --git a/API.md b/API.md index d8b9042..e55e2e5 100644 --- a/API.md +++ b/API.md @@ -152,34 +152,36 @@ For real-time UI-controlled streaming, devices connect via WebSocket. ### Device Authentication -Devices authenticate using an API key: +Devices authenticate using an API key. The `source_id` in the request is the device's *desired* name; the server may return a different, auto-suffixed name in the `authenticated` frame if the desired name is already claimed by another device (see [Device identity](../README.md#device-identity) in the README). ```json // Device → Server { "type": "device_auth", "api_key": "plx_xxxxx", - "source_id": "my-device-001", - "platform": "Linux", - "sensors": [ - { - "name": "MPU6050", - "description": "6-axis IMU", - "metrics": ["accel_x", "accel_y", "accel_z", "gyro_x", "gyro_y", "gyro_z"], - "sample_rate": 100, - "prefix": "", - "available": true - } - ] + "source_id": "drone-01", + "install_id": "c9f2e0b46f4a4f6a8c3e1d5b0a2e7f91", + "platform": "python-sdk", + "agent_version": "0.3.1" } // Server → Device { "type": "authenticated", - "source_id": "my-device-001" + "source_id": "drone-01" +} + +// Server → Device (collision case) +{ + "type": "authenticated", + "source_id": "drone-01_2" } ``` +The SDK **adopts** whatever `source_id` the server returns and uses it for all subsequent frames, heartbeats, and reconnects. It also persists the assigned name locally so reconnects go straight to the claimed slot. + +`install_id` is a stable per-installation UUID, generated on the device's first run and saved to `~/.plexus/config.json`. It lets the server distinguish a rebooting device from a new device trying to claim an existing name. Legacy SDKs that omit `install_id` continue to work as before (the server passes the declared `source_id` through unchanged). + ### Message Types (Dashboard → Device) | Type | Description | diff --git a/CHANGELOG.md b/CHANGELOG.md index f6532b7..02ae521 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,48 @@ # Changelog +## [Unreleased] - Stable device identity + +The gateway is now authoritative for a device's `source_id`. The SDK sends a +locally-generated `install_id` in the auth frame; the gateway atomically +claims `(org, source_id)` and, if the desired name is already owned by a +different install, returns an auto-suffixed name (`drone-01` → `drone-01_2` +→ `drone-01_3`…) in the `authenticated` frame. The SDK adopts and persists +the assigned name so subsequent reconnects are stable. + +This fixes the silent stream-merging that happened when cloned SD-card +images shared a hostname or when two operators picked the same name. + +### Added + +- `plexus.config.get_install_id()` — lazy per-installation UUID, persisted + to `~/.plexus/config.json`. **Not** written by `setup.sh`: it's minted by + the SDK on first run so pre-baked images get distinct IDs per boot. +- `plexus.config.set_source_id()` — persist the gateway-assigned name after + auto-suffix resolution. +- `WebSocketTransport(install_id=..., on_source_id_assigned=...)` — the + transport sends `install_id` in the `device_auth` frame and invokes the + callback whenever the gateway returns a different `source_id` than + requested. + +### Changed + +- `WebSocketTransport` now reads the `source_id` back from the + `authenticated` frame and updates `self.source_id` in place if the gateway + auto-suffixed. The rename is logged at INFO level on first occurrence. +- `Plexus` wires `install_id` into the transport and persists the assigned + `source_id` to config on rename. +- `scripts/setup.sh` — `--name` is **required**. The hostname fallback is + removed (it was the main source of cloned-image collisions). In a TTY the + script prompts interactively; in non-TTY it exits with an error. Names are + validated against `^[a-z0-9][a-z0-9_-]{1,62}$`. Stale `plexus start` / + `plexus reset` hints were dropped. + +### Wire-protocol (compatible) + +- `device_auth` frame gains an optional `install_id` field. The gateway + treats a missing `install_id` as legacy pass-through, so older SDKs and + the C SDK continue to work unchanged. + ## [0.3.0] - WebSocket transport Adds a wire-compatible WebSocket transport matching the `plexus-c` SDK. WS is now the default; failed sends transparently fall back to `POST /ingest`. diff --git a/README.md b/README.md index d1056c1..1288169 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,21 @@ px.send("temperature", 72.5) Get an API key at [app.plexus.company](https://app.plexus.company) → Devices → Add Device. +## Device identity + +Every device needs a unique `source_id`. The recommended way to set one on a real host is the bootstrap script, which requires a device name up front: + +```bash +curl -sL https://app.plexus.company/setup | bash -s -- \ + --key plx_xxx --name drone-01 +``` + +The name must match `^[a-z0-9][a-z0-9_-]{1,62}$`. `setup.sh` refuses to run without `--name` (or without a TTY to prompt for one) — this is deliberate, because the previous `hostname` fallback silently merged telemetry from cloned SD-card images that all booted as `raspberrypi`. + +**If two devices end up requesting the same name**, the gateway auto-suffixes: the first connection gets `drone-01`, the second gets `drone-01_2`, the third `drone-01_3`, and so on. The SDK logs the rename at INFO and persists the assigned name to `~/.plexus/config.json` so the device keeps its identity across reboots. Under the hood, a per-installation UUID (`install_id`, lazily generated on first run) is what lets the gateway tell "same device reconnecting" from "different device claiming the same name." + +In normal code, you usually just pass `source_id=...` explicitly to `Plexus(...)` and never have to think about it. + ## Usage ```python diff --git a/plexus/client.py b/plexus/client.py index a61feb0..fcd330a 100644 --- a/plexus/client.py +++ b/plexus/client.py @@ -49,7 +49,9 @@ get_endpoint, get_gateway_url, get_gateway_ws_url, + get_install_id, get_source_id, + set_source_id, ) logger = logging.getLogger(__name__) @@ -259,7 +261,7 @@ def send_batch( ("position", {"x": 1.0, "y": 2.0}), ]) """ - ts = timestamp or time.time() + ts = timestamp if timestamp is not None else time.time() data_points = [self._make_point(m, v, ts, tags) for m, v in points] return self._send_points(data_points) @@ -273,11 +275,23 @@ def _ensure_ws(self): api_key=self.api_key, source_id=self.source_id, ws_url=self._ws_url, + install_id=get_install_id(), agent_version=__version__, + on_source_id_assigned=self._on_source_id_assigned, ) self._ws.start() return self._ws + def _on_source_id_assigned(self, assigned: str) -> None: + """Callback from WebSocketTransport when the gateway returns an + auto-suffixed source_id. Persists it so subsequent runs (and the HTTP + fallback path in this process) use the assigned name directly.""" + self.source_id = assigned + try: + set_source_id(assigned) + except Exception as e: # pragma: no cover - persistence failure is non-fatal + logger.debug("failed to persist assigned source_id: %s", e) + def on_command( self, name: str, diff --git a/plexus/config.py b/plexus/config.py index b6b6b03..4b062bf 100644 --- a/plexus/config.py +++ b/plexus/config.py @@ -145,6 +145,40 @@ def get_source_id() -> Optional[str]: return source_id +def get_install_id() -> str: + """Get the device install ID, generating one if not set. + + The install_id is a stable per-installation UUID. It is generated lazily + on first run (NOT at image-build time) so that cloned SD-card images + naturally get distinct install_ids on their first boot. The gateway uses + it to tell "same device reconnecting" from "different device claiming the + same name" when resolving source_id collisions. + """ + config = load_config() + install_id = config.get("install_id") + + if not install_id: + import uuid + install_id = uuid.uuid4().hex + config["install_id"] = install_id + save_config(config) + + return install_id + + +def set_source_id(source_id: str) -> None: + """Persist an updated source_id to the config file. + + Called by the SDK when the gateway returns an auto-suffixed name so the + assigned name is stable across reconnects. + """ + config = load_config() + if config.get("source_id") == source_id: + return + config["source_id"] = source_id + save_config(config) + + def get_persistent_buffer() -> bool: """Get persistent buffer setting. Default True (store-and-forward enabled).""" config = load_config() diff --git a/plexus/ws.py b/plexus/ws.py index ba43477..3918321 100644 --- a/plexus/ws.py +++ b/plexus/ws.py @@ -5,8 +5,14 @@ `/ws/device` endpoint and exchanges the same JSON frames: client → {"type": "device_auth", "api_key": ..., "source_id": ..., - "platform": "python-sdk", "agent_version": ..., "commands": [...]} + "install_id": ..., "platform": "python-sdk", + "agent_version": ..., "commands": [...]} server → {"type": "authenticated", "source_id": ...} + +The server-returned `source_id` in the `authenticated` frame is +authoritative: if the gateway auto-suffixed on a collision (e.g. the +desired name was already claimed by a different install_id), the +client's `source_id` is updated in place to match. client → {"type": "telemetry", "points": [...]} client → {"type": "heartbeat", "source_id": ..., "agent_version": ...} # every 30s server → {"type": "typed_command", "id": ..., "command": ..., "params": {...}} @@ -78,9 +84,11 @@ def __init__( source_id: str, ws_url: str, *, + install_id: str = "", agent_version: str = "0.0.0", platform: str = "python-sdk", auto_reconnect: bool = True, + on_source_id_assigned: Optional[Callable[[str], None]] = None, ): if not api_key: raise ValueError("api_key required") @@ -89,10 +97,12 @@ def __init__( self.api_key = api_key self.source_id = source_id + self.install_id = install_id self.ws_url = _ensure_device_path(ws_url) self.agent_version = agent_version self.platform = platform self.auto_reconnect = auto_reconnect + self._on_source_id_assigned = on_source_id_assigned self._commands: Dict[str, _RegisteredCommand] = {} self._ws: Optional[websocket.WebSocket] = None @@ -184,13 +194,16 @@ def _connect_and_serve(self) -> None: self._ws = ws # 1. Send device_auth + desired_source_id = self.source_id auth = { "type": "device_auth", "api_key": self.api_key, - "source_id": self.source_id, + "source_id": desired_source_id, "platform": self.platform, "agent_version": self.agent_version, } + if self.install_id: + auth["install_id"] = self.install_id if self._commands: auth["commands"] = [c.to_manifest() for c in self._commands.values()] ws.send(json.dumps(auth)) @@ -206,6 +219,22 @@ def _connect_and_serve(self) -> None: if msg.get("type") != "authenticated": raise RuntimeError(f"auth failed: {msg}") + # The gateway may return a different source_id if the desired name + # was already claimed by another install — adopt the assigned value + # so all subsequent frames (heartbeats, future reconnects) use it. + assigned = msg.get("source_id") + if isinstance(assigned, str) and assigned and assigned != self.source_id: + logger.info( + "plexus ws source_id auto-suffixed: requested=%s assigned=%s", + desired_source_id, assigned, + ) + self.source_id = assigned + if self._on_source_id_assigned is not None: + try: + self._on_source_id_assigned(assigned) + except Exception as e: # pragma: no cover - callback errors must not break auth + logger.debug("on_source_id_assigned callback raised: %s", e) + self._authenticated.set() self._backoff_attempt = 0 logger.info("plexus ws authenticated as %s", self.source_id) diff --git a/scripts/setup.sh b/scripts/setup.sh index b5b3ecf..17393b6 100755 --- a/scripts/setup.sh +++ b/scripts/setup.sh @@ -43,6 +43,41 @@ while [[ $# -gt 0 ]]; do esac done +# --name is required so every device lands on a deliberate, unique identifier. +# Previously we fell back to $(hostname), which silently merged streams on +# cloned SD-card images where every device shared a hostname. If --name is +# missing, prompt interactively in a TTY or fail loudly in non-interactive +# contexts. Validation keeps the name safe for Redis keys, URLs, and logs. +validate_device_name() { + local name="$1" + if [[ ! "$name" =~ ^[a-z0-9][a-z0-9_-]{1,62}$ ]]; then + echo -e " ${RED}Invalid device name: \"$name\"${NC}" + echo " Name must start with a letter or digit and contain only" + echo " lowercase letters, digits, '-', or '_' (2-63 chars total)." + return 1 + fi + return 0 +} + +if [ -z "$DEVICE_NAME" ]; then + if [ -t 0 ]; then + echo "" + echo " Every device needs a unique name (e.g. drone-01, greenhouse-north)." + while [ -z "$DEVICE_NAME" ]; do + read -rp " Device name: " DEVICE_NAME + if [ -n "$DEVICE_NAME" ] && ! validate_device_name "$DEVICE_NAME"; then + DEVICE_NAME="" + fi + done + else + echo -e " ${RED}Error: --name is required${NC}" >&2 + echo " Example: curl -sL app.plexus.company/setup | bash -s -- --key plx_... --name drone-01" >&2 + exit 1 + fi +elif ! validate_device_name "$DEVICE_NAME"; then + exit 1 +fi + echo "" echo "┌─────────────────────────────────────────┐" echo "│ Plexus Agent Setup │" @@ -282,14 +317,17 @@ echo "" if [ -n "$API_KEY" ]; then mkdir -p "$HOME/.plexus" ENDPOINT="https://app.plexus.company" - SOURCE_ID="${DEVICE_NAME:-$(hostname)}" - echo "{\"api_key\":\"$API_KEY\",\"endpoint\":\"$ENDPOINT\",\"source_id\":\"$SOURCE_ID\"}" > "$HOME/.plexus/config.json" + # install_id is intentionally NOT written here. The SDK generates it + # lazily on first run (plexus.config.get_install_id) so that pre-baked + # SD-card images get distinct install_ids per boot rather than sharing + # whatever we'd stamp here. + echo "{\"api_key\":\"$API_KEY\",\"endpoint\":\"$ENDPOINT\",\"source_id\":\"$DEVICE_NAME\"}" > "$HOME/.plexus/config.json" export PLEXUS_API_KEY="$API_KEY" echo -e " ${GREEN}✓ API key configured${NC}" - if [ -n "$ORG_ID" ]; then - echo -e " ${GREEN}✓ Organization resolved${NC}" - fi + echo -e " ${GREEN}✓ Device name: ${CYAN}$DEVICE_NAME${NC}" + echo " (the gateway may auto-suffix this if the name is already taken;" + echo " the assigned name will be logged on first connect)" echo "" else echo " No API key provided." @@ -297,9 +335,7 @@ else echo " To authenticate this device:" echo "" echo " 1. Get an API key from ${CYAN}https://app.plexus.company${NC} → Settings → Developer" - echo " 2. Run: ${CYAN}plexus start --key plx_xxxxx${NC}" - echo "" - echo " Run ${CYAN}plexus start${NC} to sign in and connect." + echo " 2. Re-run this installer with: ${CYAN}--key plx_xxxxx --name $DEVICE_NAME${NC}" echo "" fi @@ -308,10 +344,6 @@ echo "──────────────────────── echo "" echo -e " ${GREEN}Setup complete!${NC}" echo "" -echo " Quick commands:" -echo " plexus start # Set up and stream" -echo " plexus reset # Clear config and start over" -echo "" echo " Dashboard: ${CYAN}https://app.plexus.company${NC}" echo "" echo " To uninstall:" diff --git a/tests/test_ws.py b/tests/test_ws.py index 0631134..8bb8386 100644 --- a/tests/test_ws.py +++ b/tests/test_ws.py @@ -26,9 +26,13 @@ class _StubGateway: """Minimal gateway stub. Records every frame the client sends.""" - def __init__(self): + def __init__(self, assigned_source_id: str | None = None): self.received: List[Dict[str, Any]] = [] self.auth_frame: Dict[str, Any] = {} + # If set, the stub returns this value in the authenticated frame + # regardless of what the client asked for — used to exercise the + # auto-suffix path. + self.assigned_source_id = assigned_source_id self._loop: asyncio.AbstractEventLoop | None = None self._server = None self._thread: threading.Thread | None = None @@ -53,9 +57,10 @@ async def _handler(self, ws, path="/ws/device"): raw = await ws.recv() msg = json.loads(raw) self.auth_frame = msg + returned_source_id = self.assigned_source_id or msg.get("source_id") await ws.send(json.dumps({ "type": "authenticated", - "source_id": msg.get("source_id"), + "source_id": returned_source_id, })) try: async for raw in ws: @@ -120,6 +125,7 @@ def test_auth_handshake_and_telemetry(gateway): api_key="plx_test_abc", source_id="drone-001", ws_url=_url(gateway.port), + install_id="install-A", agent_version="9.9.9", ) t.start() @@ -130,6 +136,7 @@ def test_auth_handshake_and_telemetry(gateway): assert gateway.auth_frame["type"] == "device_auth" assert gateway.auth_frame["api_key"] == "plx_test_abc" assert gateway.auth_frame["source_id"] == "drone-001" + assert gateway.auth_frame["install_id"] == "install-A" assert gateway.auth_frame["platform"] == "python-sdk" assert gateway.auth_frame["agent_version"] == "9.9.9" # commands is omitted when none registered @@ -248,6 +255,78 @@ def bad(name, params): t.stop() +def test_install_id_omitted_when_empty(): + # Default install_id="" should not leak an empty install_id field into + # the auth frame — that keeps the wire shape identical for legacy SDK + # builds that don't set one. + g = _StubGateway() + g.start() + try: + t = WebSocketTransport( + api_key="plx_test_abc", + source_id="drone-001", + ws_url=_url(g.port), + ) + t.start() + try: + assert t.wait_authenticated(timeout=3) + assert "install_id" not in g.auth_frame + finally: + t.stop() + finally: + g.stop() + + +def test_server_assigned_source_id_is_adopted(): + # Simulate the auto-suffix path: SDK asks for "drone-01", the gateway + # returns "drone-01_2" in the authenticated frame. The transport must + # adopt the assigned name and fire the on_source_id_assigned callback. + g = _StubGateway(assigned_source_id="drone-01_2") + g.start() + try: + seen: List[str] = [] + t = WebSocketTransport( + api_key="plx_test_abc", + source_id="drone-01", + ws_url=_url(g.port), + install_id="install-B", + on_source_id_assigned=lambda s: seen.append(s), + ) + t.start() + try: + assert t.wait_authenticated(timeout=3) + assert t.source_id == "drone-01_2" + assert seen == ["drone-01_2"] + finally: + t.stop() + finally: + g.stop() + + +def test_same_assigned_source_id_does_not_fire_callback(): + # Happy path — gateway returns the same name. No callback, source_id unchanged. + g = _StubGateway() # echoes whatever was sent + g.start() + try: + seen: List[str] = [] + t = WebSocketTransport( + api_key="plx_test_abc", + source_id="drone-01", + ws_url=_url(g.port), + install_id="install-A", + on_source_id_assigned=lambda s: seen.append(s), + ) + t.start() + try: + assert t.wait_authenticated(timeout=3) + assert t.source_id == "drone-01" + assert seen == [] + finally: + t.stop() + finally: + g.stop() + + def test_ensure_device_path(): from plexus.ws import _ensure_device_path assert _ensure_device_path("wss://foo") == "wss://foo/ws/device"