Stop leaking JetStream consumers on reopen; tighten reconnect detection (2.0.3) #28
Merged
majkelx merged 1 commit on May 2, 2026
…connect detection

`MsgReader._reopen()` only called `unsubscribe()` on the old subscription, which detaches the local handle but does NOT delete the server-side JetStream consumer. Each NATS reconnect therefore left an orphan consumer on the stream; one orphan accumulates per reopen, indefinitely, until the process restarts (or the stream's consumer cap is hit). Symptom in the field: monotonically growing `nats consumer ls <stream>` output with creation timestamps lining up with NATS reconnect events.

Fix mirrors the teardown order already used by `_close_pull_subscription`: capture `consumer_info()` before unsubscribing (so we still know the consumer name), unsubscribe, then `delete_consumer()`. Failures at any step are logged and ignored - the new subscription is created regardless, matching the previous resilience behaviour.

Also reduces the in-fetch `blocking_interval` (10s → 2s) inside the non-nowait wait loop. The outer loop only checks `_reconnect_needed` between fetch cycles, so a low-rate subject can sit on a dead pull subscription for up to one cycle after a reconnect. Reduces the worst-case detection lag from ~10s to ~2s; total wait window is unchanged at 100s (50 cycles × 2s).
Bug
`MsgReader._reopen()` calls `pull_subscription.unsubscribe()` on the old subscription, which detaches the local handle but does not delete the server-side JetStream consumer. Each NATS reconnect therefore leaves an orphan consumer on the stream; one orphan per reopen, accumulating indefinitely until the process restarts (or the stream's consumer cap is hit).

Symptom in the field: monotonically growing `nats consumer ls <stream>` output, with consumer creation timestamps lining up with NATS reconnect events.
The teardown sequence in `_close_pull_subscription()` already shows the correct pattern: `consumer_info() → unsubscribe() → delete_consumer()`. `_reopen()` was just missing the `delete_consumer()` step.
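The teardown order can be sketched as a standalone helper. This is a minimal illustration, not the code from `msg_reader.py`: the function name `teardown_old_consumer` and the logger are hypothetical, and `js` / `pull_subscription` are assumed to behave like a nats-py JetStream context and pull subscription (async `consumer_info()`, `unsubscribe()`, and `delete_consumer(stream, consumer)`).

```python
import asyncio
import logging

log = logging.getLogger("reopen_sketch")  # hypothetical logger name


async def teardown_old_consumer(js, pull_subscription):
    """Best-effort teardown: consumer_info() -> unsubscribe() -> delete_consumer().

    Every step is allowed to fail; failures are logged and ignored so the
    caller can go on to create the new subscription regardless.
    Returns True only if the server-side consumer was deleted.
    """
    stream = consumer = None
    try:
        # Capture the names first: once unsubscribed, we may no longer be
        # able to ask the handle which consumer it was bound to.
        info = await pull_subscription.consumer_info()
        stream, consumer = info.stream_name, info.name
    except Exception as exc:
        log.warning("consumer_info() failed, cannot delete consumer: %s", exc)

    try:
        # Detaches the local handle only; the server-side consumer survives.
        await pull_subscription.unsubscribe()
    except Exception as exc:
        log.warning("unsubscribe() failed: %s", exc)

    if stream is None or consumer is None:
        return False

    try:
        # This is the step that actually removes the consumer from the stream.
        await js.delete_consumer(stream, consumer)
        return True
    except Exception as exc:
        log.warning("delete_consumer(%s, %s) failed: %s", stream, consumer, exc)
        return False
```

Returning a boolean instead of raising keeps the reopen path resilient: the caller can log the outcome and proceed to subscribe again either way.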
Fix
Two small changes in `serverish/messenger/msg_reader.py`:
1. `_reopen()` now deletes the old consumer (mirrors `_close_pull_subscription` teardown order). Capture `consumer_info()` before unsubscribing so we still know the consumer name; unsubscribe; then `delete_consumer()`. Failures at any step are logged and ignored; the new subscription is created regardless, matching the previous resilience.
2. Tighten the reconnect-detection window in the non-nowait wait loop. The outer loop only checks `_reconnect_needed` between fetch cycles, so a low-rate subject can sit on a dead pull subscription for up to one full cycle after a NATS reconnect (silent callbacks until detection). Reduce `blocking_interval` from 10s to 2s; bump `max_wait_cycles` from 10 to 50 so the total 100s wait window is unchanged.
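To show why a shorter `blocking_interval` shortens detection lag, here is a simplified model of a wait loop that checks a reconnect flag only between fetch cycles. It is a sketch under assumptions, not the actual loop in `msg_reader.py`: `wait_for_messages`, `fetch`, and `reconnect_needed` are hypothetical names, and `fetch` is assumed to raise `asyncio.TimeoutError` when no message arrives within `timeout`.

```python
import asyncio


async def wait_for_messages(fetch, reconnect_needed,
                            blocking_interval=2.0, max_wait_cycles=50):
    """Poll `fetch` in short cycles, checking the reconnect flag between them.

    Returns ("messages", msgs), ("reconnect", cycles_used) or
    ("timeout", cycles_used).
    """
    for cycle in range(max_wait_cycles):
        # The flag is only looked at here, so a reconnect that happens
        # during a fetch is noticed at most one blocking_interval later.
        if reconnect_needed():
            return "reconnect", cycle
        try:
            msgs = await fetch(timeout=blocking_interval)
        except asyncio.TimeoutError:
            continue  # nothing arrived in this cycle; try the next one
        if msgs:
            return "messages", msgs
    return "timeout", max_wait_cycles
```

With the values from this change, the total window stays at 50 × 2s = 100s (previously 10 × 10s = 100s), while the longest stretch between two flag checks drops from about 10s to about 2s.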
Compat
Test plan