From 9b0a0a4340738f2db19459481dbc087a753d89f5 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Wed, 29 Apr 2026 14:55:46 -0700 Subject: [PATCH 1/2] add agents reporter with env --- observability/agentsv2obs/agent.go | 42 ++++ observability/agentsv2obs/gen_reporter.go | 141 ++++++++++++ .../agentsv2obs/gen_reporter_noop.go | 212 ++++++++++++++++++ observability/agentsv2obs/gen_source.go | 55 +++++ observability/reporter.go | 6 + 5 files changed, 456 insertions(+) create mode 100644 observability/agentsv2obs/agent.go create mode 100644 observability/agentsv2obs/gen_reporter.go create mode 100644 observability/agentsv2obs/gen_reporter_noop.go create mode 100644 observability/agentsv2obs/gen_source.go diff --git a/observability/agentsv2obs/agent.go b/observability/agentsv2obs/agent.go new file mode 100644 index 000000000..8e7860680 --- /dev/null +++ b/observability/agentsv2obs/agent.go @@ -0,0 +1,42 @@ +package agentsv2obs + +import "github.com/livekit/protocol/livekit" + +func JobKindFromProto(kind livekit.JobType) JobKind { + switch kind { + case livekit.JobType_JT_ROOM: + return JobKindRoom + case livekit.JobType_JT_PUBLISHER: + return JobKindPublisher + case livekit.JobType_JT_PARTICIPANT: + return JobKindParticipant + default: + return JobKindUndefined + } +} + +func JobStatusFromProto(status livekit.JobStatus) JobStatus { + switch status { + case livekit.JobStatus_JS_PENDING: + return JobStatusPending + case livekit.JobStatus_JS_RUNNING: + return JobStatusRunning + case livekit.JobStatus_JS_SUCCESS: + return JobStatusSuccess + case livekit.JobStatus_JS_FAILED: + return JobStatusFailed + default: + return JobStatusUndefined + } +} + +func WorkerStatusFromProto(status livekit.WorkerStatus) WorkerStatus { + switch status { + case livekit.WorkerStatus_WS_AVAILABLE: + return WorkerStatusAvailable + case livekit.WorkerStatus_WS_FULL: + return WorkerStatusFull + default: + return WorkerStatusUndefined + } +} diff --git a/observability/agentsv2obs/gen_reporter.go b/observability/agentsv2obs/gen_reporter.go new file mode 100644 index 000000000..5989a6192 --- /dev/null +++ b/observability/agentsv2obs/gen_reporter.go @@ -0,0 +1,141 @@ +// Code generated; DO NOT EDIT. + +package agentsv2obs + +import ( + "time" +) + +const Version_4FC3258 = true + +type KeyResolver interface { + Resolve(string) + Reset() +} + +type Reporter interface { + WithProject(id string) ProjectReporter + WithDeferredProject() (ProjectReporter, KeyResolver) +} + +type projectReporter interface { +} + +type ProjectTx interface { + projectReporter +} + +type ProjectReporter interface { + RegisterFunc(func(ts time.Time, tx ProjectTx) bool) + Tx(func(tx ProjectTx)) + TxAt(time.Time, func(tx ProjectTx)) + WithEnv(name string) EnvReporter + WithDeferredEnv() (EnvReporter, KeyResolver) + projectReporter +} + +type envReporter interface { +} + +type EnvTx interface { + Project() ProjectTx + envReporter +} + +type EnvReporter interface { + RegisterFunc(func(ts time.Time, tx EnvTx) bool) + Tx(func(tx EnvTx)) + TxAt(time.Time, func(tx EnvTx)) + WithCloudAgent(id string) CloudAgentReporter + WithDeferredCloudAgent() (CloudAgentReporter, KeyResolver) + envReporter +} + +type cloudAgentReporter interface { +} + +type CloudAgentTx interface { + Env() EnvTx + cloudAgentReporter +} + +type CloudAgentReporter interface { + RegisterFunc(func(ts time.Time, tx CloudAgentTx) bool) + Tx(func(tx CloudAgentTx)) + TxAt(time.Time, func(tx CloudAgentTx)) + WithAgent(name string) AgentReporter + WithDeferredAgent() (AgentReporter, KeyResolver) + cloudAgentReporter +} + +type agentReporter interface { +} + +type AgentTx interface { + CloudAgent() CloudAgentTx + agentReporter +} + +type AgentReporter interface { + RegisterFunc(func(ts time.Time, tx AgentTx) bool) + Tx(func(tx AgentTx)) + TxAt(time.Time, func(tx AgentTx)) + WithWorker(id string) WorkerReporter + WithDeferredWorker() (WorkerReporter, KeyResolver) + agentReporter +} + +type workerReporter interface { + ReportLoad(v float32) + ReportStatus(v WorkerStatus) + ReportStartTime(v time.Time) + ReportEndTime(v time.Time) + ReportJobsCurrent(v uint32) + ReportLive(v uint8) + ReportCPU(v int64) + ReportCPULimit(v int64) + ReportMem(v int64) + ReportMemLimit(v int64) + ReportRegion(v string) + ReportVersion(v string) + ReportSdkVersion(v string) + ReportState(v WorkerState) +} + +type WorkerTx interface { + Agent() AgentTx + workerReporter +} + +type WorkerReporter interface { + RegisterFunc(func(ts time.Time, tx WorkerTx) bool) + Tx(func(tx WorkerTx)) + TxAt(time.Time, func(tx WorkerTx)) + WithJob(id string) JobReporter + WithDeferredJob() (JobReporter, KeyResolver) + workerReporter +} + +type jobReporter interface { + ReportRoomSessionID(v string) + ReportKind(v JobKind) + ReportWorkerKind(v WorkerKind) + ReportStatus(v JobStatus) + ReportDuration(v uint32) + ReportDurationMinutes(v uint8) + ReportStartTime(v time.Time) + ReportEndTime(v time.Time) + ReportJoinLatency(v uint32) +} + +type JobTx interface { + Worker() WorkerTx + jobReporter +} + +type JobReporter interface { + RegisterFunc(func(ts time.Time, tx JobTx) bool) + Tx(func(tx JobTx)) + TxAt(time.Time, func(tx JobTx)) + jobReporter +} diff --git a/observability/agentsv2obs/gen_reporter_noop.go b/observability/agentsv2obs/gen_reporter_noop.go new file mode 100644 index 000000000..e8324e47d --- /dev/null +++ b/observability/agentsv2obs/gen_reporter_noop.go @@ -0,0 +1,212 @@ +// Code generated; DO NOT EDIT. + +package agentsv2obs + +import ( + "time" +) + +var ( + _ Reporter = (*noopReporter)(nil) + _ ProjectReporter = (*noopProjectReporter)(nil) + _ ProjectTx = (*noopProjectTx)(nil) + _ EnvReporter = (*noopEnvReporter)(nil) + _ EnvTx = (*noopEnvTx)(nil) + _ CloudAgentReporter = (*noopCloudAgentReporter)(nil) + _ CloudAgentTx = (*noopCloudAgentTx)(nil) + _ AgentReporter = (*noopAgentReporter)(nil) + _ AgentTx = (*noopAgentTx)(nil) + _ WorkerReporter = (*noopWorkerReporter)(nil) + _ WorkerTx = (*noopWorkerTx)(nil) + _ JobReporter = (*noopJobReporter)(nil) + _ JobTx = (*noopJobTx)(nil) +) + +type noopKeyResolver struct{} + +func (noopKeyResolver) Resolve(string) {} +func (noopKeyResolver) Reset() {} + +type noopReporter struct{} + +func NewNoopReporter() Reporter { + return &noopReporter{} +} + +func (r *noopReporter) WithProject(id string) ProjectReporter { + return &noopProjectReporter{} +} + +func (r *noopReporter) WithDeferredProject() (ProjectReporter, KeyResolver) { + return &noopProjectReporter{}, noopKeyResolver{} +} + +type noopProjectReporter struct{} + +func NewNoopProjectReporter() ProjectReporter { + return &noopProjectReporter{} +} + +func (r *noopProjectReporter) RegisterFunc(f func(ts time.Time, tx ProjectTx) bool) {} +func (r *noopProjectReporter) Tx(f func(ProjectTx)) {} +func (r *noopProjectReporter) TxAt(ts time.Time, f func(ProjectTx)) {} +func (r *noopProjectReporter) WithEnv(name string) EnvReporter { + return &noopEnvReporter{} +} +func (r *noopProjectReporter) WithDeferredEnv() (EnvReporter, KeyResolver) { + return &noopEnvReporter{}, noopKeyResolver{} +} + +type noopProjectTx struct{} + +type noopEnvReporter struct{} + +func NewNoopEnvReporter() EnvReporter { + return &noopEnvReporter{} +} + +func (r *noopEnvReporter) RegisterFunc(f func(ts time.Time, tx EnvTx) bool) {} +func (r *noopEnvReporter) Tx(f func(EnvTx)) {} +func (r *noopEnvReporter) TxAt(ts time.Time, f func(EnvTx)) {} +func (r *noopEnvReporter) WithCloudAgent(id string) CloudAgentReporter { + return &noopCloudAgentReporter{} +} +func (r *noopEnvReporter) WithDeferredCloudAgent() (CloudAgentReporter, KeyResolver) { + return &noopCloudAgentReporter{}, noopKeyResolver{} +} + +type noopEnvTx struct{} + +func (t *noopEnvTx) Project() ProjectTx { + return &noopProjectTx{} +} + +type noopCloudAgentReporter struct{} + +func NewNoopCloudAgentReporter() CloudAgentReporter { + return &noopCloudAgentReporter{} +} + +func (r *noopCloudAgentReporter) RegisterFunc(f func(ts time.Time, tx CloudAgentTx) bool) {} +func (r *noopCloudAgentReporter) Tx(f func(CloudAgentTx)) {} +func (r *noopCloudAgentReporter) TxAt(ts time.Time, f func(CloudAgentTx)) {} +func (r *noopCloudAgentReporter) WithAgent(name string) AgentReporter { + return &noopAgentReporter{} +} +func (r *noopCloudAgentReporter) WithDeferredAgent() (AgentReporter, KeyResolver) { + return &noopAgentReporter{}, noopKeyResolver{} +} + +type noopCloudAgentTx struct{} + +func (t *noopCloudAgentTx) Env() EnvTx { + return &noopEnvTx{} +} + +type noopAgentReporter struct{} + +func NewNoopAgentReporter() AgentReporter { + return &noopAgentReporter{} +} + +func (r *noopAgentReporter) RegisterFunc(f func(ts time.Time, tx AgentTx) bool) {} +func (r *noopAgentReporter) Tx(f func(AgentTx)) {} +func (r *noopAgentReporter) TxAt(ts time.Time, f func(AgentTx)) {} +func (r *noopAgentReporter) WithWorker(id string) WorkerReporter { + return &noopWorkerReporter{} +} +func (r *noopAgentReporter) WithDeferredWorker() (WorkerReporter, KeyResolver) { + return &noopWorkerReporter{}, noopKeyResolver{} +} + +type noopAgentTx struct{} + +func (t *noopAgentTx) CloudAgent() CloudAgentTx { + return &noopCloudAgentTx{} +} + +type noopWorkerReporter struct{} + +func NewNoopWorkerReporter() WorkerReporter { + return &noopWorkerReporter{} +} + +func (r *noopWorkerReporter) RegisterFunc(f func(ts time.Time, tx WorkerTx) bool) {} +func (r *noopWorkerReporter) Tx(f func(WorkerTx)) {} +func (r *noopWorkerReporter) TxAt(ts time.Time, f func(WorkerTx)) {} +func (r *noopWorkerReporter) ReportLoad(v float32) {} +func (r *noopWorkerReporter) ReportStatus(v WorkerStatus) {} +func (r *noopWorkerReporter) ReportStartTime(v time.Time) {} +func (r *noopWorkerReporter) ReportEndTime(v time.Time) {} +func (r *noopWorkerReporter) ReportJobsCurrent(v uint32) {} +func (r *noopWorkerReporter) ReportLive(v uint8) {} +func (r *noopWorkerReporter) ReportCPU(v int64) {} +func (r *noopWorkerReporter) ReportCPULimit(v int64) {} +func (r *noopWorkerReporter) ReportMem(v int64) {} +func (r *noopWorkerReporter) ReportMemLimit(v int64) {} +func (r *noopWorkerReporter) ReportRegion(v string) {} +func (r *noopWorkerReporter) ReportVersion(v string) {} +func (r *noopWorkerReporter) ReportSdkVersion(v string) {} +func (r *noopWorkerReporter) ReportState(v WorkerState) {} +func (r *noopWorkerReporter) WithJob(id string) JobReporter { + return &noopJobReporter{} +} +func (r *noopWorkerReporter) WithDeferredJob() (JobReporter, KeyResolver) { + return &noopJobReporter{}, noopKeyResolver{} +} + +type noopWorkerTx struct{} + +func (t *noopWorkerTx) Agent() AgentTx { + return &noopAgentTx{} +} + +func (t *noopWorkerTx) ReportLoad(v float32) {} +func (t *noopWorkerTx) ReportStatus(v WorkerStatus) {} +func (t *noopWorkerTx) ReportStartTime(v time.Time) {} +func (t *noopWorkerTx) ReportEndTime(v time.Time) {} +func (t *noopWorkerTx) ReportJobsCurrent(v uint32) {} +func (t *noopWorkerTx) ReportLive(v uint8) {} +func (t *noopWorkerTx) ReportCPU(v int64) {} +func (t *noopWorkerTx) ReportCPULimit(v int64) {} +func (t *noopWorkerTx) ReportMem(v int64) {} +func (t *noopWorkerTx) ReportMemLimit(v int64) {} +func (t *noopWorkerTx) ReportRegion(v string) {} +func (t *noopWorkerTx) ReportVersion(v string) {} +func (t *noopWorkerTx) ReportSdkVersion(v string) {} +func (t *noopWorkerTx) ReportState(v WorkerState) {} + +type noopJobReporter struct{} + +func NewNoopJobReporter() JobReporter { + return &noopJobReporter{} +} + +func (r *noopJobReporter) RegisterFunc(f func(ts time.Time, tx JobTx) bool) {} +func (r *noopJobReporter) Tx(f func(JobTx)) {} +func (r *noopJobReporter) TxAt(ts time.Time, f func(JobTx)) {} +func (r *noopJobReporter) ReportRoomSessionID(v string) {} +func (r *noopJobReporter) ReportKind(v JobKind) {} +func (r *noopJobReporter) ReportWorkerKind(v WorkerKind) {} +func (r *noopJobReporter) ReportStatus(v JobStatus) {} +func (r *noopJobReporter) ReportDuration(v uint32) {} +func (r *noopJobReporter) ReportDurationMinutes(v uint8) {} +func (r *noopJobReporter) ReportStartTime(v time.Time) {} +func (r *noopJobReporter) ReportEndTime(v time.Time) {} +func (r *noopJobReporter) ReportJoinLatency(v uint32) {} + +type noopJobTx struct{} + +func (t *noopJobTx) Worker() WorkerTx { + return &noopWorkerTx{} +} + +func (t *noopJobTx) ReportRoomSessionID(v string) {} +func (t *noopJobTx) ReportKind(v JobKind) {} +func (t *noopJobTx) ReportWorkerKind(v WorkerKind) {} +func (t *noopJobTx) ReportStatus(v JobStatus) {} +func (t *noopJobTx) ReportDuration(v uint32) {} +func (t *noopJobTx) ReportDurationMinutes(v uint8) {} +func (t *noopJobTx) ReportStartTime(v time.Time) {} +func (t *noopJobTx) ReportEndTime(v time.Time) {} +func (t *noopJobTx) ReportJoinLatency(v uint32) {} diff --git a/observability/agentsv2obs/gen_source.go b/observability/agentsv2obs/gen_source.go new file mode 100644 index 000000000..30680c903 --- /dev/null +++ b/observability/agentsv2obs/gen_source.go @@ -0,0 +1,55 @@ +// Code generated; DO NOT EDIT. +package agentsv2obs + +type WorkerStatus string + +const ( + WorkerStatusUndefined WorkerStatus = "" + WorkerStatusAvailable WorkerStatus = "available" + WorkerStatusFull WorkerStatus = "full" +) + +type WorkerState string + +const ( + WorkerStateUndefined WorkerState = "" + WorkerStateOnline WorkerState = "online" + WorkerStateOffline WorkerState = "offline" +) + +type JobKind string + +const ( + JobKindUndefined JobKind = "" + JobKindRoom JobKind = "room" + JobKindPublisher JobKind = "publisher" + JobKindParticipant JobKind = "participant" +) + +type WorkerKind string + +const ( + WorkerKindUndefined WorkerKind = "" + WorkerKindCloud WorkerKind = "cloud" + WorkerKindSelfhost WorkerKind = "selfhost" +) + +type JobStatus string + +const ( + JobStatusUndefined JobStatus = "" + JobStatusPending JobStatus = "pending" + JobStatusRunning JobStatus = "running" + JobStatusSuccess JobStatus = "success" + JobStatusFailed JobStatus = "failed" +) + +type Rollup string + +const ( + RollupUndefined Rollup = "" + RollupAgent Rollup = "agent" + RollupWorker Rollup = "worker" + RollupWorkerSeries Rollup = "worker_series" + RollupJob Rollup = "job" +) diff --git a/observability/reporter.go b/observability/reporter.go index e7fd236b7..eff488b9f 100644 --- a/observability/reporter.go +++ b/observability/reporter.go @@ -3,6 +3,7 @@ package observability import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/observability/agentsobs" + "github.com/livekit/protocol/observability/agentsv2obs" "github.com/livekit/protocol/observability/corecallobs" "github.com/livekit/protocol/observability/egressobs" "github.com/livekit/protocol/observability/gatewayobs" @@ -19,6 +20,7 @@ type Reporter interface { Logger(name, projectID string) (logger.Logger, error) Room() roomobs.Reporter Agent() agentsobs.Reporter + AgentV2() agentsv2obs.Reporter Gateway() gatewayobs.Reporter Telephony() telephonyobs.Reporter Egress() egressobs.Reporter @@ -47,6 +49,10 @@ func (reporter) Agent() agentsobs.Reporter { return agentsobs.NewNoopReporter() } +func (reporter) AgentV2() agentsv2obs.Reporter { + return agentsv2obs.NewNoopReporter() +} + func (reporter) Gateway() gatewayobs.Reporter { return gatewayobs.NewNoopReporter() } From d38aa7618b17876c60ef86bf1f230472bb7299bd Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Wed, 29 Apr 2026 15:04:06 -0700 Subject: [PATCH 2/2] Create khaki-drinks-mate.md --- .changeset/khaki-drinks-mate.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/khaki-drinks-mate.md diff --git a/.changeset/khaki-drinks-mate.md b/.changeset/khaki-drinks-mate.md new file mode 100644 index 000000000..1f95fccb2 --- /dev/null +++ b/.changeset/khaki-drinks-mate.md @@ -0,0 +1,5 @@ +--- +"@livekit/protocol": patch +--- + +add agents reporter with env