From c22f977e77179b1a2955b1a22112b5a16941f676 Mon Sep 17 00:00:00 2001 From: nagar-decart Date: Fri, 29 May 2026 15:19:00 +0300 Subject: [PATCH 1/6] realtime(observability): forward diagnostics + stats over the signaling WS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pipes client-side WebRTC / ICE / networking observability events through the existing realtime WebSocket as `{type: "observability", data}` messages. Bouncer logs them under the session's structured-log context in Datadog (DecartAI/api#1882), so SDK-side observations land correlated with our server-side LiveKit / inference traces without any new endpoints, auth, or transport. What flows over the WS now: - Every diagnostic emitted from `RealtimeObservability.diagnostic()` (e.g. `client-session-connection-breakdown`, `reconnect`, `videoStall`) - Every periodic WebRTC stats snapshot collected by the in-process stats collector Wiring: - `SignalingChannel.sendObservability(data)` — new public, fire-and-forget method that writes `{type: "observability", data}` (drops silently if the socket isn't open; never throws). - `RealtimeObservability.setObservabilityForwarder(fn | null)` — sink set by `StreamSession.createTransport()` after the signaling channel is constructed, and cleared on `tearDown()`. - `ObservabilityMessage` added to `OutgoingRealtimeMessage` union. This is intentionally additive to existing observability: the local `onDiagnostic`/`onStats` callbacks and the platform telemetry POSTs keep working as before. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../observability/realtime-observability.ts | 14 ++++++++++++++ packages/sdk/src/realtime/signaling-channel.ts | 15 +++++++++++++++ packages/sdk/src/realtime/stream-session.ts | 7 +++++++ packages/sdk/src/realtime/types.ts | 14 +++++++++++++- 4 files changed, 49 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/realtime/observability/realtime-observability.ts b/packages/sdk/src/realtime/observability/realtime-observability.ts index 7eefe60..a55ef6a 100644 --- a/packages/sdk/src/realtime/observability/realtime-observability.ts +++ b/packages/sdk/src/realtime/observability/realtime-observability.ts @@ -51,13 +51,26 @@ export class RealtimeObservability { private videoStalled = false; private stallStartMs = 0; private connectionBreakdown: ConnectionBreakdownBuffer | null = null; + /** + * Sink for forwarding diagnostics and stats over the existing realtime + * WebSocket (bouncer logs them to Datadog under the session's context). + * Set by `StreamSession` after the signaling channel is created; + * cleared on tearDown. + */ + private observabilityForwarder: ((payload: unknown) => void) | null = null; constructor(private readonly options: RealtimeObservabilityOptions) {} + /** Wire/unwire the WebSocket-side observability sink. Idempotent. */ + setObservabilityForwarder(fn: ((payload: unknown) => void) | null): void { + this.observabilityForwarder = fn; + } + diagnostic(name: K, data: DiagnosticEvents[K], timestamp: number = Date.now()): void { this.options.logger.debug(name, data as Record); this.options.onDiagnostic?.({ name, data } as DiagnosticEvent); this.addTelemetryDiagnostic(name, data, timestamp); + this.observabilityForwarder?.({ kind: "diagnostic", name, data, timestamp }); } beginConnectionBreakdown(attempt: number, initialImageSizeKb: number | null): void { @@ -204,6 +217,7 @@ export class RealtimeObservability { this.options.onStats?.(stats); this.telemetryReporter.addStats(stats); this.detectVideoStall(stats); + this.observabilityForwarder?.({ kind: "stats", stats }); } private detectVideoStall(stats: WebRTCStats): void { diff --git a/packages/sdk/src/realtime/signaling-channel.ts b/packages/sdk/src/realtime/signaling-channel.ts index 6b873e4..449667f 100644 --- a/packages/sdk/src/realtime/signaling-channel.ts +++ b/packages/sdk/src/realtime/signaling-channel.ts @@ -165,6 +165,21 @@ export class SignalingChannel { if (!ack.success) throw new Error(ack.error ?? "Failed to send prompt"); } + /** + * Fire-and-forget client-side observability event. Goes out over the + * existing realtime WebSocket as `{type: "observability", data}` and is + * logged by bouncer under the current session's log context. Never + * throws; quietly drops if the socket isn't open. + */ + sendObservability(data: unknown): void { + if (this.ws?.readyState !== WebSocket.OPEN) return; + try { + this.writeMessage({ type: "observability", data }); + } catch { + // Best-effort; never disrupt the session for a telemetry hiccup. + } + } + async setImage(payload: SetImagePayload, opts: ImageSetOptions = {}): Promise { const message: OutgoingRealtimeMessage = payload.kind === "ref" diff --git a/packages/sdk/src/realtime/stream-session.ts b/packages/sdk/src/realtime/stream-session.ts index 014baee..82b43c0 100644 --- a/packages/sdk/src/realtime/stream-session.ts +++ b/packages/sdk/src/realtime/stream-session.ts @@ -318,11 +318,18 @@ export class StreamSession { logger: this.logger, videoCodec: this.config.videoCodec, }); + // Forward client-side diagnostics + WebRTC stats back over the + // realtime WS so bouncer can log them to Datadog under the + // session's existing log context. + this.config.observability?.setObservabilityForwarder((payload) => { + this.signaling.sendObservability(payload); + }); this.wireSignalingEvents(); this.wireMediaEvents(); } private tearDown(): void { + this.config.observability?.setObservabilityForwarder(null); this.signaling.close(); this.media.disconnect(); this.initialStateGate.reset(); diff --git a/packages/sdk/src/realtime/types.ts b/packages/sdk/src/realtime/types.ts index 961800a..3907f36 100644 --- a/packages/sdk/src/realtime/types.ts +++ b/packages/sdk/src/realtime/types.ts @@ -129,7 +129,19 @@ export type IncomingRealtimeMessage = | LiveKitRoomInfoMessage | QueuePositionMessage; +// Client-side WebRTC / ICE / networking observability events. Free-form +// payload; logged by bouncer under the session's existing log context and +// not forwarded upstream. +export type ObservabilityMessage = { + type: "observability"; + data: unknown; +}; + // Outgoing message types (to server) -export type OutgoingRealtimeMessage = LiveKitJoinMessage | PromptMessage | SetImageMessage; +export type OutgoingRealtimeMessage = + | LiveKitJoinMessage + | PromptMessage + | SetImageMessage + | ObservabilityMessage; export type OutgoingMessage = PromptMessage | SetImageMessage; From 91a4cb1f6525b8e2260883c8f929fc583182ccc4 Mon Sep 17 00:00:00 2001 From: nagar-decart Date: Fri, 29 May 2026 15:20:17 +0300 Subject: [PATCH 2/6] style: biome format OutgoingRealtimeMessage union Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/sdk/src/realtime/types.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/sdk/src/realtime/types.ts b/packages/sdk/src/realtime/types.ts index 3907f36..2e329d0 100644 --- a/packages/sdk/src/realtime/types.ts +++ b/packages/sdk/src/realtime/types.ts @@ -138,10 +138,6 @@ export type ObservabilityMessage = { }; // Outgoing message types (to server) -export type OutgoingRealtimeMessage = - | LiveKitJoinMessage - | PromptMessage - | SetImageMessage - | ObservabilityMessage; +export type OutgoingRealtimeMessage = LiveKitJoinMessage | PromptMessage | SetImageMessage | ObservabilityMessage; export type OutgoingMessage = PromptMessage | SetImageMessage; From f58f9c782d213932a8bfb2de34f108f4ddb09710 Mon Sep 17 00:00:00 2001 From: nagar-decart Date: Fri, 29 May 2026 16:09:34 +0300 Subject: [PATCH 3/6] realtime(observability): instrument WebRTC/ICE/signaling for debug logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing observability sink only emitted three diagnostic types (client-session-connection-breakdown, reconnect, videoStall) plus periodic WebRTC stats — useful for metrics, useless for actually debugging an ICE failure. This adds raw event-level instrumentation that mirrors the iOS PR DecartAI/decart-ios#28's DecartLiveKitLogger + NetworkPathObserver coverage. New file: observability/network-instrumentation.ts Attaches addEventListener-based listeners to: - The LiveKit Room (Connected/Disconnected/Reconnecting/Reconnected, SignalReconnecting, ConnectionStateChanged, ConnectionQualityChanged, MediaDevicesError, LocalTrackPublished, ParticipantConnected, TrackSubscribed/Muted/Unmuted). - The underlying publisher + subscriber RTCPeerConnections (private access via room.engine.pcManager, with addEventListener so we never displace LiveKit's own handlers): icecandidate, icecandidateerror, iceconnectionstatechange, connectionstatechange, icegatheringstatechange, signalingstatechange, negotiationneeded, datachannel, track. ICE candidates carry full address/port/type /priority/foundation. Selected candidate pair (with RTT + addresses) is snapshotted from getStats() when ICE settles. - Browser network state (initial snapshot of navigator.connection + online/offline + visibilitychange + NetworkInformation 'change'). New methods: - RealtimeObservability.emitInstrumentationEvent(name, data) bypasses the strict DiagnosticEvents type union so per-candidate events can flow without bloating the public type surface. Signaling traffic: - SignalingChannel.writeMessage emits 'signaling-sent' (skips 'observability' to avoid recursing). - SignalingChannel.handleMessage emits 'signaling-received'. All events flow through the same observabilityForwarder -> realtime WS -> bouncer -> Datadog @event.* path that the previous change established. No new transport, no new endpoints. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/sdk/src/realtime/media-channel.ts | 9 + .../observability/network-instrumentation.ts | 370 ++++++++++++++++++ .../observability/realtime-observability.ts | 15 + .../sdk/src/realtime/signaling-channel.ts | 9 + 4 files changed, 403 insertions(+) create mode 100644 packages/sdk/src/realtime/observability/network-instrumentation.ts diff --git a/packages/sdk/src/realtime/media-channel.ts b/packages/sdk/src/realtime/media-channel.ts index 725943d..8823d42 100644 --- a/packages/sdk/src/realtime/media-channel.ts +++ b/packages/sdk/src/realtime/media-channel.ts @@ -12,6 +12,7 @@ import mitt, { type Emitter } from "mitt"; import { createConsoleLogger, type Logger } from "../utils/logger"; import { REALTIME_CONFIG } from "./config-realtime"; +import { attachRoomInstrumentation } from "./observability/network-instrumentation"; import type { RealtimeObservability } from "./observability/realtime-observability"; export type VideoCodec = "h264" | "vp8" | "vp9" | "av1"; @@ -57,6 +58,7 @@ export class MediaChannel { private remoteStream: MediaStream | null = null; private events: Emitter = mitt(); private readonly logger: Logger; + private detachInstrumentation: (() => void) | null = null; constructor(private readonly config: MediaChannelConfig) { this.logger = config.logger ?? createConsoleLogger("warn"); @@ -105,6 +107,9 @@ export class MediaChannel { await room.connect(opts.url, opts.token); this.config.observability?.endPhase("webrtc-handshake", { success: true }); this.config.observability?.setLiveKitRoom(room); + if (this.config.observability) { + this.detachInstrumentation = attachRoomInstrumentation(room, this.config.observability); + } } async publishLocalTracks(): Promise { @@ -118,6 +123,10 @@ export class MediaChannel { const room = this.room; this.room = null; this.remoteStream = null; + if (this.detachInstrumentation) { + this.detachInstrumentation(); + this.detachInstrumentation = null; + } this.config.observability?.setLiveKitRoom(null); if (room) { room.disconnect().catch(() => {}); diff --git a/packages/sdk/src/realtime/observability/network-instrumentation.ts b/packages/sdk/src/realtime/observability/network-instrumentation.ts new file mode 100644 index 0000000..0a002fe --- /dev/null +++ b/packages/sdk/src/realtime/observability/network-instrumentation.ts @@ -0,0 +1,370 @@ +/** + * WebRTC / LiveKit / browser-network instrumentation that emits raw debug + * data over the SDK observability sink (which the realtime WS forwards to + * bouncer, where Datadog ingests it under the session's log context). + * + * Captures the kind of data that's actually useful when diagnosing an ICE + * failure — every candidate gathered (host/srflx/relay/prflx, address, + * port, priority, foundation), every state transition on both transports + * (publisher/subscriber: ice/peer/gathering/signaling), candidate errors, + * the selected pair on success, signaling traffic in/out, and the + * browser's view of its own network state. + * + * The browser-side `Room` doesn't publicly expose the underlying + * `RTCPeerConnection` objects, so we reach through `room.engine.pcManager` + * via a typed-cast access path. `addEventListener` is used everywhere so + * we never displace LiveKit's own handlers. + */ + +import { Room, RoomEvent, Track, type DisconnectReason } from "livekit-client"; +import type { RealtimeObservability } from "./realtime-observability"; + +type Side = "publisher" | "subscriber"; + +// Loose typings for the parts of LiveKit's engine we touch. Everything is +// optional / `unknown` because these are private APIs that have moved +// across LiveKit versions; we degrade gracefully if a field is missing. +type EngineLike = { + pcManager?: PcManagerLike; +}; +type PcManagerLike = { + publisher?: PcTransportLike; + subscriber?: PcTransportLike; + getConnectedAddress?: (target?: unknown) => Promise; +}; +type PcTransportLike = { + pc?: RTCPeerConnection; + // some livekit builds expose it as `_pc` + _pc?: RTCPeerConnection; +}; +type RoomWithEngine = Room & { engine?: EngineLike }; + +type ConnectionInfo = { + effectiveType?: string; + downlinkMbps?: number; + rttMs?: number; + saveData?: boolean; + type?: string; + online?: boolean; +}; + +function summarizeCandidate(candidate: RTCIceCandidate | null): Record { + if (!candidate) return { eof: true }; + const c = candidate as RTCIceCandidate & { + address?: string; + relatedAddress?: string; + relatedPort?: number; + tcpType?: string; + networkType?: string; + url?: string; + }; + return { + candidate: c.candidate, + foundation: c.foundation, + component: c.component, + protocol: c.protocol, + address: c.address ?? null, + port: c.port, + priority: c.priority, + type: c.type, + tcpType: c.tcpType ?? null, + relatedAddress: c.relatedAddress ?? null, + relatedPort: c.relatedPort ?? null, + sdpMid: c.sdpMid, + sdpMLineIndex: c.sdpMLineIndex, + usernameFragment: c.usernameFragment, + networkType: c.networkType ?? null, + url: c.url ?? null, + }; +} + +function summarizeCandidateError(ev: RTCPeerConnectionIceErrorEvent): Record { + return { + address: ev.address ?? null, + port: ev.port ?? null, + url: ev.url ?? null, + errorCode: ev.errorCode, + errorText: ev.errorText, + hostCandidate: (ev as unknown as { hostCandidate?: string }).hostCandidate ?? null, + }; +} + +function snapshotConnection(): ConnectionInfo { + const info: ConnectionInfo = {}; + if (typeof navigator !== "undefined") { + info.online = navigator.onLine; + const conn = (navigator as Navigator & { connection?: Record }).connection; + if (conn) { + info.effectiveType = conn.effectiveType as string | undefined; + info.downlinkMbps = conn.downlink as number | undefined; + info.rttMs = conn.rtt as number | undefined; + info.saveData = conn.saveData as boolean | undefined; + info.type = conn.type as string | undefined; + } + } + return info; +} + +async function snapshotSelectedPair(pc: RTCPeerConnection): Promise | null> { + try { + const report = await pc.getStats(); + let pair: RTCIceCandidatePairStats | null = null; + const candidates = new Map(); + report.forEach((stat) => { + if (stat.type === "candidate-pair" && stat.state === "succeeded" && stat.nominated) { + pair = stat as RTCIceCandidatePairStats; + } + if (stat.type === "local-candidate" || stat.type === "remote-candidate") { + candidates.set(stat.id, stat as RTCIceCandidateStats); + } + }); + if (!pair) return null; + const local = candidates.get(pair.localCandidateId); + const remote = candidates.get(pair.remoteCandidateId); + return { + currentRoundTripTimeMs: pair.currentRoundTripTime != null ? pair.currentRoundTripTime * 1000 : null, + availableOutgoingBitrate: pair.availableOutgoingBitrate ?? null, + local: local + ? { + type: local.candidateType, + protocol: local.protocol, + address: (local as RTCIceCandidateStats & { address?: string }).address, + port: local.port, + networkType: (local as RTCIceCandidateStats & { networkType?: string }).networkType, + } + : null, + remote: remote + ? { + type: remote.candidateType, + protocol: remote.protocol, + address: (remote as RTCIceCandidateStats & { address?: string }).address, + port: remote.port, + } + : null, + }; + } catch { + return null; + } +} + +function getPc(transport: PcTransportLike | undefined): RTCPeerConnection | undefined { + if (!transport) return undefined; + return transport.pc ?? transport._pc; +} + +/** + * Attach low-level instrumentation to a connected LiveKit `Room`. Safe to + * call once per room. Returns a cleanup function that detaches all listeners. + */ +export function attachRoomInstrumentation(room: Room, observability: RealtimeObservability): () => void { + const emit = (name: string, data: Record = {}): void => { + observability.emitInstrumentationEvent(name, data); + }; + + // Initial browser network snapshot — gives a baseline to compare against. + emit("network-state", snapshotConnection()); + + // Browser network events. + const onOnline = () => emit("browser-online", { ...snapshotConnection() }); + const onOffline = () => emit("browser-offline", { ...snapshotConnection() }); + const onVisibility = () => + emit("page-visibility", { state: typeof document !== "undefined" ? document.visibilityState : null }); + const conn = + typeof navigator !== "undefined" ? (navigator as Navigator & { connection?: EventTarget }).connection : undefined; + const onConnChange = () => emit("network-change", snapshotConnection()); + if (typeof window !== "undefined") { + window.addEventListener("online", onOnline); + window.addEventListener("offline", onOffline); + } + if (typeof document !== "undefined") { + document.addEventListener("visibilitychange", onVisibility); + } + conn?.addEventListener?.("change", onConnChange); + + // LiveKit Room lifecycle events worth seeing in Datadog. + const onConnected = () => emit("room-connected", { name: room.name, sid: room.localParticipant?.sid }); + const onDisconnected = (reason?: DisconnectReason) => emit("room-disconnected", { reason }); + const onReconnecting = () => emit("room-reconnecting"); + const onSignalReconnecting = () => emit("room-signal-reconnecting"); + const onReconnected = () => emit("room-reconnected"); + const onConnectionStateChanged = (state: unknown) => emit("room-connection-state", { state }); + const onConnectionQualityChanged = (quality: unknown, participant: unknown) => + emit("connection-quality", { + quality, + participantIdentity: (participant as { identity?: string } | undefined)?.identity, + }); + const onMediaDevicesError = (e: Error) => emit("media-devices-error", { name: e.name, message: e.message }); + const onLocalTrackPublished = (pub: unknown) => { + const p = pub as { kind?: string; source?: string; trackSid?: string; mimeType?: string } | undefined; + emit("local-track-published", { kind: p?.kind, source: p?.source, trackSid: p?.trackSid, mimeType: p?.mimeType }); + }; + const onParticipantConnected = (participant: unknown) => + emit("participant-connected", { identity: (participant as { identity?: string } | undefined)?.identity }); + const onTrackSubscribed = (track: unknown, _pub: unknown, participant: unknown) => { + const t = track as { kind?: string; sid?: string } | undefined; + const p = participant as { identity?: string } | undefined; + emit("track-subscribed", { kind: t?.kind, trackSid: t?.sid, fromIdentity: p?.identity }); + }; + const onTrackMuted = (_pub: unknown, participant: unknown) => + emit("track-muted", { identity: (participant as { identity?: string } | undefined)?.identity }); + const onTrackUnmuted = (_pub: unknown, participant: unknown) => + emit("track-unmuted", { identity: (participant as { identity?: string } | undefined)?.identity }); + + room.on(RoomEvent.Connected, onConnected); + room.on(RoomEvent.Disconnected, onDisconnected); + room.on(RoomEvent.Reconnecting, onReconnecting); + room.on(RoomEvent.SignalReconnecting, onSignalReconnecting); + room.on(RoomEvent.Reconnected, onReconnected); + room.on(RoomEvent.ConnectionStateChanged, onConnectionStateChanged); + room.on(RoomEvent.ConnectionQualityChanged, onConnectionQualityChanged); + room.on(RoomEvent.MediaDevicesError, onMediaDevicesError); + room.on(RoomEvent.LocalTrackPublished, onLocalTrackPublished); + room.on(RoomEvent.ParticipantConnected, onParticipantConnected); + room.on(RoomEvent.TrackSubscribed, onTrackSubscribed); + room.on(RoomEvent.TrackMuted, onTrackMuted); + room.on(RoomEvent.TrackUnmuted, onTrackUnmuted); + + // Attach to the underlying RTCPeerConnections for ICE-level visibility. + // Done lazily on next tick — LiveKit creates the PC transports during + // `room.connect()`, and the engine may not be wired up yet at the + // moment we register Room events. Polling for a short window covers + // both early and late attach scenarios. + const pcCleanups: Array<() => void> = []; + const attachedSides = new Set(); + const attachPcEventsIfReady = (): boolean => { + const engine = (room as RoomWithEngine).engine; + const mgr = engine?.pcManager; + if (!mgr) return false; + for (const side of ["publisher", "subscriber"] as Side[]) { + if (attachedSides.has(side)) continue; + const pc = getPc(side === "publisher" ? mgr.publisher : mgr.subscriber); + if (!pc) continue; + attachedSides.add(side); + pcCleanups.push(attachPeerConnectionInstrumentation(pc, side, emit, mgr)); + } + return attachedSides.size === 2; + }; + // Try immediately, then poll briefly for the late case. + if (!attachPcEventsIfReady()) { + let attempts = 0; + const poll = setInterval(() => { + attempts++; + if (attachPcEventsIfReady() || attempts > 50) { + clearInterval(poll); + } + }, 100); + pcCleanups.push(() => clearInterval(poll)); + } + + return () => { + try { + room.off(RoomEvent.Connected, onConnected); + room.off(RoomEvent.Disconnected, onDisconnected); + room.off(RoomEvent.Reconnecting, onReconnecting); + room.off(RoomEvent.SignalReconnecting, onSignalReconnecting); + room.off(RoomEvent.Reconnected, onReconnected); + room.off(RoomEvent.ConnectionStateChanged, onConnectionStateChanged); + room.off(RoomEvent.ConnectionQualityChanged, onConnectionQualityChanged); + room.off(RoomEvent.MediaDevicesError, onMediaDevicesError); + room.off(RoomEvent.LocalTrackPublished, onLocalTrackPublished); + room.off(RoomEvent.ParticipantConnected, onParticipantConnected); + room.off(RoomEvent.TrackSubscribed, onTrackSubscribed); + room.off(RoomEvent.TrackMuted, onTrackMuted); + room.off(RoomEvent.TrackUnmuted, onTrackUnmuted); + } catch { + // ignore detach errors during teardown + } + if (typeof window !== "undefined") { + window.removeEventListener("online", onOnline); + window.removeEventListener("offline", onOffline); + } + if (typeof document !== "undefined") { + document.removeEventListener("visibilitychange", onVisibility); + } + conn?.removeEventListener?.("change", onConnChange); + for (const fn of pcCleanups) { + try { + fn(); + } catch { + // ignore + } + } + }; +} + +function attachPeerConnectionInstrumentation( + pc: RTCPeerConnection, + side: Side, + emit: (name: string, data: Record) => void, + pcManager?: PcManagerLike, +): () => void { + emit("pc-attached", { + side, + iceConnectionState: pc.iceConnectionState, + connectionState: pc.connectionState, + iceGatheringState: pc.iceGatheringState, + signalingState: pc.signalingState, + }); + + const onIceCandidate = (ev: RTCPeerConnectionIceEvent) => { + emit("ice-candidate", { side, ...summarizeCandidate(ev.candidate) }); + }; + const onIceCandidateError = (ev: Event) => { + emit("ice-candidate-error", { side, ...summarizeCandidateError(ev as RTCPeerConnectionIceErrorEvent) }); + }; + const onIceConnectionStateChange = async () => { + emit("ice-connection-state", { side, state: pc.iceConnectionState }); + // Snapshot the winning candidate pair when ICE settles (connected/completed) + // or capture stats at the moment of failure for forensics. + if (pc.iceConnectionState === "connected" || pc.iceConnectionState === "completed") { + const pair = await snapshotSelectedPair(pc); + if (pair) emit("selected-candidate-pair", { side, ...pair }); + if (pcManager?.getConnectedAddress) { + try { + const addr = await pcManager.getConnectedAddress(); + if (addr) emit("connected-address", { side, address: addr }); + } catch { + // ignore + } + } + } + }; + const onConnectionStateChange = () => emit("pc-connection-state", { side, state: pc.connectionState }); + const onIceGatheringStateChange = () => emit("ice-gathering-state", { side, state: pc.iceGatheringState }); + const onSignalingStateChange = () => emit("signaling-state", { side, state: pc.signalingState }); + const onNegotiationNeeded = () => emit("negotiation-needed", { side }); + const onDataChannel = (ev: RTCDataChannelEvent) => + emit("data-channel-opened", { + side, + label: ev.channel.label, + ordered: ev.channel.ordered, + protocol: ev.channel.protocol, + }); + const onTrack = (ev: RTCTrackEvent) => + emit("track-received", { side, kind: ev.track.kind, id: ev.track.id, label: ev.track.label }); + + pc.addEventListener("icecandidate", onIceCandidate); + pc.addEventListener("icecandidateerror", onIceCandidateError); + pc.addEventListener("iceconnectionstatechange", onIceConnectionStateChange); + pc.addEventListener("connectionstatechange", onConnectionStateChange); + pc.addEventListener("icegatheringstatechange", onIceGatheringStateChange); + pc.addEventListener("signalingstatechange", onSignalingStateChange); + pc.addEventListener("negotiationneeded", onNegotiationNeeded); + pc.addEventListener("datachannel", onDataChannel); + pc.addEventListener("track", onTrack); + + return () => { + pc.removeEventListener("icecandidate", onIceCandidate); + pc.removeEventListener("icecandidateerror", onIceCandidateError); + pc.removeEventListener("iceconnectionstatechange", onIceConnectionStateChange); + pc.removeEventListener("connectionstatechange", onConnectionStateChange); + pc.removeEventListener("icegatheringstatechange", onIceGatheringStateChange); + pc.removeEventListener("signalingstatechange", onSignalingStateChange); + pc.removeEventListener("negotiationneeded", onNegotiationNeeded); + pc.removeEventListener("datachannel", onDataChannel); + pc.removeEventListener("track", onTrack); + }; +} + +// Re-export for convenience so consumers know what's available. +export { Track }; diff --git a/packages/sdk/src/realtime/observability/realtime-observability.ts b/packages/sdk/src/realtime/observability/realtime-observability.ts index a55ef6a..ae6dc54 100644 --- a/packages/sdk/src/realtime/observability/realtime-observability.ts +++ b/packages/sdk/src/realtime/observability/realtime-observability.ts @@ -66,6 +66,21 @@ export class RealtimeObservability { this.observabilityForwarder = fn; } + /** + * Emit a raw instrumentation event over the observability sink. Used by + * `network-instrumentation.ts` to forward per-ICE-candidate / signaling / + * peer-connection-state events that don't fit the strict `DiagnosticEvents` + * type union. Best-effort; silently dropped if the sink isn't attached. + */ + emitInstrumentationEvent(name: string, data: unknown): void { + this.observabilityForwarder?.({ + kind: "instrumentation", + name, + data, + timestamp: Date.now(), + }); + } + diagnostic(name: K, data: DiagnosticEvents[K], timestamp: number = Date.now()): void { this.options.logger.debug(name, data as Record); this.options.onDiagnostic?.({ name, data } as DiagnosticEvent); diff --git a/packages/sdk/src/realtime/signaling-channel.ts b/packages/sdk/src/realtime/signaling-channel.ts index 449667f..7d4a4dd 100644 --- a/packages/sdk/src/realtime/signaling-channel.ts +++ b/packages/sdk/src/realtime/signaling-channel.ts @@ -355,10 +355,19 @@ export class SignalingChannel { private writeMessage(message: OutgoingRealtimeMessage): boolean { if (this.ws?.readyState !== WebSocket.OPEN) return false; this.ws.send(JSON.stringify(message)); + // Best-effort signaling-trace for non-self-referential messages + // (don't trace our own observability frames or we recurse). + if (message.type !== "observability") { + this.config.observability?.emitInstrumentationEvent("signaling-sent", { + type: message.type, + size: 0, + }); + } return true; } private handleMessage(msg: IncomingRealtimeMessage): void { + this.config.observability?.emitInstrumentationEvent("signaling-received", { type: msg.type }); for (const ack of [...this.pendingAcks]) { if (ack.matches(msg)) { ack.onMatch(msg); From 451808727cb694707704dcbc2c706b46931380d7 Mon Sep 17 00:00:00 2001 From: nagar-decart Date: Fri, 29 May 2026 16:27:28 +0300 Subject: [PATCH 4/6] realtime(observability): trim WS-forwarded events to ICE/connection-debug signal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Earlier instrumentation pass dumped everything over the WS — periodic WebRTC stats (every 1s), every signaling frame, every signalingstatechange, every track/visibility/connection-quality event. In a 15-min Datadog window with 3 successful sessions, stats alone produced 141 of 250 events (56%) while actual ICE-level data was ~7%. That signal-to-noise ratio makes the stream useless for debugging connection failures. Forward only what helps diagnose WHY a connection fails or stalls: DROPPED (noise during a healthy session, not useful for failure debug): - Periodic WebRTC stats (still local via onStats + platform telemetry) - signaling-sent for every outbound frame - signaling-received for routine generation_tick / prompt_ack / set_image_ack - signaling-state on PC (SDP renegotiation chatter) - negotiation-needed, track-received, data-channel-opened on PC - track-subscribed/-muted/-unmuted, local-track-published, participant-connected, connection-quality on Room - page-visibility KEPT (the ICE / connection debug picture): - ice-candidate, ice-candidate-past, ice-candidate-error - ice-connection-state, ice-gathering-state, pc-connection-state - pc-attached (with snapshot of current state) - selected-candidate-pair (with local/remote addresses + RTT) - connected-address (LiveKit's getter for the chosen address) - room-connected/-disconnected/-reconnecting/-signal-reconnecting/-reconnected - room-connection-state - media-devices-error - network-state (initial + on change), browser-online/-offline - signaling-received for livekit_room_info / session_id (the only signaling messages that matter for connection setup) - client-session-connection-breakdown (the breakdown diagnostic) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../observability/network-instrumentation.ts | 134 ++++++++++-------- .../observability/realtime-observability.ts | 6 +- .../sdk/src/realtime/signaling-channel.ts | 15 +- 3 files changed, 84 insertions(+), 71 deletions(-) diff --git a/packages/sdk/src/realtime/observability/network-instrumentation.ts b/packages/sdk/src/realtime/observability/network-instrumentation.ts index 0a002fe..7cbf8ef 100644 --- a/packages/sdk/src/realtime/observability/network-instrumentation.ts +++ b/packages/sdk/src/realtime/observability/network-instrumentation.ts @@ -152,6 +152,46 @@ function getPc(transport: PcTransportLike | undefined): RTCPeerConnection | unde return transport.pc ?? transport._pc; } +/** + * Walk `pc.getStats()` once and emit one synthetic `ice-candidate-past` + * event per local-candidate (and `remote-candidate-past` per remote one) + * already known to the PC. Covers the very common case where ICE + * gathering finished before our `icecandidate` listener was attached. + */ +async function snapshotPastCandidates( + pc: RTCPeerConnection, + side: Side, + emit: (name: string, data: Record) => void, +): Promise { + try { + const report = await pc.getStats(); + report.forEach((stat) => { + if (stat.type === "local-candidate" || stat.type === "remote-candidate") { + const c = stat as RTCIceCandidateStats & { + address?: string; + networkType?: string; + relayProtocol?: string; + url?: string; + }; + emit("ice-candidate-past", { + side, + source: stat.type, // local-candidate | remote-candidate + candidateType: c.candidateType, + protocol: c.protocol, + address: c.address ?? null, + port: c.port, + priority: c.priority, + networkType: c.networkType ?? null, + relayProtocol: c.relayProtocol ?? null, + url: c.url ?? null, + }); + } + }); + } catch { + // ignore + } +} + /** * Attach low-level instrumentation to a connected LiveKit `Room`. Safe to * call once per room. Returns a cleanup function that detaches all listeners. @@ -164,11 +204,11 @@ export function attachRoomInstrumentation(room: Room, observability: RealtimeObs // Initial browser network snapshot — gives a baseline to compare against. emit("network-state", snapshotConnection()); - // Browser network events. + // Browser network events relevant to ICE failure debugging. Page + // visibility transitions are intentionally not forwarded — they're + // noise for connection diagnostics. const onOnline = () => emit("browser-online", { ...snapshotConnection() }); const onOffline = () => emit("browser-offline", { ...snapshotConnection() }); - const onVisibility = () => - emit("page-visibility", { state: typeof document !== "undefined" ? document.visibilityState : null }); const conn = typeof navigator !== "undefined" ? (navigator as Navigator & { connection?: EventTarget }).connection : undefined; const onConnChange = () => emit("network-change", snapshotConnection()); @@ -176,39 +216,21 @@ export function attachRoomInstrumentation(room: Room, observability: RealtimeObs window.addEventListener("online", onOnline); window.addEventListener("offline", onOffline); } - if (typeof document !== "undefined") { - document.addEventListener("visibilitychange", onVisibility); - } conn?.addEventListener?.("change", onConnChange); - // LiveKit Room lifecycle events worth seeing in Datadog. + // LiveKit Room lifecycle events that matter for connection debugging. + // Steady-state events (connection-quality, track-subscribed/muted/unmuted, + // local-track-published, participant-connected, page-visibility) are + // intentionally not forwarded — they're noise once a session is up and + // running, and the goal of this stream is to debug WHY connections fail + // or take too long, not narrate a healthy session. const onConnected = () => emit("room-connected", { name: room.name, sid: room.localParticipant?.sid }); const onDisconnected = (reason?: DisconnectReason) => emit("room-disconnected", { reason }); const onReconnecting = () => emit("room-reconnecting"); const onSignalReconnecting = () => emit("room-signal-reconnecting"); const onReconnected = () => emit("room-reconnected"); const onConnectionStateChanged = (state: unknown) => emit("room-connection-state", { state }); - const onConnectionQualityChanged = (quality: unknown, participant: unknown) => - emit("connection-quality", { - quality, - participantIdentity: (participant as { identity?: string } | undefined)?.identity, - }); const onMediaDevicesError = (e: Error) => emit("media-devices-error", { name: e.name, message: e.message }); - const onLocalTrackPublished = (pub: unknown) => { - const p = pub as { kind?: string; source?: string; trackSid?: string; mimeType?: string } | undefined; - emit("local-track-published", { kind: p?.kind, source: p?.source, trackSid: p?.trackSid, mimeType: p?.mimeType }); - }; - const onParticipantConnected = (participant: unknown) => - emit("participant-connected", { identity: (participant as { identity?: string } | undefined)?.identity }); - const onTrackSubscribed = (track: unknown, _pub: unknown, participant: unknown) => { - const t = track as { kind?: string; sid?: string } | undefined; - const p = participant as { identity?: string } | undefined; - emit("track-subscribed", { kind: t?.kind, trackSid: t?.sid, fromIdentity: p?.identity }); - }; - const onTrackMuted = (_pub: unknown, participant: unknown) => - emit("track-muted", { identity: (participant as { identity?: string } | undefined)?.identity }); - const onTrackUnmuted = (_pub: unknown, participant: unknown) => - emit("track-unmuted", { identity: (participant as { identity?: string } | undefined)?.identity }); room.on(RoomEvent.Connected, onConnected); room.on(RoomEvent.Disconnected, onDisconnected); @@ -216,13 +238,7 @@ export function attachRoomInstrumentation(room: Room, observability: RealtimeObs room.on(RoomEvent.SignalReconnecting, onSignalReconnecting); room.on(RoomEvent.Reconnected, onReconnected); room.on(RoomEvent.ConnectionStateChanged, onConnectionStateChanged); - room.on(RoomEvent.ConnectionQualityChanged, onConnectionQualityChanged); room.on(RoomEvent.MediaDevicesError, onMediaDevicesError); - room.on(RoomEvent.LocalTrackPublished, onLocalTrackPublished); - room.on(RoomEvent.ParticipantConnected, onParticipantConnected); - room.on(RoomEvent.TrackSubscribed, onTrackSubscribed); - room.on(RoomEvent.TrackMuted, onTrackMuted); - room.on(RoomEvent.TrackUnmuted, onTrackUnmuted); // Attach to the underlying RTCPeerConnections for ICE-level visibility. // Done lazily on next tick — LiveKit creates the PC transports during @@ -264,13 +280,7 @@ export function attachRoomInstrumentation(room: Room, observability: RealtimeObs room.off(RoomEvent.SignalReconnecting, onSignalReconnecting); room.off(RoomEvent.Reconnected, onReconnected); room.off(RoomEvent.ConnectionStateChanged, onConnectionStateChanged); - room.off(RoomEvent.ConnectionQualityChanged, onConnectionQualityChanged); room.off(RoomEvent.MediaDevicesError, onMediaDevicesError); - room.off(RoomEvent.LocalTrackPublished, onLocalTrackPublished); - room.off(RoomEvent.ParticipantConnected, onParticipantConnected); - room.off(RoomEvent.TrackSubscribed, onTrackSubscribed); - room.off(RoomEvent.TrackMuted, onTrackMuted); - room.off(RoomEvent.TrackUnmuted, onTrackUnmuted); } catch { // ignore detach errors during teardown } @@ -278,9 +288,6 @@ export function attachRoomInstrumentation(room: Room, observability: RealtimeObs window.removeEventListener("online", onOnline); window.removeEventListener("offline", onOffline); } - if (typeof document !== "undefined") { - document.removeEventListener("visibilitychange", onVisibility); - } conn?.removeEventListener?.("change", onConnChange); for (const fn of pcCleanups) { try { @@ -306,6 +313,27 @@ function attachPeerConnectionInstrumentation( signalingState: pc.signalingState, }); + // The PC may already have gathered all its candidates by the time we + // attach. addEventListener('icecandidate', ...) only catches FUTURE + // events, so we walk getStats() once to surface what was already + // produced. This is the data we'd care about most for an ICE-failure + // post-mortem (srflx address, candidate types, the winning pair). + void snapshotPastCandidates(pc, side, emit); + if (pc.iceConnectionState === "connected" || pc.iceConnectionState === "completed") { + void (async () => { + const pair = await snapshotSelectedPair(pc); + if (pair) emit("selected-candidate-pair", { side, ...pair, snapshot: true }); + if (pcManager?.getConnectedAddress) { + try { + const addr = await pcManager.getConnectedAddress(); + if (addr) emit("connected-address", { side, address: addr, snapshot: true }); + } catch { + // ignore + } + } + })(); + } + const onIceCandidate = (ev: RTCPeerConnectionIceEvent) => { emit("ice-candidate", { side, ...summarizeCandidate(ev.candidate) }); }; @@ -331,27 +359,15 @@ function attachPeerConnectionInstrumentation( }; const onConnectionStateChange = () => emit("pc-connection-state", { side, state: pc.connectionState }); const onIceGatheringStateChange = () => emit("ice-gathering-state", { side, state: pc.iceGatheringState }); - const onSignalingStateChange = () => emit("signaling-state", { side, state: pc.signalingState }); - const onNegotiationNeeded = () => emit("negotiation-needed", { side }); - const onDataChannel = (ev: RTCDataChannelEvent) => - emit("data-channel-opened", { - side, - label: ev.channel.label, - ordered: ev.channel.ordered, - protocol: ev.channel.protocol, - }); - const onTrack = (ev: RTCTrackEvent) => - emit("track-received", { side, kind: ev.track.kind, id: ev.track.id, label: ev.track.label }); + // signalingstatechange + negotiationneeded + track + datachannel are + // intentionally not hooked — they fire on every SDP renegotiation cycle + // (each prompt / set_image triggers one) and bury the ICE-level signal. pc.addEventListener("icecandidate", onIceCandidate); pc.addEventListener("icecandidateerror", onIceCandidateError); pc.addEventListener("iceconnectionstatechange", onIceConnectionStateChange); pc.addEventListener("connectionstatechange", onConnectionStateChange); pc.addEventListener("icegatheringstatechange", onIceGatheringStateChange); - pc.addEventListener("signalingstatechange", onSignalingStateChange); - pc.addEventListener("negotiationneeded", onNegotiationNeeded); - pc.addEventListener("datachannel", onDataChannel); - pc.addEventListener("track", onTrack); return () => { pc.removeEventListener("icecandidate", onIceCandidate); @@ -359,10 +375,6 @@ function attachPeerConnectionInstrumentation( pc.removeEventListener("iceconnectionstatechange", onIceConnectionStateChange); pc.removeEventListener("connectionstatechange", onConnectionStateChange); pc.removeEventListener("icegatheringstatechange", onIceGatheringStateChange); - pc.removeEventListener("signalingstatechange", onSignalingStateChange); - pc.removeEventListener("negotiationneeded", onNegotiationNeeded); - pc.removeEventListener("datachannel", onDataChannel); - pc.removeEventListener("track", onTrack); }; } diff --git a/packages/sdk/src/realtime/observability/realtime-observability.ts b/packages/sdk/src/realtime/observability/realtime-observability.ts index ae6dc54..2d10d2c 100644 --- a/packages/sdk/src/realtime/observability/realtime-observability.ts +++ b/packages/sdk/src/realtime/observability/realtime-observability.ts @@ -232,7 +232,11 @@ export class RealtimeObservability { this.options.onStats?.(stats); this.telemetryReporter.addStats(stats); this.detectVideoStall(stats); - this.observabilityForwarder?.({ kind: "stats", stats }); + // Stats intentionally not forwarded over the WS. They fire every 1s + // and would dominate Datadog's signal-to-noise ratio while not + // helping diagnose connection-establishment failures (which is what + // the WS-forwarded observability stream is for). They still flow to + // the local `onStats` callback and the platform telemetry POST. } private detectVideoStall(stats: WebRTCStats): void { diff --git a/packages/sdk/src/realtime/signaling-channel.ts b/packages/sdk/src/realtime/signaling-channel.ts index 7d4a4dd..a0a19df 100644 --- a/packages/sdk/src/realtime/signaling-channel.ts +++ b/packages/sdk/src/realtime/signaling-channel.ts @@ -355,19 +355,16 @@ export class SignalingChannel { private writeMessage(message: OutgoingRealtimeMessage): boolean { if (this.ws?.readyState !== WebSocket.OPEN) return false; this.ws.send(JSON.stringify(message)); - // Best-effort signaling-trace for non-self-referential messages - // (don't trace our own observability frames or we recurse). - if (message.type !== "observability") { - this.config.observability?.emitInstrumentationEvent("signaling-sent", { - type: message.type, - size: 0, - }); - } return true; } private handleMessage(msg: IncomingRealtimeMessage): void { - this.config.observability?.emitInstrumentationEvent("signaling-received", { type: msg.type }); + // Only trace connection-establishment-relevant signaling messages. + // Routine generation_tick / prompt_ack / set_image_ack chatter is + // suppressed to keep Datadog's signal-to-noise ratio usable. + if (msg.type === "livekit_room_info" || msg.type === "session_id") { + this.config.observability?.emitInstrumentationEvent("signaling-received", { type: msg.type }); + } for (const ack of [...this.pendingAcks]) { if (ack.matches(msg)) { ack.onMatch(msg); From 7ac0c7da3a46aae8a67c5439186f4cfbb1bf5f95 Mon Sep 17 00:00:00 2001 From: nagar-decart Date: Fri, 29 May 2026 16:40:32 +0300 Subject: [PATCH 5/6] realtime(observability): tighten event shapes per audit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pc-attached now carries the PC's iceServers config (URLs only; credentials redacted to presence-bools) + iceTransportPolicy. Directly answers "did the JoinResponse give us STUN/TURN at all?", which was the core unknown in the 29CM ICE-failure investigation. - room-disconnected now decodes the numeric DisconnectReason enum into a readable string (signal_close, connection_timeout, media_failure, etc.). Mirrored in the test mock. - livekit_room_info signaling event now carries livekitUrl / roomName / sessionId — tells us which SFU node owns the room without needing to grep server-side logs. - Dropped room-connection-state (duplicated room-connected / -disconnected / -reconnecting). - Dropped connected-address (redundant with selected-candidate-pair.remote.address). - ice-candidate trimmed from 15 fields to 12: drop raw SDP `candidate` string, sdpMid, sdpMLineIndex, usernameFragment. Keep type, protocol, address, port, priority, foundation, component, tcpType, relatedAddress/Port, networkType, url. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../observability/network-instrumentation.ts | 87 ++++++++++++------- .../sdk/src/realtime/signaling-channel.ts | 12 ++- packages/sdk/tests/realtime.unit.test.ts | 24 ++++- 3 files changed, 90 insertions(+), 33 deletions(-) diff --git a/packages/sdk/src/realtime/observability/network-instrumentation.ts b/packages/sdk/src/realtime/observability/network-instrumentation.ts index 7cbf8ef..e26b21c 100644 --- a/packages/sdk/src/realtime/observability/network-instrumentation.ts +++ b/packages/sdk/src/realtime/observability/network-instrumentation.ts @@ -16,9 +16,33 @@ * we never displace LiveKit's own handlers. */ -import { Room, RoomEvent, Track, type DisconnectReason } from "livekit-client"; +import { DisconnectReason, Room, RoomEvent, Track } from "livekit-client"; import type { RealtimeObservability } from "./realtime-observability"; +const DISCONNECT_REASON_NAMES: Record = { + [DisconnectReason.UNKNOWN_REASON]: "unknown", + [DisconnectReason.CLIENT_INITIATED]: "client_initiated", + [DisconnectReason.DUPLICATE_IDENTITY]: "duplicate_identity", + [DisconnectReason.SERVER_SHUTDOWN]: "server_shutdown", + [DisconnectReason.PARTICIPANT_REMOVED]: "participant_removed", + [DisconnectReason.ROOM_DELETED]: "room_deleted", + [DisconnectReason.STATE_MISMATCH]: "state_mismatch", + [DisconnectReason.JOIN_FAILURE]: "join_failure", + [DisconnectReason.MIGRATION]: "migration", + [DisconnectReason.SIGNAL_CLOSE]: "signal_close", + [DisconnectReason.ROOM_CLOSED]: "room_closed", + [DisconnectReason.USER_UNAVAILABLE]: "user_unavailable", + [DisconnectReason.USER_REJECTED]: "user_rejected", + [DisconnectReason.SIP_TRUNK_FAILURE]: "sip_trunk_failure", + [DisconnectReason.CONNECTION_TIMEOUT]: "connection_timeout", + [DisconnectReason.MEDIA_FAILURE]: "media_failure", +}; + +function disconnectReasonString(reason: number | undefined): string | undefined { + if (reason === undefined) return undefined; + return DISCONNECT_REASON_NAMES[reason] ?? `unknown(${reason})`; +} + type Side = "publisher" | "subscriber"; // Loose typings for the parts of LiveKit's engine we touch. Everything is @@ -50,6 +74,10 @@ type ConnectionInfo = { function summarizeCandidate(candidate: RTCIceCandidate | null): Record { if (!candidate) return { eof: true }; + // Keep only the fields useful for ICE debugging. Dropped vs. raw RTCIceCandidate: + // - candidate (raw SDP string — redundant with type/address/port/protocol) + // - sdpMid / sdpMLineIndex (mux indexing, not network-debug) + // - usernameFragment (ICE ufrag — auth, not network-debug) const c = candidate as RTCIceCandidate & { address?: string; relatedAddress?: string; @@ -59,20 +87,16 @@ function summarizeCandidate(candidate: RTCIceCandidate | null): Record emit("room-connected", { name: room.name, sid: room.localParticipant?.sid }); - const onDisconnected = (reason?: DisconnectReason) => emit("room-disconnected", { reason }); + const onDisconnected = (reason?: DisconnectReason) => + emit("room-disconnected", { reason, reasonName: disconnectReasonString(reason) }); const onReconnecting = () => emit("room-reconnecting"); const onSignalReconnecting = () => emit("room-signal-reconnecting"); const onReconnected = () => emit("room-reconnected"); - const onConnectionStateChanged = (state: unknown) => emit("room-connection-state", { state }); const onMediaDevicesError = (e: Error) => emit("media-devices-error", { name: e.name, message: e.message }); room.on(RoomEvent.Connected, onConnected); @@ -237,8 +261,9 @@ export function attachRoomInstrumentation(room: Room, observability: RealtimeObs room.on(RoomEvent.Reconnecting, onReconnecting); room.on(RoomEvent.SignalReconnecting, onSignalReconnecting); room.on(RoomEvent.Reconnected, onReconnected); - room.on(RoomEvent.ConnectionStateChanged, onConnectionStateChanged); room.on(RoomEvent.MediaDevicesError, onMediaDevicesError); + // RoomEvent.ConnectionStateChanged is intentionally not hooked — its + // states duplicate room-connected / -reconnecting / -disconnected. // Attach to the underlying RTCPeerConnections for ICE-level visibility. // Done lazily on next tick — LiveKit creates the PC transports during @@ -279,7 +304,6 @@ export function attachRoomInstrumentation(room: Room, observability: RealtimeObs room.off(RoomEvent.Reconnecting, onReconnecting); room.off(RoomEvent.SignalReconnecting, onSignalReconnecting); room.off(RoomEvent.Reconnected, onReconnected); - room.off(RoomEvent.ConnectionStateChanged, onConnectionStateChanged); room.off(RoomEvent.MediaDevicesError, onMediaDevicesError); } catch { // ignore detach errors during teardown @@ -299,18 +323,36 @@ export function attachRoomInstrumentation(room: Room, observability: RealtimeObs }; } +function summarizeIceServers(pc: RTCPeerConnection): Array> { + try { + const cfg = pc.getConfiguration(); + return (cfg.iceServers ?? []).map((s) => ({ + urls: Array.isArray(s.urls) ? s.urls : [s.urls], + hasUsername: !!s.username, + hasCredential: !!s.credential, + })); + } catch { + return []; + } +} + function attachPeerConnectionInstrumentation( pc: RTCPeerConnection, side: Side, emit: (name: string, data: Record) => void, - pcManager?: PcManagerLike, + _pcManager?: PcManagerLike, ): () => void { + // pc-attached carries the PC's iceServers config — directly answers + // "did the SDK get STUN/TURN URLs from the JoinResponse?". Credentials + // are redacted (we only log whether they were present). emit("pc-attached", { side, iceConnectionState: pc.iceConnectionState, connectionState: pc.connectionState, iceGatheringState: pc.iceGatheringState, signalingState: pc.signalingState, + iceServers: summarizeIceServers(pc), + iceTransportPolicy: pc.getConfiguration().iceTransportPolicy ?? null, }); // The PC may already have gathered all its candidates by the time we @@ -323,14 +365,6 @@ function attachPeerConnectionInstrumentation( void (async () => { const pair = await snapshotSelectedPair(pc); if (pair) emit("selected-candidate-pair", { side, ...pair, snapshot: true }); - if (pcManager?.getConnectedAddress) { - try { - const addr = await pcManager.getConnectedAddress(); - if (addr) emit("connected-address", { side, address: addr, snapshot: true }); - } catch { - // ignore - } - } })(); } @@ -342,19 +376,10 @@ function attachPeerConnectionInstrumentation( }; const onIceConnectionStateChange = async () => { emit("ice-connection-state", { side, state: pc.iceConnectionState }); - // Snapshot the winning candidate pair when ICE settles (connected/completed) - // or capture stats at the moment of failure for forensics. + // Snapshot the winning candidate pair when ICE settles. if (pc.iceConnectionState === "connected" || pc.iceConnectionState === "completed") { const pair = await snapshotSelectedPair(pc); if (pair) emit("selected-candidate-pair", { side, ...pair }); - if (pcManager?.getConnectedAddress) { - try { - const addr = await pcManager.getConnectedAddress(); - if (addr) emit("connected-address", { side, address: addr }); - } catch { - // ignore - } - } } }; const onConnectionStateChange = () => emit("pc-connection-state", { side, state: pc.connectionState }); diff --git a/packages/sdk/src/realtime/signaling-channel.ts b/packages/sdk/src/realtime/signaling-channel.ts index a0a19df..c8daac4 100644 --- a/packages/sdk/src/realtime/signaling-channel.ts +++ b/packages/sdk/src/realtime/signaling-channel.ts @@ -362,7 +362,17 @@ export class SignalingChannel { // Only trace connection-establishment-relevant signaling messages. // Routine generation_tick / prompt_ack / set_image_ack chatter is // suppressed to keep Datadog's signal-to-noise ratio usable. - if (msg.type === "livekit_room_info" || msg.type === "session_id") { + if (msg.type === "livekit_room_info") { + // Capture which LiveKit SFU node the bouncer steered this session + // to — useful for "which node was bad?" debugging. Token is not + // logged (auth material). + this.config.observability?.emitInstrumentationEvent("signaling-received", { + type: msg.type, + livekitUrl: msg.livekit_url, + roomName: msg.room_name, + sessionId: msg.session_id, + }); + } else if (msg.type === "session_id") { this.config.observability?.emitInstrumentationEvent("signaling-received", { type: msg.type }); } for (const ack of [...this.pendingAcks]) { diff --git a/packages/sdk/tests/realtime.unit.test.ts b/packages/sdk/tests/realtime.unit.test.ts index 270d1f4..b951fb8 100644 --- a/packages/sdk/tests/realtime.unit.test.ts +++ b/packages/sdk/tests/realtime.unit.test.ts @@ -23,6 +23,27 @@ const liveKitMock = vi.hoisted(() => { SignalReconnecting: "signalReconnecting", Disconnected: "disconnected", } as const; + // Mirrors livekit-client's DisconnectReason enum well enough for our + // network-instrumentation lookup table. Real numeric values from the + // protocol; safe to extend without breaking tests. + const DisconnectReason = { + UNKNOWN_REASON: 0, + CLIENT_INITIATED: 1, + DUPLICATE_IDENTITY: 2, + SERVER_SHUTDOWN: 3, + PARTICIPANT_REMOVED: 4, + ROOM_DELETED: 5, + STATE_MISMATCH: 6, + JOIN_FAILURE: 7, + MIGRATION: 8, + SIGNAL_CLOSE: 9, + ROOM_CLOSED: 10, + USER_UNAVAILABLE: 11, + USER_REJECTED: 12, + SIP_TRUNK_FAILURE: 13, + CONNECTION_TIMEOUT: 14, + MEDIA_FAILURE: 15, + } as const; class MockRoom { handlers = new Map void>>(); @@ -49,7 +70,7 @@ const liveKitMock = vi.hoisted(() => { } } - return { roomInstances, RoomEvent, Track, TrackEvent, ConnectionState, MockRoom }; + return { roomInstances, RoomEvent, Track, TrackEvent, ConnectionState, DisconnectReason, MockRoom }; }); vi.mock("livekit-client", () => ({ @@ -58,6 +79,7 @@ vi.mock("livekit-client", () => ({ Track: liveKitMock.Track, TrackEvent: liveKitMock.TrackEvent, ConnectionState: liveKitMock.ConnectionState, + DisconnectReason: liveKitMock.DisconnectReason, })); class FakeMediaStream { From deeaca726577f12c7c83f23d5fa2d80ff512fac8 Mon Sep 17 00:00:00 2001 From: nagar-decart Date: Fri, 29 May 2026 16:49:34 +0300 Subject: [PATCH 6/6] realtime(observability): replace DOM-lib types with structural shapes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TS lib used in CI typecheck (different from the DOM types vitest uses locally) doesn't expose RTCIceCandidateStats. The RTCIceCandidatePairStats reference also tripped because the same lib exposes it under a slightly different shape — narrowing failed and field accesses landed on 'never'. Replaced with two minimal structural types (IceCandidateStat, IceCandidatePairStat) covering only the fields we actually read from getStats(). Functional behavior is unchanged. Also cast the session_id signaling case through `unknown` — it's sent by the bouncer but not in the typed IncomingRealtimeMessage union, so TS narrowed the comparison to never. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../observability/network-instrumentation.ts | 70 +++++++++++++------ .../sdk/src/realtime/signaling-channel.ts | 6 +- 2 files changed, 53 insertions(+), 23 deletions(-) diff --git a/packages/sdk/src/realtime/observability/network-instrumentation.ts b/packages/sdk/src/realtime/observability/network-instrumentation.ts index e26b21c..7ec90a1 100644 --- a/packages/sdk/src/realtime/observability/network-instrumentation.ts +++ b/packages/sdk/src/realtime/observability/network-instrumentation.ts @@ -72,6 +72,21 @@ type ConnectionInfo = { online?: boolean; }; +// Structural shape of an RTCIceCandidateStats entry. The DOM type isn't +// in every TS lib build, so we redeclare what we read. +type IceCandidateStat = { + id: string; + type: "local-candidate" | "remote-candidate"; + candidateType?: string; + protocol?: string; + port?: number; + priority?: number; + address?: string; + networkType?: string; + relayProtocol?: string; + url?: string; +}; + function summarizeCandidate(candidate: RTCIceCandidate | null): Record { if (!candidate) return { eof: true }; // Keep only the fields useful for ICE debugging. Dropped vs. raw RTCIceCandidate: @@ -129,39 +144,56 @@ function snapshotConnection(): ConnectionInfo { return info; } +// Structural shape of an RTCIceCandidatePairStats entry — same reason +// as IceCandidateStat above (some TS lib builds don't expose this). +type IceCandidatePairStat = { + type: "candidate-pair"; + state: string; + nominated?: boolean; + localCandidateId?: string; + remoteCandidateId?: string; + currentRoundTripTime?: number; + availableOutgoingBitrate?: number; +}; + async function snapshotSelectedPair(pc: RTCPeerConnection): Promise | null> { try { const report = await pc.getStats(); - let pair: RTCIceCandidatePairStats | null = null; - const candidates = new Map(); + let pair: IceCandidatePairStat | null = null; + const candidates = new Map(); report.forEach((stat) => { - if (stat.type === "candidate-pair" && stat.state === "succeeded" && stat.nominated) { - pair = stat as RTCIceCandidatePairStats; + const s = stat as unknown as { type: string; state?: string; nominated?: boolean; id: string }; + if (s.type === "candidate-pair" && s.state === "succeeded" && s.nominated) { + pair = stat as unknown as IceCandidatePairStat; } - if (stat.type === "local-candidate" || stat.type === "remote-candidate") { - candidates.set(stat.id, stat as RTCIceCandidateStats); + if (s.type === "local-candidate" || s.type === "remote-candidate") { + candidates.set(s.id, stat as unknown as IceCandidateStat); } }); if (!pair) return null; - const local = candidates.get(pair.localCandidateId); - const remote = candidates.get(pair.remoteCandidateId); + const localId = (pair as IceCandidatePairStat).localCandidateId; + const remoteId = (pair as IceCandidatePairStat).remoteCandidateId; + const local = localId ? candidates.get(localId) : undefined; + const remote = remoteId ? candidates.get(remoteId) : undefined; + const rtt = (pair as IceCandidatePairStat).currentRoundTripTime; + const aob = (pair as IceCandidatePairStat).availableOutgoingBitrate; return { - currentRoundTripTimeMs: pair.currentRoundTripTime != null ? pair.currentRoundTripTime * 1000 : null, - availableOutgoingBitrate: pair.availableOutgoingBitrate ?? null, + currentRoundTripTimeMs: rtt != null ? rtt * 1000 : null, + availableOutgoingBitrate: aob ?? null, local: local ? { type: local.candidateType, protocol: local.protocol, - address: (local as RTCIceCandidateStats & { address?: string }).address, + address: local.address, port: local.port, - networkType: (local as RTCIceCandidateStats & { networkType?: string }).networkType, + networkType: local.networkType, } : null, remote: remote ? { type: remote.candidateType, protocol: remote.protocol, - address: (remote as RTCIceCandidateStats & { address?: string }).address, + address: remote.address, port: remote.port, } : null, @@ -190,16 +222,12 @@ async function snapshotPastCandidates( try { const report = await pc.getStats(); report.forEach((stat) => { - if (stat.type === "local-candidate" || stat.type === "remote-candidate") { - const c = stat as RTCIceCandidateStats & { - address?: string; - networkType?: string; - relayProtocol?: string; - url?: string; - }; + const s = stat as unknown as IceCandidateStat; + if (s.type === "local-candidate" || s.type === "remote-candidate") { + const c = s; emit("ice-candidate-past", { side, - source: stat.type, // local-candidate | remote-candidate + source: c.type, // local-candidate | remote-candidate candidateType: c.candidateType, protocol: c.protocol, address: c.address ?? null, diff --git a/packages/sdk/src/realtime/signaling-channel.ts b/packages/sdk/src/realtime/signaling-channel.ts index c8daac4..cffcc05 100644 --- a/packages/sdk/src/realtime/signaling-channel.ts +++ b/packages/sdk/src/realtime/signaling-channel.ts @@ -372,8 +372,10 @@ export class SignalingChannel { roomName: msg.room_name, sessionId: msg.session_id, }); - } else if (msg.type === "session_id") { - this.config.observability?.emitInstrumentationEvent("signaling-received", { type: msg.type }); + } else if ((msg as { type?: string }).type === "session_id") { + // `session_id` isn't in the typed IncomingRealtimeMessage union but is + // sent by the bouncer; trace it for the connection timeline. + this.config.observability?.emitInstrumentationEvent("signaling-received", { type: "session_id" }); } for (const ack of [...this.pendingAcks]) { if (ack.matches(msg)) {