From 8aa2d2c9c4b3e2cdd1aa43eeaffa1c2c8db162f1 Mon Sep 17 00:00:00 2001 From: Kunal Dawar Date: Thu, 28 May 2026 19:25:41 +0530 Subject: [PATCH 1/4] feat(event-streams): improve subscribe UX and bump go-auth0/v2 to v2.12.0 Adds --no-reconnect / --max-reconnects flags wired to the new option.WithoutStreamReconnection / WithMaxStreamReconnectAttempts in go-auth0 v2.12.0, and --list-event-types to discover valid --event-type values. Filters connection_timeout error frames up front so JSON, output file, and rendered output all skip the SDK's reconnect artifacts. Tracks server-emitted error events separately from per-type counts and surfaces them in the disconnect summary alongside session duration. Aligns rendered event rows with fixed-width columns, prints a more readable resume hint that wraps cleanly on long cursors, and returns a single error reporting all invalid --event-type values at once instead of bailing on the first. --- go.mod | 2 +- go.sum | 4 +- internal/cli/event_streams_subscribe.go | 246 ++++++++++++++++--- internal/cli/event_streams_subscribe_test.go | 179 ++++++++++++++ 4 files changed, 398 insertions(+), 33 deletions(-) create mode 100644 internal/cli/event_streams_subscribe_test.go diff --git a/go.mod b/go.mod index d7ff3df9c..2f09b20d0 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/PuerkitoBio/rehttp v1.4.0 github.com/atotto/clipboard v0.1.4 github.com/auth0/go-auth0 v1.42.0 - github.com/auth0/go-auth0/v2 v2.11.0 + github.com/auth0/go-auth0/v2 v2.12.0 github.com/briandowns/spinner v1.23.2 github.com/charmbracelet/glamour v1.0.0 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e diff --git a/go.sum b/go.sum index 0c1381211..dc4d71d7c 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,8 @@ github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI= github.com/auth0/go-auth0 v1.42.0 h1:mJcdCccDpSmUwpEM8qVHvta0uAJ1BzFYl/mmg7tUjOA= github.com/auth0/go-auth0 v1.42.0/go.mod h1:32sQB1uAn+99fJo6N819EniKq8h785p0ag0lMWhiTaE= -github.com/auth0/go-auth0/v2 v2.11.0 h1:qKJoPFvnN8iQKnbq4S7Ai9gpxpwnNLHM8UnEVHNhR4w= -github.com/auth0/go-auth0/v2 v2.11.0/go.mod h1:Q/Y3VZVoI3sw87VyTPhx2TQL6Sq4Q/iCP67rW2gcn+M= +github.com/auth0/go-auth0/v2 v2.12.0 h1:M6YphgwMB9H/3h9nbNOy05gEj0LkDLujA8QzZ22jk7c= +github.com/auth0/go-auth0/v2 v2.12.0/go.mod h1:Q/Y3VZVoI3sw87VyTPhx2TQL6Sq4Q/iCP67rW2gcn+M= github.com/aybabtme/iocontrol v0.0.0-20150809002002-ad15bcfc95a0 h1:0NmehRCgyk5rljDQLKUO+cRJCnduDyn11+zGZIc9Z48= github.com/aybabtme/iocontrol v0.0.0-20150809002002-ad15bcfc95a0/go.mod h1:6L7zgvqo0idzI7IO8de6ZC051AfXb5ipkIJ7bIA2tGA= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= diff --git a/internal/cli/event_streams_subscribe.go b/internal/cli/event_streams_subscribe.go index fcf605d6e..a744619d9 100644 --- a/internal/cli/event_streams_subscribe.go +++ b/internal/cli/event_streams_subscribe.go @@ -15,6 +15,7 @@ import ( "time" managementv2 "github.com/auth0/go-auth0/v2/management" + managementoption "github.com/auth0/go-auth0/v2/management/option" "github.com/spf13/cobra" "github.com/auth0/auth0-cli/internal/ansi" @@ -66,8 +67,59 @@ var ( Help: "Append every received event as a JSON line to this file (raw payload). " + "Independent of the stdout format.", } + + eventSubscribeNoReconnect = Flag{ + Name: "No Reconnect", + LongForm: "no-reconnect", + Help: "Disable transparent mid-stream reconnection. By default the SDK " + + "reconnects up to 5 times when the connection drops, preserving " + + "the cursor so no events are missed.", + } + + eventSubscribeMaxReconnects = Flag{ + Name: "Max Reconnects", + LongForm: "max-reconnects", + Help: "Maximum number of transparent mid-stream reconnect attempts. " + + "0 keeps the SDK default (5). Ignored when --no-reconnect is set.", + } + + eventSubscribeListEventTypes = Flag{ + Name: "List Event Types", + LongForm: "list-event-types", + Help: "Print every event type accepted by --event-type and exit, " + + "without opening a subscription.", + } ) +// supportedEventTypes drives --list-event-types and validates --event-type +// values up front so all bad values can be reported in one error. Sourced from +// SDK enum constants so a rename or removal in go-auth0 is a compile-time +// failure here. New types added to the SDK still need a manual append. +var supportedEventTypes = []string{ + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupCreated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupMemberAdded), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupMemberDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupRoleAssigned), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupRoleDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumGroupUpdated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationConnectionAdded), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationConnectionRemoved), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationConnectionUpdated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationCreated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationGroupRoleAssigned), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationGroupRoleDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationMemberAdded), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationMemberDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationMemberRoleAssigned), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationMemberRoleDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumOrganizationUpdated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumUserCreated), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumUserDeleted), + string(managementv2.EventStreamSubscribeEventsEventTypeEnumUserUpdated), +} + type subscribeInputs struct { From string FromTimestamp string @@ -75,6 +127,9 @@ type subscribeInputs struct { Verbose bool ShowHeartbeats bool OutputFile string + NoReconnect bool + MaxReconnects int + ListEventTypes bool } func subscribeEventStreamCmd(cli *cli) *cobra.Command { @@ -92,8 +147,10 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { "Heartbeat (`offset-only`) messages are suppressed by default and surfaced via " + "a periodic faint indicator and a final cursor on disconnect; pass --show-heartbeats " + "to render each one. Press Ctrl+C to disconnect; a per-type summary and the " + - "latest cursor will be printed so you can resume with --from.", + "latest cursor will be printed so you can resume with --from.\n\n" + + "Run with --list-event-types to print every value accepted by --event-type.", Example: ` auth0 event-streams subscribe + auth0 event-streams subscribe --list-event-types auth0 event-streams subscribe --event-type user.created auth0 event-streams subscribe --event-type user.created --event-type user.updated auth0 event-streams subscribe --from-timestamp 2026-05-01T00:00:00Z @@ -103,6 +160,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { auth0 event-streams subscribe --output-file events.jsonl auth0 event-streams subscribe --json | jq .`, RunE: func(cmd *cobra.Command, args []string) error { + if inputs.ListEventTypes { + cli.renderer.Infof(ansi.Bold("Supported event types:")) + for _, t := range supportedEventTypes { + cli.renderer.Output(" " + t) + } + return nil + } + req := &managementv2.SubscribeEventsRequestParameters{} if inputs.From != "" { @@ -113,6 +178,7 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { } if len(inputs.EventTypes) > 0 { eventTypes := make([]*managementv2.EventStreamSubscribeEventsEventTypeEnum, 0, len(inputs.EventTypes)) + var invalid []string for _, t := range inputs.EventTypes { t = strings.TrimSpace(t) if t == "" { @@ -120,10 +186,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { } enum, err := managementv2.NewEventStreamSubscribeEventsEventTypeEnumFromString(t) if err != nil { - return fmt.Errorf("invalid --event-type value %q: %w", t, err) + invalid = append(invalid, t) + continue } eventTypes = append(eventTypes, enum.Ptr()) } + if len(invalid) > 0 { + return invalidEventTypesError(invalid) + } req.EventType = eventTypes } @@ -137,7 +207,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { outFile = f } - stream, err := cli.apiv2.Events.Subscribe(cmd.Context(), req) + var subscribeOpts []managementoption.RequestOption + if inputs.NoReconnect { + subscribeOpts = append(subscribeOpts, managementoption.WithoutStreamReconnection()) + } else if inputs.MaxReconnects > 0 { + subscribeOpts = append(subscribeOpts, managementoption.WithMaxStreamReconnectAttempts(uint(inputs.MaxReconnects))) + } + + stream, err := cli.apiv2.Events.Subscribe(cmd.Context(), req, subscribeOpts...) if err != nil { return fmt.Errorf("failed to subscribe to events: %w", err) } @@ -151,12 +228,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { counts := map[string]int{} var ( totalEvents uint64 + errorEvents uint64 heartbeats uint64 lastOffset string lastHeartbeatAt time.Time countsMu sync.Mutex summaryOnce sync.Once streamClosed atomic.Bool + startedAt = time.Now() ) flushSummary := func() { @@ -167,23 +246,50 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { countsMu.Lock() defer countsMu.Unlock() + total := atomic.LoadUint64(&totalEvents) + errs := atomic.LoadUint64(&errorEvents) + hbs := atomic.LoadUint64(&heartbeats) + duration := time.Since(startedAt).Round(time.Second) + cli.renderer.Newline() cli.renderer.Infof(ansi.Bold("Disconnected. Summary:")) - cli.renderer.Infof(" Events received: %d", atomic.LoadUint64(&totalEvents)) - cli.renderer.Infof(" Heartbeats: %d", atomic.LoadUint64(&heartbeats)) + cli.renderer.Infof(" Duration: %s", duration) + cli.renderer.Infof(" Events received: %d", total) + if errs > 0 { + cli.renderer.Infof(" Errors: %s", ansi.BrightRed(fmt.Sprintf("%d", errs))) + } + cli.renderer.Infof(" Heartbeats: %d", hbs) if len(counts) > 0 { types := make([]string, 0, len(counts)) + maxLen := 0 for t := range counts { types = append(types, t) + if len(t) > maxLen { + maxLen = len(t) + } } - sort.Strings(types) + // Sort by count desc; tie-break alphabetically so output is stable. + sort.Slice(types, func(i, j int) bool { + if counts[types[i]] != counts[types[j]] { + return counts[types[i]] > counts[types[j]] + } + return types[i] < types[j] + }) + cli.renderer.Newline() + cli.renderer.Infof(" By type:") for _, t := range types { - cli.renderer.Infof(" %s %s %d", ansi.Faint("·"), t, counts[t]) + cli.renderer.Infof(" %s %d", padRight(t, maxLen), counts[t]) } } if lastOffset != "" { + // The cursor is opaque and often very long; print the + // command and cursor on separate lines so the cursor + // wraps cleanly and is easy to select / copy in a + // terminal. cli.renderer.Newline() - cli.renderer.Infof("Resume with: %s", ansi.Cyan(fmt.Sprintf("auth0 event-streams subscribe --from %s", lastOffset))) + cli.renderer.Infof("Resume from cursor %s:", ansi.Faint(shortOffset(lastOffset))) + cli.renderer.Output(" " + ansi.Cyan("auth0 event-streams subscribe --from \\")) + cli.renderer.Output(" " + ansi.Cyan(lastOffset)) } }) } @@ -209,16 +315,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { for { event, err := stream.Recv() if err != nil { - /* Treat as graceful shutdown if - - EOF (server closed) - - context cancelled - - we closed the stream ourselves (Ctrl+C) - - http2 body closed error (result of stream.Close()) */ + // The SDK auto-reconnects on mid-stream disconnects, so a + // transient body close no longer surfaces here. Treat as + // graceful only on EOF (terminator or reconnect attempts + // exhausted), context cancellation, or our own Ctrl+C. + // Everything else is a real error to show to the user. isGraceful := errors.Is(err, io.EOF) || - errors.Is(err, cmd.Context().Err()) || cmd.Context().Err() != nil || - streamClosed.Load() || - strings.Contains(err.Error(), "response body closed") + streamClosed.Load() if isGraceful { flushSummary() return nil @@ -232,6 +336,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { lastOffset = summary.offset } + // Connection_timeout is emitted by the server right before it + // drops the SSE connection; the SDK transparently reconnects + // using the cursor, so it's a protocol artifact and should + // not surface in any output sink (rendered, JSON, or file). + if summary.isError && summary.errorCode == "connection_timeout" { + continue + } + // Always persist raw payload to file if requested. if outFile != nil { raw, mErr := json.Marshal(&event) @@ -262,12 +374,16 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { continue } - atomic.AddUint64(&totalEvents, 1) - countsMu.Lock() - counts[summary.eventType]++ - countsMu.Unlock() - - renderEventSummary(cli, summary) + if summary.isError { + atomic.AddUint64(&errorEvents, 1) + renderErrorEvent(cli, summary) + } else { + atomic.AddUint64(&totalEvents, 1) + countsMu.Lock() + counts[summary.eventType]++ + countsMu.Unlock() + renderEventSummary(cli, summary) + } if inputs.Verbose { payload, mErr := json.MarshalIndent(&event, "", " ") @@ -287,6 +403,10 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { eventSubscribeVerbose.RegisterBool(cmd, &inputs.Verbose, false) eventSubscribeShowHeartbeats.RegisterBool(cmd, &inputs.ShowHeartbeats, false) eventSubscribeOutputFile.RegisterString(cmd, &inputs.OutputFile, "") + eventSubscribeNoReconnect.RegisterBool(cmd, &inputs.NoReconnect, false) + eventSubscribeMaxReconnects.RegisterInt(cmd, &inputs.MaxReconnects, 0) + eventSubscribeListEventTypes.RegisterBool(cmd, &inputs.ListEventTypes, false) + cmd.MarkFlagsMutuallyExclusive("no-reconnect", "max-reconnects") cmd.Flags().BoolVar(&cli.json, "json", false, "Output each event as JSON (one indented object per event).") cmd.Flags().BoolVar(&cli.jsonCompact, "json-compact", false, "Output each event as compact, single-line JSON (newline-delimited).") @@ -299,12 +419,15 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { // extracted by re-marshalling the SDK union type and pulling the standard // CloudEvents envelope fields. type eventSummary struct { - eventType string - isHeartbeat bool - offset string - id string - source string - time time.Time + eventType string + isHeartbeat bool + isError bool + offset string + id string + source string + time time.Time + errorCode string + errorMessage string } func summarizeEvent(ev *managementv2.EventStreamSubscribeEventsResponseContent) eventSummary { @@ -317,6 +440,20 @@ func summarizeEvent(ev *managementv2.EventStreamSubscribeEventsResponseContent) return s } + if s.eventType == "error" { + s.isError = true + if em := ev.GetError(); em != nil { + if d := em.GetError(); d != nil { + s.errorCode = string(d.GetCode()) + s.errorMessage = d.GetMessage() + if o := d.Offset; o != nil { + s.offset = *o + } + } + } + return s + } + // All concrete event payloads share the same shape: // { "type": "...", "offset": "...", "event": { "id", "time", "source", ... } } // Re-marshal once and extract the envelope generically so we don't have @@ -343,6 +480,15 @@ func summarizeEvent(ev *managementv2.EventStreamSubscribeEventsResponseContent) return s } +// Column widths for the rendered event line. Padded before coloring so that +// ANSI escape sequences don't throw off visual alignment. Values that exceed +// the width are kept full-length (we never truncate forensic data like event +// IDs or sources); only that one row jitters. +const ( + eventTypeColWidth = 32 + eventSourceColWidth = 32 +) + func renderEventSummary(cli *cli, s eventSummary) { ts := s.time if ts.IsZero() { @@ -351,13 +497,53 @@ func renderEventSummary(cli *cli, s eventSummary) { line := fmt.Sprintf( "%s %s %s %s", ansi.Faint(ts.Local().Format("15:04:05")), - ansi.Bold(colorForEventType(s.eventType)), - s.source, + ansi.Bold(colorForEventType(padRight(s.eventType, eventTypeColWidth))), + padRight(s.source, eventSourceColWidth), ansi.Faint(s.id), ) cli.renderer.Output(line) } +func renderErrorEvent(cli *cli, s eventSummary) { + ts := time.Now() + line := fmt.Sprintf( + "%s %s %s %s", + ansi.Faint(ts.Local().Format("15:04:05")), + ansi.Bold(colorForEventType(padRight("error", eventTypeColWidth))), + ansi.BrightRed(padRight(s.errorCode, eventSourceColWidth)), + s.errorMessage, + ) + cli.renderer.Output(line) +} + +// padRight pads s with spaces to width, or returns s unchanged if it already +// exceeds width. +func padRight(s string, width int) string { + if len(s) >= width { + return s + } + return s + strings.Repeat(" ", width-len(s)) +} + +// invalidEventTypesError builds a user-friendly error for one or more unknown +// --event-type values and points to --list-event-types for the full list. The +// SDK's underlying error mentions internal Go type names, so we don't surface +// it. +func invalidEventTypesError(values []string) error { + var b strings.Builder + if len(values) == 1 { + fmt.Fprintf(&b, "invalid --event-type value %q", values[0]) + } else { + quoted := make([]string, 0, len(values)) + for _, v := range values { + quoted = append(quoted, fmt.Sprintf("%q", v)) + } + fmt.Fprintf(&b, "invalid --event-type values: %s", strings.Join(quoted, ", ")) + } + b.WriteString("\nRun `auth0 event-streams subscribe --list-event-types` to see all supported types") + return errors.New(b.String()) +} + func renderHeartbeatLine(cli *cli, s eventSummary) { cli.renderer.Output(fmt.Sprintf( "%s %s %s", diff --git a/internal/cli/event_streams_subscribe_test.go b/internal/cli/event_streams_subscribe_test.go new file mode 100644 index 000000000..dfaeb3b85 --- /dev/null +++ b/internal/cli/event_streams_subscribe_test.go @@ -0,0 +1,179 @@ +package cli + +import ( + "encoding/json" + "strings" + "testing" + + managementv2 "github.com/auth0/go-auth0/v2/management" + "github.com/stretchr/testify/assert" +) + +func unmarshalSubscribeEvent(t *testing.T, raw string) *managementv2.EventStreamSubscribeEventsResponseContent { + t.Helper() + var ev managementv2.EventStreamSubscribeEventsResponseContent + if err := json.Unmarshal([]byte(raw), &ev); err != nil { + t.Fatalf("unmarshal subscribe event: %v", err) + } + return &ev +} + +func TestSummarizeEvent_Heartbeat(t *testing.T) { + ev := unmarshalSubscribeEvent(t, `{"type":"offset-only","offset":"abc123"}`) + + got := summarizeEvent(ev) + + assert.Equal(t, "offset-only", got.eventType) + assert.True(t, got.isHeartbeat) + assert.False(t, got.isError) + assert.Equal(t, "abc123", got.offset) +} + +func TestSummarizeEvent_Error(t *testing.T) { + ev := unmarshalSubscribeEvent(t, `{ + "type":"error", + "error":{ + "code":"rate_limited", + "message":"too many requests", + "offset":"cursor-9" + } + }`) + + got := summarizeEvent(ev) + + assert.Equal(t, "error", got.eventType) + assert.True(t, got.isError) + assert.False(t, got.isHeartbeat) + assert.Equal(t, "rate_limited", got.errorCode) + assert.Equal(t, "too many requests", got.errorMessage) + assert.Equal(t, "cursor-9", got.offset) +} + +func TestSummarizeEvent_ConnectionTimeoutIsRecognized(t *testing.T) { + // Connection_timeout is filtered upstream; summarizeEvent must still + // surface it as an error with the exact code so the filter can catch it. + ev := unmarshalSubscribeEvent(t, `{ + "type":"error", + "error":{ + "code":"connection_timeout", + "message":"server is closing the SSE connection" + } + }`) + + got := summarizeEvent(ev) + + assert.True(t, got.isError) + assert.Equal(t, "connection_timeout", got.errorCode) +} + +func TestSummarizeEvent_ConcreteEventEnvelope(t *testing.T) { + ev := unmarshalSubscribeEvent(t, `{ + "type":"user.created", + "offset":"off-42", + "event":{ + "specversion":"1.0", + "type":"user.created", + "source":"urn:auth0:tenant.example.com", + "id":"evt_abcdef123456", + "time":"2026-05-28T10:11:12Z", + "a0tenant":"tenant", + "a0stream":"es_xyz", + "data":{"user":{"user_id":"auth0|1"}} + } + }`) + + got := summarizeEvent(ev) + + assert.Equal(t, "user.created", got.eventType) + assert.False(t, got.isHeartbeat) + assert.False(t, got.isError) + assert.Equal(t, "off-42", got.offset) + assert.Equal(t, "evt_abcdef123456", got.id) + assert.Equal(t, "urn:auth0:tenant.example.com", got.source) + assert.Equal(t, 2026, got.time.Year()) +} + +func TestPadRight(t *testing.T) { + tests := []struct { + name string + in string + width int + want string + }{ + {"shorter than width pads with spaces", "abc", 5, "abc "}, + {"equal to width returned unchanged", "abcde", 5, "abcde"}, + {"longer than width returned unchanged", "abcdef", 5, "abcdef"}, + {"empty string fills full width", "", 4, " "}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, padRight(tc.in, tc.width)) + }) + } +} + +func TestShortOffset(t *testing.T) { + tests := []struct { + name string + in string + want string + }{ + {"short offset returned unchanged", "abc123", "abc123"}, + {"exactly 12 chars returned unchanged", "abcdef123456", "abcdef123456"}, + {"long offset truncated", "abcdef123456ghijkl", "abcdef12…ijkl"}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, shortOffset(tc.in)) + }) + } +} + +func TestInvalidEventTypesError_Single(t *testing.T) { + err := invalidEventTypesError([]string{"new.users"}) + require := assert.New(t) + require.Error(err) + msg := err.Error() + require.Contains(msg, `"new.users"`) + require.Contains(msg, "--list-event-types") + // SDK's internal type name must not leak into user output. + require.NotContains(msg, "EventStreamSubscribeEventsEventTypeEnum") +} + +func TestInvalidEventTypesError_Multiple(t *testing.T) { + err := invalidEventTypesError([]string{"new.users", "free.users", "totally.bogus"}) + require := assert.New(t) + require.Error(err) + msg := err.Error() + // All three bad values must be surfaced. + require.Contains(msg, `"new.users"`) + require.Contains(msg, `"free.users"`) + require.Contains(msg, `"totally.bogus"`) + require.Contains(msg, "invalid --event-type values") + require.Contains(msg, "--list-event-types") + require.NotContains(msg, "EventStreamSubscribeEventsEventTypeEnum") +} + +func TestColorForEventType(t *testing.T) { + // We don't assert exact escape codes (those live in the ansi package); + // we just confirm the original event type is preserved inside the colored + // output so users still see the right text. + cases := []string{ + "user.created", + "user.updated", + "user.deleted", + "organization.member.added", + "organization.member.removed", + "role.assigned", + "unknown.event", + "error", + } + for _, c := range cases { + t.Run(c, func(t *testing.T) { + out := colorForEventType(c) + assert.True(t, strings.Contains(out, c), "colored output %q must contain %q", out, c) + }) + } +} From b9c934f4b43880594adcc2208e5223cadcf94f4e Mon Sep 17 00:00:00 2001 From: Kunal Dawar Date: Thu, 28 May 2026 19:37:34 +0530 Subject: [PATCH 2/4] docs: regenerate event-streams subscribe docs --- docs/auth0_event-streams_subscribe.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/auth0_event-streams_subscribe.md b/docs/auth0_event-streams_subscribe.md index 169441139..3fe3df505 100644 --- a/docs/auth0_event-streams_subscribe.md +++ b/docs/auth0_event-streams_subscribe.md @@ -14,6 +14,8 @@ Use --verbose to also print the full JSON payload after each summary, or --json Heartbeat (`offset-only`) messages are suppressed by default and surfaced via a periodic faint indicator and a final cursor on disconnect; pass --show-heartbeats to render each one. Press Ctrl+C to disconnect; a per-type summary and the latest cursor will be printed so you can resume with --from. +Run with --list-event-types to print every value accepted by --event-type. + ## Usage ``` auth0 event-streams subscribe [flags] @@ -23,6 +25,7 @@ auth0 event-streams subscribe [flags] ``` auth0 event-streams subscribe + auth0 event-streams subscribe --list-event-types auth0 event-streams subscribe --event-type user.created auth0 event-streams subscribe --event-type user.created --event-type user.updated auth0 event-streams subscribe --from-timestamp 2026-05-01T00:00:00Z @@ -42,6 +45,9 @@ auth0 event-streams subscribe [flags] --from-timestamp string RFC-3339 timestamp indicating where to start streaming events from. Use this on the initial query when no cursor (--from) is available; prefer --from on subsequent runs as it is more accurate. --json Output each event as JSON (one indented object per event). --json-compact Output each event as compact, single-line JSON (newline-delimited). + --list-event-types Print every event type accepted by --event-type and exit, without opening a subscription. + --max-reconnects int Maximum number of transparent mid-stream reconnect attempts. 0 keeps the SDK default (5). Ignored when --no-reconnect is set. + --no-reconnect Disable transparent mid-stream reconnection. By default the SDK reconnects up to 5 times when the connection drops, preserving the cursor so no events are missed. --output-file string Append every received event as a JSON line to this file (raw payload). Independent of the stdout format. --show-heartbeats offset-only Show every offset-only heartbeat as its own line. By default heartbeats are silently tracked and only the latest cursor is reported on disconnect. -v, --verbose Print the full JSON payload after each event summary line. From 91b3bccd67af58d7b7cb2022b49e348582ae8dcd Mon Sep 17 00:00:00 2001 From: Kunal Dawar Date: Sat, 30 May 2026 12:07:26 +0530 Subject: [PATCH 3/4] feat(event-streams): resume subscribe stream across server connection rotations The server rotates long-lived SSE connections every few minutes, which previously exhausted the SDK's lifetime reconnect cap and silently ended the session. Replace SDK-internal reconnection with a CLI-level resume loop that re-subscribes from the last cursor, so the session stays continuous and no events are missed. Backoff uses exponential delay with full jitter and only accrues on consecutive zero-progress failures; any forward progress resets it so healthy rotations reconnect instantly and silently. The server-advertised SSE retry directive is honored as a floor. --max-reconnects now caps consecutive failed attempts (0 = unlimited while progress is made), and Ctrl+C cancels via context so the summary always prints. Also collapse the duplicated resume hint into a single line. --- docs/auth0_event-streams_subscribe.md | 8 +- internal/cli/event_streams_subscribe.go | 236 ++++++++++++++----- internal/cli/event_streams_subscribe_test.go | 33 +++ 3 files changed, 216 insertions(+), 61 deletions(-) diff --git a/docs/auth0_event-streams_subscribe.md b/docs/auth0_event-streams_subscribe.md index 3fe3df505..4442e9c7e 100644 --- a/docs/auth0_event-streams_subscribe.md +++ b/docs/auth0_event-streams_subscribe.md @@ -12,7 +12,9 @@ By default, every received event is rendered as a single, color-coded summary li Use --verbose to also print the full JSON payload after each summary, or --json / --json-compact to emit raw JSON suitable for piping into `jq`. -Heartbeat (`offset-only`) messages are suppressed by default and surfaced via a periodic faint indicator and a final cursor on disconnect; pass --show-heartbeats to render each one. Press Ctrl+C to disconnect; a per-type summary and the latest cursor will be printed so you can resume with --from. +Heartbeat (`offset-only`) messages are suppressed by default and surfaced via a periodic faint indicator and a final cursor on disconnect; pass --show-heartbeats to render each one. + +The server rotates long-lived connections every few minutes; the command transparently resumes from the last cursor so the session stays continuous. Use --no-reconnect to exit when the connection ends, or --max-reconnects to cap the number of consecutive failed reconnect attempts. Press Ctrl+C to disconnect; a per-type summary and the latest cursor are printed so you can resume with --from. Run with --list-event-types to print every value accepted by --event-type. @@ -46,8 +48,8 @@ auth0 event-streams subscribe [flags] --json Output each event as JSON (one indented object per event). --json-compact Output each event as compact, single-line JSON (newline-delimited). --list-event-types Print every event type accepted by --event-type and exit, without opening a subscription. - --max-reconnects int Maximum number of transparent mid-stream reconnect attempts. 0 keeps the SDK default (5). Ignored when --no-reconnect is set. - --no-reconnect Disable transparent mid-stream reconnection. By default the SDK reconnects up to 5 times when the connection drops, preserving the cursor so no events are missed. + --max-reconnects int Maximum number of consecutive failed reconnect attempts before giving up. 0 (default) keeps retrying as long as the stream is making progress. Ignored when --no-reconnect is set. + --no-reconnect Disable automatic reconnection. By default the stream resumes from the last cursor after the server drops the connection, so no events are missed. With this flag the command exits when the stream ends. --output-file string Append every received event as a JSON line to this file (raw payload). Independent of the stdout format. --show-heartbeats offset-only Show every offset-only heartbeat as its own line. By default heartbeats are silently tracked and only the latest cursor is reported on disconnect. -v, --verbose Print the full JSON payload after each event summary line. diff --git a/internal/cli/event_streams_subscribe.go b/internal/cli/event_streams_subscribe.go index a744619d9..5383b5821 100644 --- a/internal/cli/event_streams_subscribe.go +++ b/internal/cli/event_streams_subscribe.go @@ -1,10 +1,12 @@ package cli import ( + "context" "encoding/json" "errors" "fmt" "io" + "math/rand/v2" "os" "os/signal" "sort" @@ -19,6 +21,7 @@ import ( "github.com/spf13/cobra" "github.com/auth0/auth0-cli/internal/ansi" + "github.com/auth0/auth0-cli/internal/auth0" ) var ( @@ -71,16 +74,17 @@ var ( eventSubscribeNoReconnect = Flag{ Name: "No Reconnect", LongForm: "no-reconnect", - Help: "Disable transparent mid-stream reconnection. By default the SDK " + - "reconnects up to 5 times when the connection drops, preserving " + - "the cursor so no events are missed.", + Help: "Disable automatic reconnection. By default the stream resumes from " + + "the last cursor after the server drops the connection, so no events " + + "are missed. With this flag the command exits when the stream ends.", } eventSubscribeMaxReconnects = Flag{ Name: "Max Reconnects", LongForm: "max-reconnects", - Help: "Maximum number of transparent mid-stream reconnect attempts. " + - "0 keeps the SDK default (5). Ignored when --no-reconnect is set.", + Help: "Maximum number of consecutive failed reconnect attempts before " + + "giving up. 0 (default) keeps retrying as long as the stream is " + + "making progress. Ignored when --no-reconnect is set.", } eventSubscribeListEventTypes = Flag{ @@ -146,8 +150,12 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { "or --json / --json-compact to emit raw JSON suitable for piping into `jq`.\n\n" + "Heartbeat (`offset-only`) messages are suppressed by default and surfaced via " + "a periodic faint indicator and a final cursor on disconnect; pass --show-heartbeats " + - "to render each one. Press Ctrl+C to disconnect; a per-type summary and the " + - "latest cursor will be printed so you can resume with --from.\n\n" + + "to render each one.\n\n" + + "The server rotates long-lived connections every few minutes; the command " + + "transparently resumes from the last cursor so the session stays continuous. " + + "Use --no-reconnect to exit when the connection ends, or --max-reconnects to cap " + + "the number of consecutive failed reconnect attempts. Press Ctrl+C to disconnect; " + + "a per-type summary and the latest cursor are printed so you can resume with --from.\n\n" + "Run with --list-event-types to print every value accepted by --event-type.", Example: ` auth0 event-streams subscribe auth0 event-streams subscribe --list-event-types @@ -207,19 +215,6 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { outFile = f } - var subscribeOpts []managementoption.RequestOption - if inputs.NoReconnect { - subscribeOpts = append(subscribeOpts, managementoption.WithoutStreamReconnection()) - } else if inputs.MaxReconnects > 0 { - subscribeOpts = append(subscribeOpts, managementoption.WithMaxStreamReconnectAttempts(uint(inputs.MaxReconnects))) - } - - stream, err := cli.apiv2.Events.Subscribe(cmd.Context(), req, subscribeOpts...) - if err != nil { - return fmt.Errorf("failed to subscribe to events: %w", err) - } - defer func() { _ = stream.Close() }() - useJSON := cli.json || cli.jsonCompact if !useJSON { cli.renderer.Infof(ansi.Faint("Subscribed to event stream. Press Ctrl+C to disconnect.")) @@ -234,7 +229,7 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { lastHeartbeatAt time.Time countsMu sync.Mutex summaryOnce sync.Once - streamClosed atomic.Bool + userInterrupted atomic.Bool startedAt = time.Now() ) @@ -282,22 +277,18 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { } } if lastOffset != "" { - // The cursor is opaque and often very long; print the - // command and cursor on separate lines so the cursor - // wraps cleanly and is easy to select / copy in a - // terminal. cli.renderer.Newline() - cli.renderer.Infof("Resume from cursor %s:", ansi.Faint(shortOffset(lastOffset))) - cli.renderer.Output(" " + ansi.Cyan("auth0 event-streams subscribe --from \\")) - cli.renderer.Output(" " + ansi.Cyan(lastOffset)) + cli.renderer.Infof("Resume with: %s", ansi.Cyan(fmt.Sprintf("auth0 event-streams subscribe --from %s", lastOffset))) } }) } // The root command installs a SIGINT handler that calls os.Exit(0) // from a goroutine, which would skip our deferred summary. Reset - // it first so only our handler runs, then print the summary and - // exit cleanly ourselves. + // it first so only our handler runs: it cancels the stream context + // so the resume loop unwinds cleanly, prints the summary, and exits. + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() signal.Reset(os.Interrupt, syscall.SIGTERM) sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) @@ -306,30 +297,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { if _, ok := <-sigCh; !ok { return } - streamClosed.Store(true) - _ = stream.Close() - flushSummary() - os.Exit(0) + userInterrupted.Store(true) + cancel() }() - for { - event, err := stream.Recv() - if err != nil { - // The SDK auto-reconnects on mid-stream disconnects, so a - // transient body close no longer surfaces here. Treat as - // graceful only on EOF (terminator or reconnect attempts - // exhausted), context cancellation, or our own Ctrl+C. - // Everything else is a real error to show to the user. - isGraceful := errors.Is(err, io.EOF) || - cmd.Context().Err() != nil || - streamClosed.Load() - if isGraceful { - flushSummary() - return nil - } - return fmt.Errorf("error receiving event: %w", err) - } - + // Per-event handler shared across reconnects. Returns true when an + // event counts as forward progress (a real event or heartbeat), + // which resets the reconnect backoff. + handleEvent := func(event managementv2.EventStreamSubscribeEventsResponseContent) bool { summary := summarizeEvent(&event) if summary.offset != "" { @@ -337,11 +312,10 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { } // Connection_timeout is emitted by the server right before it - // drops the SSE connection; the SDK transparently reconnects - // using the cursor, so it's a protocol artifact and should - // not surface in any output sink (rendered, JSON, or file). + // drops the SSE connection; we resume from the cursor, so it's + // a protocol artifact and must not surface in any output sink. if summary.isError && summary.errorCode == "connection_timeout" { - continue + return false } // Always persist raw payload to file if requested. @@ -359,7 +333,7 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { } else { cli.renderer.JSONResult(&event) } - continue + return true } if summary.isHeartbeat { @@ -371,7 +345,7 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { lastHeartbeatAt = time.Now() renderHeartbeatPulse(cli, summary) } - continue + return true } if summary.isError { @@ -393,6 +367,76 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { cli.renderer.Output(ansi.ColorizeJSON(string(payload))) } } + return true + } + + // Outer resume loop. The server drops long-lived SSE connections + // every few minutes by design; rather than treating that as the + // end of the stream, we transparently re-subscribe from the last + // cursor so the session feels continuous (the same model used by + // `stripe listen`, `kubectl logs -f`, and `aws logs tail --follow`). + // + // Backoff only kicks in for consecutive failures that made zero + // progress. Any forward progress resets it, so a healthy stream + // that simply rotates connections reconnects instantly. We give up + // only after --max-reconnects consecutive zero-progress failures + // (0 = retry indefinitely while progress is being made). + consecutiveFailures := 0 + for { + if ctx.Err() != nil { + flushSummary() + return nil + } + + progressed, serverRetry, err := runStreamSession(ctx, cli.apiv2, req, lastOffset, handleEvent) + + // Ctrl+C or parent cancellation: always a graceful exit. + if userInterrupted.Load() || errors.Is(err, context.Canceled) || ctx.Err() != nil { + flushSummary() + return nil + } + + if inputs.NoReconnect { + if err != nil { + return fmt.Errorf("error receiving event: %w", err) + } + flushSummary() + return nil + } + + // A clean end-of-stream (err == nil) or any session that made + // forward progress resets the backoff so connection rotations + // reconnect instantly. Only consecutive zero-progress failures + // accrue toward the --max-reconnects ceiling. + if err == nil || progressed { + consecutiveFailures = 0 + } else { + consecutiveFailures++ + } + + if inputs.MaxReconnects > 0 && consecutiveFailures > inputs.MaxReconnects { + flushSummary() + if !useJSON { + cli.renderer.Errorf("Giving up after %d consecutive reconnect attempts without progress.", inputs.MaxReconnects) + } + return fmt.Errorf("stream reconnection failed after %d consecutive attempts: %w", inputs.MaxReconnects, err) + } + + delay := reconnectBackoff(consecutiveFailures, serverRetry) + // Healthy connection rotations resume silently (even when the + // server asks for a short `retry:` wait) so a normal long-lived + // session looks seamless. Only surface a notice once we're + // actually backing off real, consecutive failures. + if !useJSON && consecutiveFailures > 0 { + cli.renderer.Infof(ansi.Faint(fmt.Sprintf("· connection lost, reconnecting in %s…", delay.Round(time.Millisecond)))) + } + + select { + case <-ctx.Done(): + flushSummary() + return nil + case <-time.After(delay): + } } }, } @@ -415,6 +459,82 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { return cmd } +// runStreamSession opens a single SSE subscription and pumps events through +// handleEvent until the connection ends or ctx is cancelled. SDK-internal +// reconnection is disabled so this function owns exactly one connection; the +// caller's outer loop is the single source of truth for reconnection. When +// resumeFrom is set it overrides req.From so each reconnect picks up exactly +// where the last delivered event left off. It returns whether the session made +// any forward progress (used to reset the caller's backoff), the server's most +// recently advertised SSE `retry:` interval (0 if none) so the caller never +// reconnects faster than the server asked, and the terminating error (nil on a +// clean server close). +func runStreamSession( + ctx context.Context, + api *auth0.APIV2, + req *managementv2.SubscribeEventsRequestParameters, + resumeFrom string, + handleEvent func(managementv2.EventStreamSubscribeEventsResponseContent) bool, +) (progressed bool, serverRetry time.Duration, err error) { + if resumeFrom != "" { + req.From = &resumeFrom + } + + stream, err := api.Events.Subscribe(ctx, req, managementoption.WithoutStreamReconnection()) + if err != nil { + return false, 0, err + } + defer func() { _ = stream.Close() }() + + for { + event, recvErr := stream.Recv() + if recvErr != nil { + // The server may advertise an SSE `retry:` directive telling + // clients how long to wait before reconnecting; honor it as a + // floor so we respect server-side load shedding. + retry := time.Duration(stream.LastRetryMs()) * time.Millisecond + // EOF means the server closed the stream normally (including the + // periodic connection rotation); surface it as a clean end so the + // caller resumes without counting it as a failure. + if errors.Is(recvErr, io.EOF) { + return progressed, retry, nil + } + return progressed, retry, recvErr + } + if handleEvent(event) { + progressed = true + } + } +} + +// reconnectBackoff returns the delay before the next reconnect attempt. It is +// the larger of the server-advertised SSE `retry:` interval (serverRetry) and +// our own exponential backoff with full jitter, so we never reconnect faster +// than the server asked. With no consecutive failures and no server directive +// the delay is 0 (a healthy connection rotation reconnects instantly). Each +// subsequent consecutive failure doubles the jitter ceiling up to +// reconnectMaxBackoff; the actual jitter is a random value in [0, ceiling) to +// avoid thundering-herd reconnects across many clients. +func reconnectBackoff(consecutiveFailures int, serverRetry time.Duration) time.Duration { + var jittered time.Duration + if consecutiveFailures > 0 { + ceiling := reconnectBaseBackoff << (consecutiveFailures - 1) + if ceiling > reconnectMaxBackoff || ceiling <= 0 { + ceiling = reconnectMaxBackoff + } + jittered = time.Duration(rand.Int64N(int64(ceiling))) + } + if serverRetry > jittered { + return serverRetry + } + return jittered +} + +const ( + reconnectBaseBackoff = 500 * time.Millisecond + reconnectMaxBackoff = 30 * time.Second +) + // eventSummary is a generic, payload-agnostic projection of an SSE message // extracted by re-marshalling the SDK union type and pulling the standard // CloudEvents envelope fields. diff --git a/internal/cli/event_streams_subscribe_test.go b/internal/cli/event_streams_subscribe_test.go index dfaeb3b85..cb98aee13 100644 --- a/internal/cli/event_streams_subscribe_test.go +++ b/internal/cli/event_streams_subscribe_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "strings" "testing" + "time" managementv2 "github.com/auth0/go-auth0/v2/management" "github.com/stretchr/testify/assert" @@ -156,6 +157,38 @@ func TestInvalidEventTypesError_Multiple(t *testing.T) { require.NotContains(msg, "EventStreamSubscribeEventsEventTypeEnum") } +func TestReconnectBackoff(t *testing.T) { + // Attempt 0 with no server directive (a healthy connection rotation) + // reconnects immediately. + assert.Equal(t, time.Duration(0), reconnectBackoff(0, 0)) + assert.Equal(t, time.Duration(0), reconnectBackoff(-3, 0)) + + // With full jitter the delay is always in [0, ceiling) and the ceiling + // grows exponentially but never exceeds reconnectMaxBackoff. + for attempt := 1; attempt <= 20; attempt++ { + ceiling := reconnectBaseBackoff << (attempt - 1) + if ceiling > reconnectMaxBackoff || ceiling <= 0 { + ceiling = reconnectMaxBackoff + } + for range 100 { + d := reconnectBackoff(attempt, 0) + assert.GreaterOrEqual(t, d, time.Duration(0), "attempt %d delay must be non-negative", attempt) + assert.Less(t, d, ceiling, "attempt %d delay must be below its ceiling", attempt) + assert.LessOrEqual(t, d, reconnectMaxBackoff, "attempt %d delay must never exceed the max", attempt) + } + } + + // A server-advertised retry acts as a floor: even on a healthy rotation + // (attempt 0) we never reconnect faster than the server asked. + assert.Equal(t, 3*time.Second, reconnectBackoff(0, 3*time.Second)) + + // When the server retry exceeds the jitter ceiling, it always wins. + for range 100 { + assert.GreaterOrEqual(t, reconnectBackoff(2, time.Minute), time.Minute, + "server retry must act as a floor regardless of jitter") + } +} + func TestColorForEventType(t *testing.T) { // We don't assert exact escape codes (those live in the ansi package); // we just confirm the original event type is preserved inside the colored From c0938673102539f66f6f3cfa59e2227993b0020e Mon Sep 17 00:00:00 2001 From: Kunal Dawar Date: Sat, 30 May 2026 12:10:36 +0530 Subject: [PATCH 4/4] fix(event-streams): enforce minimum interval between stream reconnects A connection that flaps (delivers a frame then dies immediately) counts as forward progress and resets the backoff, which could spin in a tight reconnect loop. Enforce a 1s floor between attempts based on how long the session actually lasted, so healthy long-lived sessions are unaffected but flapping connections are paced. --- internal/cli/event_streams_subscribe.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/internal/cli/event_streams_subscribe.go b/internal/cli/event_streams_subscribe.go index 5383b5821..ec1b78502 100644 --- a/internal/cli/event_streams_subscribe.go +++ b/internal/cli/event_streams_subscribe.go @@ -388,7 +388,9 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { return nil } + attemptStart := time.Now() progressed, serverRetry, err := runStreamSession(ctx, cli.apiv2, req, lastOffset, handleEvent) + sessionDuration := time.Since(attemptStart) // Ctrl+C or parent cancellation: always a graceful exit. if userInterrupted.Load() || errors.Is(err, context.Canceled) || ctx.Err() != nil { @@ -423,6 +425,14 @@ func subscribeEventStreamCmd(cli *cli) *cobra.Command { } delay := reconnectBackoff(consecutiveFailures, serverRetry) + // Guard against a flapping connection that delivers a frame and + // dies immediately: each such session "progresses" and resets + // the backoff, which would otherwise spin in a tight reconnect + // loop. Enforce a minimum spacing between attempts based on how + // long the session actually lasted, regardless of progress. + if remaining := reconnectMinInterval - sessionDuration; remaining > delay { + delay = remaining + } // Healthy connection rotations resume silently (even when the // server asks for a short `retry:` wait) so a normal long-lived // session looks seamless. Only surface a notice once we're @@ -533,6 +543,10 @@ func reconnectBackoff(consecutiveFailures int, serverRetry time.Duration) time.D const ( reconnectBaseBackoff = 500 * time.Millisecond reconnectMaxBackoff = 30 * time.Second + // Floor between consecutive reconnect attempts regardless of progress, so a + // connection that flaps (delivers a frame then dies immediately) can't spin + // in a tight reconnect loop. + reconnectMinInterval = 1 * time.Second ) // eventSummary is a generic, payload-agnostic projection of an SSE message