diff --git a/docs/auth0_event-streams_subscribe.md b/docs/auth0_event-streams_subscribe.md index 169441139..4442e9c7e 100644 --- a/docs/auth0_event-streams_subscribe.md +++ b/docs/auth0_event-streams_subscribe.md @@ -12,7 +12,11 @@ By default, every received event is rendered as a single, color-coded summary li Use --verbose to also print the full JSON payload after each summary, or --json / --json-compact to emit raw JSON suitable for piping into `jq`. -Heartbeat (`offset-only`) messages are suppressed by default and surfaced via a periodic faint indicator and a final cursor on disconnect; pass --show-heartbeats to render each one. Press Ctrl+C to disconnect; a per-type summary and the latest cursor will be printed so you can resume with --from. +Heartbeat (`offset-only`) messages are suppressed by default and surfaced via a periodic faint indicator and a final cursor on disconnect; pass --show-heartbeats to render each one. + +The server rotates long-lived connections every few minutes; the command transparently resumes from the last cursor so the session stays continuous. Use --no-reconnect to exit when the connection ends, or --max-reconnects to cap the number of consecutive failed reconnect attempts. Press Ctrl+C to disconnect; a per-type summary and the latest cursor are printed so you can resume with --from. + +Run with --list-event-types to print every value accepted by --event-type. ## Usage ``` @@ -23,6 +27,7 @@ auth0 event-streams subscribe [flags] ``` auth0 event-streams subscribe + auth0 event-streams subscribe --list-event-types auth0 event-streams subscribe --event-type user.created auth0 event-streams subscribe --event-type user.created --event-type user.updated auth0 event-streams subscribe --from-timestamp 2026-05-01T00:00:00Z @@ -42,6 +47,9 @@ auth0 event-streams subscribe [flags] --from-timestamp string RFC-3339 timestamp indicating where to start streaming events from. Use this on the initial query when no cursor (--from) is available; prefer --from on subsequent runs as it is more accurate. --json Output each event as JSON (one indented object per event). --json-compact Output each event as compact, single-line JSON (newline-delimited). + --list-event-types Print every event type accepted by --event-type and exit, without opening a subscription. + --max-reconnects int Maximum number of consecutive failed reconnect attempts before giving up. 0 (default) keeps retrying as long as the stream is making progress. Ignored when --no-reconnect is set. + --no-reconnect Disable automatic reconnection. By default the stream resumes from the last cursor after the server drops the connection, so no events are missed. With this flag the command exits when the stream ends. --output-file string Append every received event as a JSON line to this file (raw payload). Independent of the stdout format. --show-heartbeats offset-only Show every offset-only heartbeat as its own line. By default heartbeats are silently tracked and only the latest cursor is reported on disconnect. -v, --verbose Print the full JSON payload after each event summary line. diff --git a/internal/cli/event_streams_subscribe.go b/internal/cli/event_streams_subscribe.go index fcf605d6e..ec1b78502 100644 --- a/internal/cli/event_streams_subscribe.go +++ b/internal/cli/event_streams_subscribe.go @@ -1,10 +1,12 @@ package cli import ( + "context" "encoding/json" "errors" "fmt" "io" + "math/rand/v2" "os" "os/signal" "sort" @@ -15,9 +17,11 @@ import ( "time" managementv2 "github.com/auth0/go-auth0/v2/management" + managementoption "github.com/auth0/go-auth0/v2/management/option" "github.com/spf13/cobra" "github.com/auth0/auth0-cli/internal/ansi" + "github.com/auth0/auth0-cli/internal/auth0" ) var ( @@ -66,8 +70,60 @@ var ( Help: "Append every received event as a JSON line to this file (raw payload). " + "Independent of the stdout format.", } + + eventSubscribeNoReconnect = Flag{ + Name: "No Reconnect", + LongForm: "no-reconnect", + Help: "Disable automatic reconnection. By default the stream resumes from " + + "the last cursor after the server drops the connection, so no events " + + "are missed. With this flag the command exits when the stream ends.", + } + + eventSubscribeMaxReconnects = Flag{ + Name: "Max Reconnects", + LongForm: "max-reconnects", + Help: "Maximum number of consecutive failed reconnect attempts before " + + "giving up. 0 (default) keeps retrying as long as the stream is " + + "making progress. Ignored when --no-reconnect is set.", + } + + eventSubscribeListEventTypes = Flag{ + Name: "List Event Types", + LongForm: "list-event-types", + Help: "Print every event type accepted by --event-type and exit, " + + "without opening a subscription.", + } ) +// supportedEventTypes drives --list-event-types and validates --event-type +// values up front so all bad values can be reported in one error. Sourced from +// SDK enum constants so a rename or removal in go-auth0 is a compile-time +// failure here. New types added to the SDK still need a manual append. +var supportedEventTypes = []string{ + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupCreated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupMemberAdded), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupMemberDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupRoleAssigned), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupRoleDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupUpdated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationConnectionAdded), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationConnectionRemoved), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationConnectionUpdated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationCreated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationGroupRoleAssigned), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationGroupRoleDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationMemberAdded), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationMemberDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationMemberRoleAssigned), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationMemberRoleDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationUpdated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumUserCreated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumUserDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumUserUpdated), +} + type subscribeInputs struct { From string FromTimestamp string @@ -75,6 +131,9 @@ type subscribeInputs struct { Verbose bool ShowHeartbeats bool OutputFile string + NoReconnect bool + MaxReconnects int + ListEventTypes bool } func subscribeEventStreamCmd(cli *cli) *cobra.Command { @@ -91,9 +150,15 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { "or --json / --json-compact to emit raw JSON suitable for piping into `jq`.\n\n" + "Heartbeat (`offset-only`) messages are suppressed by default and surfaced via " + "a periodic faint indicator and a final cursor on disconnect; pass --show-heartbeats " + - "to render each one. Press Ctrl+C to disconnect; a per-type summary and the " + - "latest cursor will be printed so you can resume with --from.", + "to render each one.\n\n" + + "The server rotates long-lived connections every few minutes; the command " + + "transparently resumes from the last cursor so the session stays continuous. " + + "Use --no-reconnect to exit when the connection ends, or --max-reconnects to cap " + + "the number of consecutive failed reconnect attempts. Press Ctrl+C to disconnect; " + + "a per-type summary and the latest cursor are printed so you can resume with --from.\n\n" + + "Run with --list-event-types to print every value accepted by --event-type.", Example: ` auth0 event-streams subscribe + auth0 event-streams subscribe --list-event-types auth0 event-streams subscribe --event-type user.created auth0 event-streams subscribe --event-type user.created --event-type user.updated auth0 event-streams subscribe --from-timestamp 2026-05-01T00:00:00Z @@ -103,6 +168,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { auth0 event-streams subscribe --output-file events.jsonl auth0 event-streams subscribe --json | jq .`, RunE: func(cmd *cobra.Command, args []string) error { + if inputs.ListEventTypes { + cli.renderer.Infof(ansi.Bold("Supported event types:")) + for _, t := range supportedEventTypes { + cli.renderer.Output(" " + t) + } + return nil + } + req := &managementv2.SubscribeEventsRequestParameters{} if inputs.From != "" { @@ -113,6 +186,7 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { } if len(inputs.EventTypes) > 0 { eventTypes := make([]*managementv2.EventStreamSubscribeEventsEventTypeEnum, 0, len(inputs.EventTypes)) + var invalid []string for _, t := range inputs.EventTypes { t = strings.TrimSpace(t) if t == "" { @@ -120,10 +194,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { } enum, err := managementv2.NewEventStreamSubscribeEventsEventTypeEnumFromString(t) if err != nil { - return fmt.Errorf("invalid --event-type value %q: %w", t, err) + invalid = append(invalid, t) + continue } eventTypes = append(eventTypes, enum.Ptr()) } + if len(invalid) > 0 { + return invalidEventTypesError(invalid) + } req.EventType = eventTypes } @@ -137,12 +215,6 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { outFile = f } - stream, err := cli.apiv2.Events.Subscribe(cmd.Context(), req) - if err != nil { - return fmt.Errorf("failed to subscribe to events: %w", err) - } - defer func() { _ = stream.Close() }() - useJSON := cli.json || cli.jsonCompact if !useJSON { cli.renderer.Infof(ansi.Faint("Subscribed to event stream. Press Ctrl+C to disconnect.")) @@ -151,12 +223,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { counts := map[string]int{} var ( totalEvents uint64 + errorEvents uint64 heartbeats uint64 lastOffset string lastHeartbeatAt time.Time countsMu sync.Mutex summaryOnce sync.Once - streamClosed atomic.Bool + userInterrupted atomic.Bool + startedAt = time.Now() ) flushSummary := func() { @@ -167,18 +241,39 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { countsMu.Lock() defer countsMu.Unlock() + total := atomic.LoadUint64(&totalEvents) + errs := atomic.LoadUint64(&errorEvents) + hbs := atomic.LoadUint64(&heartbeats) + duration := time.Since(startedAt).Round(time.Second) + cli.renderer.Newline() cli.renderer.Infof(ansi.Bold("Disconnected. Summary:")) - cli.renderer.Infof(" Events received: %d", atomic.LoadUint64(&totalEvents)) - cli.renderer.Infof(" Heartbeats: %d", atomic.LoadUint64(&heartbeats)) + cli.renderer.Infof(" Duration: %s", duration) + cli.renderer.Infof(" Events received: %d", total) + if errs > 0 { + cli.renderer.Infof(" Errors: %s", ansi.BrightRed(fmt.Sprintf("%d", errs))) + } + cli.renderer.Infof(" Heartbeats: %d", hbs) if len(counts) > 0 { types := make([]string, 0, len(counts)) + maxLen := 0 for t := range counts { types = append(types, t) + if len(t) > maxLen { + maxLen = len(t) + } } - sort.Strings(types) + // Sort by count desc; tie-break alphabetically so output is stable. + sort.Slice(types, func(i, j int) bool { + if counts[types[i]] != counts[types[j]] { + return counts[types[i]] > counts[types[j]] + } + return types[i] < types[j] + }) + cli.renderer.Newline() + cli.renderer.Infof(" By type:") for _, t := range types { - cli.renderer.Infof(" %s %s %d", ansi.Faint("·"), t, counts[t]) + cli.renderer.Infof(" %s %d", padRight(t, maxLen), counts[t]) } } if lastOffset != "" { @@ -190,8 +285,10 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { // The root command installs a SIGINT handler that calls os.Exit(0) // from a goroutine, which would skip our deferred summary. Reset - // it first so only our handler runs, then print the summary and - // exit cleanly ourselves. + // it first so only our handler runs: it cancels the stream context + // so the resume loop unwinds cleanly, prints the summary, and exits. + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() signal.Reset(os.Interrupt, syscall.SIGTERM) sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) @@ -200,38 +297,27 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { if _, ok := <-sigCh; !ok { return } - streamClosed.Store(true) - _ = stream.Close() - flushSummary() - os.Exit(0) + userInterrupted.Store(true) + cancel() }() - for { - event, err := stream.Recv() - if err != nil { - /* Treat as graceful shutdown if - - EOF (server closed) - - context cancelled - - we closed the stream ourselves (Ctrl+C) - - http2 body closed error (result of stream.Close()) */ - isGraceful := errors.Is(err, io.EOF) || - errors.Is(err, cmd.Context().Err()) || - cmd.Context().Err() != nil || - streamClosed.Load() || - strings.Contains(err.Error(), "response body closed") - if isGraceful { - flushSummary() - return nil - } - return fmt.Errorf("error receiving event: %w", err) - } - + // Per-event handler shared across reconnects. Returns true when an + // event counts as forward progress (a real event or heartbeat), + // which resets the reconnect backoff. + handleEvent := func(event managementv2.EventStreamSubscribeEventsResponseContent) bool { summary := summarizeEvent(&event) if summary.offset != "" { lastOffset = summary.offset } + // Connection_timeout is emitted by the server right before it + // drops the SSE connection; we resume from the cursor, so it's + // a protocol artifact and must not surface in any output sink. + if summary.isError && summary.errorCode == "connection_timeout" { + return false + } + // Always persist raw payload to file if requested. if outFile != nil { raw, mErr := json.Marshal(&event) @@ -247,7 +333,7 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { } else { cli.renderer.JSONResult(&event) } - continue + return true } if summary.isHeartbeat { @@ -259,15 +345,19 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { lastHeartbeatAt = time.Now() renderHeartbeatPulse(cli, summary) } - continue + return true } - atomic.AddUint64(&totalEvents, 1) - countsMu.Lock() - counts[summary.eventType]++ - countsMu.Unlock() - - renderEventSummary(cli, summary) + if summary.isError { + atomic.AddUint64(&errorEvents, 1) + renderErrorEvent(cli, summary) + } else { + atomic.AddUint64(&totalEvents, 1) + countsMu.Lock() + counts[summary.eventType]++ + countsMu.Unlock() + renderEventSummary(cli, summary) + } if inputs.Verbose { payload, mErr := json.MarshalIndent(&event, "", " ") @@ -277,6 +367,86 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { cli.renderer.Output(ansi.ColorizeJSON(string(payload))) } } + return true + } + + // Outer resume loop. The server drops long-lived SSE connections + // every few minutes by design; rather than treating that as the + // end of the stream, we transparently re-subscribe from the last + // cursor so the session feels continuous (the same model used by + // `stripe listen`, `kubectl logs -f`, and `aws logs tail --follow`). + // + // Backoff only kicks in for consecutive failures that made zero + // progress. Any forward progress resets it, so a healthy stream + // that simply rotates connections reconnects instantly. We give up + // only after --max-reconnects consecutive zero-progress failures + // (0 = retry indefinitely while progress is being made). + consecutiveFailures := 0 + for { + if ctx.Err() != nil { + flushSummary() + return nil + } + + attemptStart := time.Now() + progressed, serverRetry, err := runStreamSession(ctx, cli.apiv2, req, lastOffset, handleEvent) + sessionDuration := time.Since(attemptStart) + + // Ctrl+C or parent cancellation: always a graceful exit. + if userInterrupted.Load() || errors.Is(err, context.Canceled) || ctx.Err() != nil { + flushSummary() + return nil + } + + if inputs.NoReconnect { + if err != nil { + return fmt.Errorf("error receiving event: %w", err) + } + flushSummary() + return nil + } + + // A clean end-of-stream (err == nil) or any session that made + // forward progress resets the backoff so connection rotations + // reconnect instantly. Only consecutive zero-progress failures + // accrue toward the --max-reconnects ceiling. + if err == nil || progressed { + consecutiveFailures = 0 + } else { + consecutiveFailures++ + } + + if inputs.MaxReconnects > 0 && consecutiveFailures > inputs.MaxReconnects { + flushSummary() + if !useJSON { + cli.renderer.Errorf("Giving up after %d consecutive reconnect attempts without progress.", inputs.MaxReconnects) + } + return fmt.Errorf("stream reconnection failed after %d consecutive attempts: %w", inputs.MaxReconnects, err) + } + + delay := reconnectBackoff(consecutiveFailures, serverRetry) + // Guard against a flapping connection that delivers a frame and + // dies immediately: each such session "progresses" and resets + // the backoff, which would otherwise spin in a tight reconnect + // loop. Enforce a minimum spacing between attempts based on how + // long the session actually lasted, regardless of progress. + if remaining := reconnectMinInterval - sessionDuration; remaining > delay { + delay = remaining + } + // Healthy connection rotations resume silently (even when the + // server asks for a short `retry:` wait) so a normal long-lived + // session looks seamless. Only surface a notice once we're + // actually backing off real, consecutive failures. + if !useJSON && consecutiveFailures > 0 { + cli.renderer.Infof(ansi.Faint(fmt.Sprintf("· connection lost, reconnecting in %s…", delay.Round(time.Millisecond)))) + } + + select { + case <-ctx.Done(): + flushSummary() + return nil + case <-time.After(delay): + } } }, } @@ -287,6 +457,10 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { eventSubscribeVerbose.RegisterBool(cmd, &inputs.Verbose, false) eventSubscribeShowHeartbeats.RegisterBool(cmd, &inputs.ShowHeartbeats, false) eventSubscribeOutputFile.RegisterString(cmd, &inputs.OutputFile, "") + eventSubscribeNoReconnect.RegisterBool(cmd, &inputs.NoReconnect, false) + eventSubscribeMaxReconnects.RegisterInt(cmd, &inputs.MaxReconnects, 0) + eventSubscribeListEventTypes.RegisterBool(cmd, &inputs.ListEventTypes, false) + cmd.MarkFlagsMutuallyExclusive("no-reconnect", "max-reconnects") cmd.Flags().BoolVar(&cli.json, "json", false, "Output each event as JSON (one indented object per event).") cmd.Flags().BoolVar(&cli.jsonCompact, "json-compact", false, "Output each event as compact, single-line JSON (newline-delimited).") @@ -295,16 +469,99 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { return cmd } +// runStreamSession opens a single SSE subscription and pumps events through +// handleEvent until the connection ends or ctx is cancelled. SDK-internal +// reconnection is disabled so this function owns exactly one connection; the +// caller's outer loop is the single source of truth for reconnection. When +// resumeFrom is set it overrides req.From so each reconnect picks up exactly +// where the last delivered event left off. It returns whether the session made +// any forward progress (used to reset the caller's backoff), the server's most +// recently advertised SSE `retry:` interval (0 if none) so the caller never +// reconnects faster than the server asked, and the terminating error (nil on a +// clean server close). +func runStreamSession( + ctx context.Context, + api *auth0.APIV2, + req *managementv2.SubscribeEventsRequestParameters, + resumeFrom string, + handleEvent func(managementv2.EventStreamSubscribeEventsResponseContent) bool, +) (progressed bool, serverRetry time.Duration, err error) { + if resumeFrom != "" { + req.From = &resumeFrom + } + + stream, err := api.Events.Subscribe(ctx, req, managementoption.WithoutStreamReconnection()) + if err != nil { + return false, 0, err + } + defer func() { _ = stream.Close() }() + + for { + event, recvErr := stream.Recv() + if recvErr != nil { + // The server may advertise an SSE `retry:` directive telling + // clients how long to wait before reconnecting; honor it as a + // floor so we respect server-side load shedding. + retry := time.Duration(stream.LastRetryMs()) * time.Millisecond + // EOF means the server closed the stream normally (including the + // periodic connection rotation); surface it as a clean end so the + // caller resumes without counting it as a failure. + if errors.Is(recvErr, io.EOF) { + return progressed, retry, nil + } + return progressed, retry, recvErr + } + if handleEvent(event) { + progressed = true + } + } +} + +// reconnectBackoff returns the delay before the next reconnect attempt. It is +// the larger of the server-advertised SSE `retry:` interval (serverRetry) and +// our own exponential backoff with full jitter, so we never reconnect faster +// than the server asked. With no consecutive failures and no server directive +// the delay is 0 (a healthy connection rotation reconnects instantly). Each +// subsequent consecutive failure doubles the jitter ceiling up to +// reconnectMaxBackoff; the actual jitter is a random value in [0, ceiling) to +// avoid thundering-herd reconnects across many clients. +func reconnectBackoff(consecutiveFailures int, serverRetry time.Duration) time.Duration { + var jittered time.Duration + if consecutiveFailures > 0 { + ceiling := reconnectBaseBackoff << (consecutiveFailures - 1) + if ceiling > reconnectMaxBackoff || ceiling <= 0 { + ceiling = reconnectMaxBackoff + } + jittered = time.Duration(rand.Int64N(int64(ceiling))) + } + if serverRetry > jittered { + return serverRetry + } + return jittered +} + +const ( + reconnectBaseBackoff = 500 * time.Millisecond + reconnectMaxBackoff = 30 * time.Second + // Floor between consecutive reconnect attempts regardless of progress, so a + // connection that flaps (delivers a frame then dies immediately) can't spin + // in a tight reconnect loop. + reconnectMinInterval = 1 * time.Second +) + // eventSummary is a generic, payload-agnostic projection of an SSE message // extracted by re-marshalling the SDK union type and pulling the standard // CloudEvents envelope fields. type eventSummary struct { - eventType string - isHeartbeat bool - offset string - id string - source string - time time.Time + eventType string + isHeartbeat bool + isError bool + offset string + id string + source string + time time.Time + errorCode string + errorMessage string } func summarizeEvent(ev *managementv2.EventStreamSubscribeEventsResponseContent) eventSummary { @@ -317,6 +574,20 @@ func summarizeEvent(ev *managementv2.EventStreamSubscribeEventsResponseContent) return s } + if s.eventType == "error" { + s.isError = true + if em := ev.GetError(); em != nil { + if d := em.GetError(); d != nil { + s.errorCode = string(d.GetCode()) + s.errorMessage = d.GetMessage() + if o := d.Offset; o != nil { + s.offset = *o + } + } + } + return s + } + // All concrete event payloads share the same shape: // { "type": "...", "offset": "...", "event": { "id", "time", "source", ... } } // Re-marshal once and extract the envelope generically so we don't have @@ -343,6 +614,15 @@ func summarizeEvent(ev *managementv2.EventStreamSubscribeEventsResponseContent) return s } +// Column widths for the rendered event line. Padded before coloring so that +// ANSI escape sequences don't throw off visual alignment. Values that exceed +// the width are kept full-length (we never truncate forensic data like event +// IDs or sources); only that one row jitters. +const ( + eventTypeColWidth = 32 + eventSourceColWidth = 32 +) + func renderEventSummary(cli *cli, s eventSummary) { ts := s.time if ts.IsZero() { @@ -351,13 +631,53 @@ func renderEventSummary(cli *cli, s eventSummary) { line := fmt.Sprintf( "%s %s %s %s", ansi.Faint(ts.Local().Format("15:04:05")), - ansi.Bold(colorForEventType(s.eventType)), - s.source, + ansi.Bold(colorForEventType(padRight(s.eventType, eventTypeColWidth))), + padRight(s.source, eventSourceColWidth), ansi.Faint(s.id), ) cli.renderer.Output(line) } +func renderErrorEvent(cli *cli, s eventSummary) { + ts := time.Now() + line := fmt.Sprintf( + "%s %s %s %s", + ansi.Faint(ts.Local().Format("15:04:05")), + ansi.Bold(colorForEventType(padRight("error", eventTypeColWidth))), + ansi.BrightRed(padRight(s.errorCode, eventSourceColWidth)), + s.errorMessage, + ) + cli.renderer.Output(line) +} + +// padRight pads s with spaces to width, or returns s unchanged if it already +// exceeds width. +func padRight(s string, width int) string { + if len(s) >= width { + return s + } + return s + strings.Repeat(" ", width-len(s)) +} + +// invalidEventTypesError builds a user-friendly error for one or more unknown +// --event-type values and points to --list-event-types for the full list. The +// SDK's underlying error mentions internal Go type names, so we don't surface +// it. +func invalidEventTypesError(values []string) error { + var b strings.Builder + if len(values) == 1 { + fmt.Fprintf(&b, "invalid --event-type value %q", values[0]) + } else { + quoted := make([]string, 0, len(values)) + for _, v := range values { + quoted = append(quoted, fmt.Sprintf("%q", v)) + } + fmt.Fprintf(&b, "invalid --event-type values: %s", strings.Join(quoted, ", ")) + } + b.WriteString("\nRun `auth0 event-streams subscribe --list-event-types` to see all supported types") + return errors.New(b.String()) +} + func renderHeartbeatLine(cli *cli, s eventSummary) { cli.renderer.Output(fmt.Sprintf( "%s %s %s", diff --git a/internal/cli/event_streams_subscribe_test.go b/internal/cli/event_streams_subscribe_test.go new file mode 100644 index 000000000..cb98aee13 --- /dev/null +++ b/internal/cli/event_streams_subscribe_test.go @@ -0,0 +1,212 @@ +package cli + +import ( + "encoding/json" + "strings" + "testing" + "time" + + managementv2 "github.com/auth0/go-auth0/v2/management" + "github.com/stretchr/testify/assert" +) + +func unmarshalSubscribeEvent(t *testing.T, raw string) *managementv2.EventStreamSubscribeEventsResponseContent { + t.Helper() + var ev managementv2.EventStreamSubscribeEventsResponseContent + if err := json.Unmarshal([]byte(raw), &ev); err != nil { + t.Fatalf("unmarshal subscribe event: %v", err) + } + return &ev +} + +func TestSummarizeEvent_Heartbeat(t *testing.T) { + ev := unmarshalSubscribeEvent(t, `{"type":"offset-only","offset":"abc123"}`) + + got := summarizeEvent(ev) + + assert.Equal(t, "offset-only", got.eventType) + assert.True(t, got.isHeartbeat) + assert.False(t, got.isError) + assert.Equal(t, "abc123", got.offset) +} + +func TestSummarizeEvent_Error(t *testing.T) { + ev := unmarshalSubscribeEvent(t, `{ + "type":"error", + "error":{ + "code":"rate_limited", + "message":"too many requests", + "offset":"cursor-9" + } + }`) + + got := summarizeEvent(ev) + + assert.Equal(t, "error", got.eventType) + assert.True(t, got.isError) + assert.False(t, got.isHeartbeat) + assert.Equal(t, "rate_limited", got.errorCode) + assert.Equal(t, "too many requests", got.errorMessage) + assert.Equal(t, "cursor-9", got.offset) +} + +func TestSummarizeEvent_ConnectionTimeoutIsRecognized(t *testing.T) { + // Connection_timeout is filtered upstream; summarizeEvent must still + // surface it as an error with the exact code so the filter can catch it. + ev := unmarshalSubscribeEvent(t, `{ + "type":"error", + "error":{ + "code":"connection_timeout", + "message":"server is closing the SSE connection" + } + }`) + + got := summarizeEvent(ev) + + assert.True(t, got.isError) + assert.Equal(t, "connection_timeout", got.errorCode) +} + +func TestSummarizeEvent_ConcreteEventEnvelope(t *testing.T) { + ev := unmarshalSubscribeEvent(t, `{ + "type":"user.created", + "offset":"off-42", + "event":{ + "specversion":"1.0", + "type":"user.created", + "source":"urn:auth0:tenant.example.com", + "id":"evt_abcdef123456", + "time":"2026-05-28T10:11:12Z", + "a0tenant":"tenant", + "a0stream":"es_xyz", + "data":{"user":{"user_id":"auth0|1"}} + } + }`) + + got := summarizeEvent(ev) + + assert.Equal(t, "user.created", got.eventType) + assert.False(t, got.isHeartbeat) + assert.False(t, got.isError) + assert.Equal(t, "off-42", got.offset) + assert.Equal(t, "evt_abcdef123456", got.id) + assert.Equal(t, "urn:auth0:tenant.example.com", got.source) + assert.Equal(t, 2026, got.time.Year()) +} + +func TestPadRight(t *testing.T) { + tests := []struct { + name string + in string + width int + want string + }{ + {"shorter than width pads with spaces", "abc", 5, "abc "}, + {"equal to width returned unchanged", "abcde", 5, "abcde"}, + {"longer than width returned unchanged", "abcdef", 5, "abcdef"}, + {"empty string fills full width", "", 4, " "}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, padRight(tc.in, tc.width)) + }) + } +} + +func TestShortOffset(t *testing.T) { + tests := []struct { + name string + in string + want string + }{ + {"short offset returned unchanged", "abc123", "abc123"}, + {"exactly 12 chars returned unchanged", "abcdef123456", "abcdef123456"}, + {"long offset truncated", "abcdef123456ghijkl", "abcdef12…ijkl"}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, shortOffset(tc.in)) + }) + } +} + +func TestInvalidEventTypesError_Single(t *testing.T) { + err := invalidEventTypesError([]string{"new.users"}) + require := assert.New(t) + require.Error(err) + msg := err.Error() + require.Contains(msg, `"new.users"`) + require.Contains(msg, "--list-event-types") + // SDK's internal type name must not leak into user output. + require.NotContains(msg, "EventStreamSubscribeEventsEventTypeEnum") +} + +func TestInvalidEventTypesError_Multiple(t *testing.T) { + err := invalidEventTypesError([]string{"new.users", "free.users", "totally.bogus"}) + require := assert.New(t) + require.Error(err) + msg := err.Error() + // All three bad values must be surfaced. + require.Contains(msg, `"new.users"`) + require.Contains(msg, `"free.users"`) + require.Contains(msg, `"totally.bogus"`) + require.Contains(msg, "invalid --event-type values") + require.Contains(msg, "--list-event-types") + require.NotContains(msg, "EventStreamSubscribeEventsEventTypeEnum") +} + +func TestReconnectBackoff(t *testing.T) { + // Attempt 0 with no server directive (a healthy connection rotation) + // reconnects immediately. + assert.Equal(t, time.Duration(0), reconnectBackoff(0, 0)) + assert.Equal(t, time.Duration(0), reconnectBackoff(-3, 0)) + + // With full jitter the delay is always in [0, ceiling) and the ceiling + // grows exponentially but never exceeds reconnectMaxBackoff. + for attempt := 1; attempt <= 20; attempt++ { + ceiling := reconnectBaseBackoff << (attempt - 1) + if ceiling > reconnectMaxBackoff || ceiling <= 0 { + ceiling = reconnectMaxBackoff + } + for range 100 { + d := reconnectBackoff(attempt, 0) + assert.GreaterOrEqual(t, d, time.Duration(0), "attempt %d delay must be non-negative", attempt) + assert.Less(t, d, ceiling, "attempt %d delay must be below its ceiling", attempt) + assert.LessOrEqual(t, d, reconnectMaxBackoff, "attempt %d delay must never exceed the max", attempt) + } + } + + // A server-advertised retry acts as a floor: even on a healthy rotation + // (attempt 0) we never reconnect faster than the server asked. + assert.Equal(t, 3*time.Second, reconnectBackoff(0, 3*time.Second)) + + // When the server retry exceeds the jitter ceiling, it always wins. + for range 100 { + assert.GreaterOrEqual(t, reconnectBackoff(2, time.Minute), time.Minute, + "server retry must act as a floor regardless of jitter") + } +} + +func TestColorForEventType(t *testing.T) { + // We don't assert exact escape codes (those live in the ansi package); + // we just confirm the original event type is preserved inside the colored + // output so users still see the right text. + cases := []string{ + "user.created", + "user.updated", + "user.deleted", + "organization.member.added", + "organization.member.removed", + "role.assigned", + "unknown.event", + "error", + } + for _, c := range cases { + t.Run(c, func(t *testing.T) { + out := colorForEventType(c) + assert.True(t, strings.Contains(out, c), "colored output %q must contain %q", out, c) + }) + } +}