Draft
48 commits
a7b2fd5
tests with nats fixed
majkelx Dec 13, 2025
92aee35
stablity of the reader
majkelx Dec 13, 2025
3033bbe
docs: initialize project
majkelx Mar 23, 2026
4a93267
feat: enhance NATS connection monitoring with slow consumer tracking
majkelx Mar 24, 2026
4ef39c9
fix(01-01): replace exit() with RuntimeError and narrow bare except
majkelx Mar 24, 2026
3c7aeed
fix(01-01): fix JestStream typos and conftest find_nats_host bug
majkelx Mar 24, 2026
66d9f45
Merge branch 'worktree-agent-af2766eb' into fixsubscriptions
majkelx Mar 24, 2026
5043eac
feat: add NATS JetStream client considerations and reliability tests
majkelx Mar 24, 2026
9e300e3
chore(02-01): install test dependencies and update pytest configuration
majkelx Mar 24, 2026
e9c0e16
feat(02-01): rewrite conftest.py with all fixtures and update CI work…
majkelx Mar 24, 2026
a6d8f13
feat(02-01): create infrastructure smoke tests and fix event loop sco…
majkelx Mar 24, 2026
d7ccf31
Merge branch 'worktree-agent-a33198fa' into fixsubscriptions
majkelx Mar 24, 2026
3efba6d
feat(02-03): migrate 9 test files to fixture-based infrastructure
majkelx Mar 24, 2026
fe95cf9
fix(02-03): add backward-compatible ci shim for parallel plan execution
majkelx Mar 24, 2026
8060115
feat(02-02): migrate 6 high-volume test files to fixture-based infras…
majkelx Mar 24, 2026
95834e6
feat(02-02): migrate 5 remaining test files to fixture-based infrastr…
majkelx Mar 24, 2026
c2f607c
Merge branch 'worktree-agent-a5c1d3fe' into fixsubscriptions
majkelx Mar 24, 2026
130dc6c
fix: correction of tests
majkelx Mar 24, 2026
fbbc835
fix(tests): prevent singleton cascade failures and skip ping tests in CI
majkelx Mar 24, 2026
ae49044
fix(tests): always close before reopen in singleton lifecycle tests
majkelx Mar 24, 2026
a440ca4
fix(tests): increase timeout for scheduled_open_fail test
majkelx Mar 24, 2026
0aec0ed
fix(ci): skip timing-sensitive test in CI, bump Python to 3.11
majkelx Mar 24, 2026
4a14fe8
feat(03-01): create test_corr_patterns.py with happy-path tests for a…
majkelx Mar 24, 2026
721b60b
feat(03-01): create test_corr_health.py with health_status tests for …
majkelx Mar 24, 2026
7e51038
feat(03-03): add correctness tests for all 3 error_behavior modes (CO…
majkelx Mar 24, 2026
e7376f1
Merge branch 'worktree-agent-ac2b6312' into fixsubscriptions
majkelx Mar 24, 2026
3049805
feat(03-02): create consumer lifecycle tests and decompose test_messe…
majkelx Mar 24, 2026
e930a2f
feat(03-02): create publisher error/success tracking tests
majkelx Mar 24, 2026
c49ea2c
feat(04-01): add resilience test infrastructure to conftest.py and py…
majkelx Mar 24, 2026
ce30b8c
feat(04-01): create reader recovery resilience tests (RESL-01)
majkelx Mar 24, 2026
4dccda9
feat(04-03): create RPC responder reconnect resilience tests (RESL-03)
majkelx Mar 24, 2026
dd23822
feat(04-03): create health_status transition tests across all drivers…
majkelx Mar 24, 2026
558f7be
feat(04-02): create consumer expiry detection and recreation tests (R…
majkelx Mar 24, 2026
6b96e65
feat(04-02): create publisher disconnect handling tests (RESL-04)
majkelx Mar 24, 2026
cc25aa2
Merge branch 'worktree-agent-a03113af' into fixsubscriptions
majkelx Mar 24, 2026
a1ad9fb
feat(05-02): create CPU-bound blocking recovery tests (COMP-02)
majkelx Mar 24, 2026
f374660
feat(05-01): create branch comparison failure tests (COMP-01)
majkelx Mar 24, 2026
24c67ff
feat(05-01): create recovery timing benchmark tests (COMP-03)
majkelx Mar 24, 2026
ce3d163
feat(05-02): create slow consumer detection test (COMP-04)
majkelx Mar 24, 2026
dde5e50
Merge branch 'worktree-agent-ad30521a' into fixsubscriptions
majkelx Mar 24, 2026
ba891d0
fix(ci): repair Messenger singleton state leakage between test modules
majkelx Mar 24, 2026
4f43e3b
fix(ci): repair Messenger singleton state leakage between test modules
majkelx Mar 24, 2026
5741f3f
ver bump
majkelx Mar 24, 2026
a43827a
readme commit
majkelx Mar 24, 2026
9fc2579
scaffold serverish.messenger.transport for thread-separated NATS I/O
majkelx Mar 24, 2026
bba5211
Merge remote-tracking branch 'origin/master' into fixsubscriptions
majkelx Apr 19, 2026
513c9e9
test: rewrite core pub/sub tests in fixture-based style
majkelx Apr 19, 2026
464446d
Merge branch 'fixsubscriptions' into feat/reader-redesign
majkelx Apr 19, 2026
23 changes: 7 additions & 16 deletions .github/workflows/test-ci.yml
@@ -6,29 +6,20 @@ jobs:
   test:
     runs-on: ubuntu-latest

-    services:
-      nats:
-        image: nats:latest
-        ports:
-          - 4222:4222
-
     steps:
       - name: Check out code
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4

-      - name: Set up Python 3.10
-        uses: actions/setup-python@v2
+      - name: Set up Python 3.11
+        uses: actions/setup-python@v4
         with:
-          python-version: "3.10"
+          python-version: "3.11"

       - name: Install Poetry
-        run: |
-          curl -sSL https://install.python-poetry.org | python -
+        run: curl -sSL https://install.python-poetry.org | python -

       - name: Install dependencies
-        run: |
-          poetry install
+        run: poetry install -E messenger -E dns

       - name: Run tests
-        run: |
-          poetry run pytest
+        run: poetry run pytest
2 changes: 2 additions & 0 deletions .gitignore
@@ -134,3 +134,5 @@ dmypy.json

 /docker/nats/nats-log/
 /docker/nats/nats-store/
+.planning/
+.claude/
6 changes: 5 additions & 1 deletion README.md
@@ -25,9 +25,13 @@ this will install `nats-py` package.
 ## Plans
 - Decorators `@messenger.sub(subject)` (Web framework style)
 - Progress with automatic increment for known durations e.g. `pro_pub.update(completed=1, when=+20)`
-- Pytest fixtures for NATS on CI
+- Custom JetStream API replacing nats-py private internals access

 ## Changes
+* 1.7 Subscription reliability: reader auto-recovers from NATS disconnects and consumer expiry,
+  RPC responder auto-resubscribes, `health_status` property on all drivers (reader, publisher,
+  RPC responder, connection) with reconnect counts, error tracking, slow consumer detection.
+  Comprehensive test suite (194 tests) with testcontainers-based NATS fixtures.
 * 1.5 Introduces live documents (`LiveDocument`, `get_documentreader()`, `get_live_document()`) for auto-updating configuration from NATS.
 * 1.4 Contains improvements to the network problems recovery and connection tracking.
 * 1.1 Switches to `param` 2.* nad `py-nats` 1.7.*. Also, publisher may reraise exceptions or ignore them.
325 changes: 325 additions & 0 deletions doc/nats-client-considerations.md
@@ -0,0 +1,325 @@

# NATS JetStream Client Under Adverse Conditions (Python asyncio)

Problems, Failure Modes, and Design Strategies


1. Context

System model
• Python client (asyncio, nats.py)
• JetStream (pull consumers)
• User code runs in the same process and is often:
  • blocking on I/O
  • CPU-bound
  • sometimes pathological (UI, sync libs)

Core constraint (CPython)
• GIL: only one thread executes bytecode at a time
• the interpreter switches between threads periodically, between bytecode instructions
• The GIL:
  • protects the interpreter runtime
  • does not make application logic thread-safe


2. Fundamental mismatch

Asyncio assumptions
• cooperative scheduling
• tasks must await to yield

Real world
• user code:
  • does not yield
  • blocks the event loop
  • is outside your control

➡️ Conclusion:

You cannot rely on the user loop as part of the transport system


3. Failure Modes (real, seen in production)

3.1 Event loop starvation

Cause:
• sync I/O
• CPU loop
• long callback

Effect:
• no reads from the socket
• no ACK flush
• no PONG handling

Outcome:
• client-side disconnect (stale connection)
• or server-side disconnect (slow consumer)
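
The starvation effect can be reproduced without NATS at all. The sketch below is only an illustration: `heartbeat` stands in for the client's read/PING handling, and `measure_starvation` and its parameters are hypothetical names. A single synchronous `time.sleep` in a coroutine stretches the heartbeat gap from about 10 ms to at least the length of the blocking call.

```python
import asyncio
import time


async def heartbeat(gaps: list, interval: float = 0.01) -> None:
    """Stand-in for transport housekeeping: record the gap between wake-ups."""
    last = time.monotonic()
    while True:
        await asyncio.sleep(interval)
        now = time.monotonic()
        gaps.append(now - last)
        last = now


async def measure_starvation(block_for: float = 0.2) -> float:
    """Return the longest heartbeat gap seen while user code blocks the loop."""
    gaps: list = []
    task = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0.05)   # let the heartbeat tick normally a few times
    time.sleep(block_for)       # synchronous call: no other task can run
    await asyncio.sleep(0.05)   # give the heartbeat a chance to wake up again
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return max(gaps)


if __name__ == "__main__":
    print(f"longest gap: {asyncio.run(measure_starvation()):.3f}s")
```

In a real client the same gap applies to socket reads, ACK flushes and PONG replies, which is what leads to the disconnects listed above.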


3.2 ACK starvation

Cause:
• ACK is issued in the user loop

Effect:
• ACK is delayed or never sent

Outcome:
• redelivery
• duplicates
• max_ack_pending exhaustion


3.3 Pull lifecycle break

A pull request:
• is ephemeral
• is tied to an inbox

Disconnect →
• the pull is no longer valid
• the inbox is no longer active

➡️ the pull loop must be restarted


3.4 Queue coupling bug

If:
• the queue lives in the user loop
• enqueue depends on the user loop

Then:
• frozen loop = no enqueue
• no ACK (in enqueue-ack mode)

➡️ a false sense of safety


3.5 Thread-safety illusion
• asyncio.Queue is not thread-safe
• GIL ≠ thread safety
• multi-step operations can interleave
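
Because `asyncio.Queue` is not thread-safe, a foreign thread should never call its methods directly. A minimal sketch of the safe handoff, assuming a plain producer thread (the function name `collect_from_thread` is illustrative): the thread asks the owning loop to perform the `put_nowait` via `loop.call_soon_threadsafe`.

```python
import asyncio
import threading


async def collect_from_thread(n: int = 5) -> list:
    """Receive n items produced by a plain thread through an asyncio.Queue."""
    loop = asyncio.get_running_loop()
    q: asyncio.Queue = asyncio.Queue()

    def producer() -> None:
        for i in range(n):
            # Calling q.put_nowait(i) here would touch loop internals from a
            # foreign thread. Instead, ask the loop to run it on its own thread.
            loop.call_soon_threadsafe(q.put_nowait, i)

    t = threading.Thread(target=producer)
    t.start()
    items = [await q.get() for _ in range(n)]
    t.join()
    return items


if __name__ == "__main__":
    print(asyncio.run(collect_from_thread()))
```

Callbacks scheduled with `call_soon_threadsafe` run in the order they were submitted, so a single producer thread keeps its ordering.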


4. Design goals

The system must be:
1. Transport-safe
   • the NATS connection does not depend on the user loop
2. Backpressure-aware
   • no unbounded memory growth
3. Semantically explicit
   • the ACK policy is explicit
4. Failure-resilient
   • reconnect + restart pull


5. Proven Architecture

5.1 Separation of concerns

NATS loop (thread A)
• socket I/O
• parsing
• pull
• enqueue
• ACK (optional)

User loop (thread B)
• processing
• async for iteration


5.2 Core invariant

User code never blocks the NATS loop
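
One way to picture this separation is a private event loop running in its own thread. The sketch below is an assumption-laden illustration, not the project's transport module: `LoopThread`, `transport_ticks` and `demo` are hypothetical names, and the "transport" is just a ticking coroutine. It shows that the loop in thread A keeps running while thread B sits in a blocking call.

```python
import asyncio
import threading
import time


class LoopThread:
    """Runs a private asyncio event loop in a daemon thread (thread A)."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def submit(self, coro):
        """Schedule a coroutine on the private loop from any thread."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self) -> None:
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def transport_ticks(duration: float, interval: float = 0.01) -> int:
    """Stand-in for socket I/O: count how often the loop got to run."""
    ticks = 0
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        await asyncio.sleep(interval)
        ticks += 1
    return ticks


def demo() -> int:
    nats_thread = LoopThread()
    fut = nats_thread.submit(transport_ticks(0.3))
    time.sleep(0.3)               # "user code" blocks thread B; the GIL is released
    ticks = fut.result(timeout=2)
    nats_thread.stop()
    return ticks


if __name__ == "__main__":
    print("transport ticks while user thread was blocked:", demo())
```

This only helps when the blocking call releases the GIL, as `time.sleep` and most blocking I/O do.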


6. Queue Architecture

6.1 Queue MUST live in NATS loop

Why:
• enqueue + ACK must be atomic with respect to the transport
• the user loop is not deterministic


6.2 Cross-thread consumption pattern

User loop:

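    async def __anext__(self):
        # Schedule get() on the NATS loop; the queue is only ever touched there.
        cfut = asyncio.run_coroutine_threadsafe(
            self._q.get(),
            self._nats_loop,
        )
        # Await the resulting concurrent.futures.Future from the user loop.
        return await asyncio.wrap_future(cfut)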

Properties:
• no direct access to the queue
• thread-safe
• no locks
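
For context, here is how the `__anext__` fragment might sit inside a complete iterator. This is a sketch under assumptions: the class name `BridgeIterator`, the helpers, and the `None` end-of-stream sentinel are all hypothetical, and the "NATS loop" is an empty loop fed by a test coroutine.

```python
import asyncio
import threading


class BridgeIterator:
    """Async iterator for the user loop; the queue lives in the NATS loop."""

    def __init__(self, q: asyncio.Queue, nats_loop: asyncio.AbstractEventLoop):
        self._q = q
        self._nats_loop = nats_loop

    def __aiter__(self):
        return self

    async def __anext__(self):
        cfut = asyncio.run_coroutine_threadsafe(self._q.get(), self._nats_loop)
        item = await asyncio.wrap_future(cfut)
        if item is None:              # hypothetical end-of-stream sentinel
            raise StopAsyncIteration
        return item


def start_loop_thread() -> asyncio.AbstractEventLoop:
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def make_queue() -> asyncio.Queue:
    # Executed on the NATS loop, so the queue is created by its owner.
    return asyncio.Queue()


async def feed(q: asyncio.Queue, items) -> None:
    # Stand-in for the pull loop: enqueue messages, then the sentinel.
    for item in items:
        await q.put(item)
    await q.put(None)


async def user_main() -> list:
    nats_loop = start_loop_thread()
    q = await asyncio.wrap_future(
        asyncio.run_coroutine_threadsafe(make_queue(), nats_loop)
    )
    asyncio.run_coroutine_threadsafe(feed(q, ["a", "b", "c"]), nats_loop)
    received = [msg async for msg in BridgeIterator(q, nats_loop)]
    nats_loop.call_soon_threadsafe(nats_loop.stop)
    return received


if __name__ == "__main__":
    print(asyncio.run(user_main()))
```

Each `__anext__` call costs one cross-thread round trip, which is the overhead that batching delivery is meant to reduce.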


6.3 Backpressure strategies

Strategy A: Drop (UI)
• bounded queue
• overwrite / drop

Strategy B: Pause pull
• do not send .NEXT
• JetStream buffers the messages

Strategy C: Spill
• durable buffer


7. ACK strategies (critical)

7.1 ACK-after-enqueue

Mechanism:
• enqueue in NATS loop
• immediate ACK

Pros:
• stability
• no redelivery storm

Cons:
• possible data loss (unless durable queue)


7.2 ACK-after-process

Mechanism:
• ACK triggered by user

Pros:
• at-least-once

Cons:
• depends on the user loop
• redelivery on freeze


7.3 Hybrid (recommended)

Use case          Strategy
UI / telemetry    enqueue-ack
jobs / tasks      process-ack
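
The two policies differ only in where the ACK call sits. The sketch below makes that explicit with a hypothetical `AckPolicy` enum and a `FakeMsg` stand-in that has an async `ack()`. It runs on a single loop for brevity; in the threaded design the process-ack call would have to be scheduled back onto the NATS loop.

```python
import asyncio
from enum import Enum


class AckPolicy(Enum):
    ENQUEUE = "enqueue-ack"   # ACK as soon as the message is queued
    PROCESS = "process-ack"   # ACK only after user code has finished


class FakeMsg:
    """Hypothetical stand-in for a JetStream message with an async ack()."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.acked = False

    async def ack(self) -> None:
        self.acked = True


async def deliver(msg, q: asyncio.Queue, policy: AckPolicy) -> None:
    """Transport side: enqueue, then ACK immediately if the policy says so."""
    await q.put(msg)
    if policy is AckPolicy.ENQUEUE:
        await msg.ack()


async def process(q: asyncio.Queue, policy: AckPolicy, handler) -> None:
    """User side: handle one message, then ACK if it is still pending."""
    msg = await q.get()
    handler(msg.data)
    if policy is AckPolicy.PROCESS:
        await msg.ack()
```

With `ENQUEUE`, a crash between `deliver` and `process` loses the message unless the queue is durable; with `PROCESS`, a frozen handler delays the ACK and triggers redelivery.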



8. Consumer scaling strategy

Problem:
• the user creates hundreds of subscriptions

Solution: Fan-in

Single consumer:
• FilterSubjects or a wildcard
• local routing

Benefits:
• fewer consumers
• lower server load

Trade-off:
• larger max_ack_pending
• local routing complexity
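
Local routing needs a subject matcher that follows the NATS wildcard rules: `*` matches exactly one token and `>` matches one or more trailing tokens. A minimal sketch, with `subject_matches` and `LocalRouter` as hypothetical names and a linear scan that would need indexing for large route tables:

```python
from typing import Callable, Dict, List


def subject_matches(pattern: str, subject: str) -> bool:
    """Match a subject against a pattern using NATS-style '*' and '>'."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last token and needs at least one token to match
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


class LocalRouter:
    """Dispatches messages from one fan-in consumer to many local handlers."""

    def __init__(self) -> None:
        self._routes: Dict[str, List[Callable]] = {}

    def add(self, pattern: str, handler: Callable) -> None:
        self._routes.setdefault(pattern, []).append(handler)

    def dispatch(self, subject: str, payload: bytes) -> int:
        """Call every matching handler; return how many were invoked."""
        delivered = 0
        for pattern, handlers in self._routes.items():
            if subject_matches(pattern, subject):
                for handler in handlers:
                    handler(subject, payload)
                    delivered += 1
        return delivered
```

A message for which `dispatch` returns 0 still has to be acknowledged or terminated explicitly, otherwise it counts against max_ack_pending.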


9. Reconnect strategy

On reconnect:
1. invalidate:
• inbox
• pull state
2. restart:
• pull loop
• iterator bridge
3. ensure consumer (idempotent)
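
The reconnect steps above can be sketched as a small supervisor that owns the pull-loop task. Everything here is an assumption for illustration: `PullLoopSupervisor`, `ensure_consumer` and `pull_once` are hypothetical, and `restart()` is meant to be called from whatever reconnect hook the client exposes (nats-py accepts a `reconnected_cb` option on connect).

```python
import asyncio
from typing import Awaitable, Callable, Optional


class PullLoopSupervisor:
    """Owns at most one pull-loop task and restarts it after a reconnect."""

    def __init__(
        self,
        ensure_consumer: Callable[[], Awaitable[None]],
        pull_once: Callable[[int], Awaitable[None]],
    ) -> None:
        self._ensure_consumer = ensure_consumer   # assumed to be idempotent
        self._pull_once = pull_once               # one fetch + enqueue round
        self._task: Optional[asyncio.Task] = None
        self.generation = 0                       # bumped on every (re)start

    async def _run(self, generation: int) -> None:
        await self._ensure_consumer()
        while True:
            await self._pull_once(generation)

    async def _cancel_current(self) -> None:
        if self._task is None:
            return
        self._task.cancel()                       # invalidate old pull state
        try:
            await self._task
        except (asyncio.CancelledError, Exception):
            pass                                  # the old loop may have died already
        self._task = None

    async def restart(self) -> None:
        """Call on first connect and again after every reconnect."""
        await self._cancel_current()
        self.generation += 1
        self._task = asyncio.create_task(self._run(self.generation))

    async def stop(self) -> None:
        await self._cancel_current()
```

Passing the generation number into `pull_once` lets downstream code discard results that belong to a pull issued before the reconnect.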


10. What threads actually solve (precisely)

Works:

Scenario            Thread helps
Blocking I/O        YES (GIL released)
sleep               YES
short CPU bursts    PARTIAL

Does NOT work:

Scenario                        Thread helps
long C-extension holding GIL    NO
process freeze                  NO



11. Key insight (important)

Threading does NOT fix user bugs.
It isolates transport from user bugs.


12. Minimal correctness rules
1. Single pull-loop per consumer
2. Queue in NATS loop
3. No direct cross-thread queue access
4. Explicit ACK policy
5. Restart on reconnect
6. Dedup (stream sequence)
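
Rule 6 can be illustrated with a bounded "recently seen" set keyed by stream sequence. This is a sketch, not the library's mechanism: `SeqDeduplicator` and its `window` parameter are hypothetical, and the caller is assumed to extract the stream sequence from the message metadata.

```python
from collections import OrderedDict


class SeqDeduplicator:
    """Remembers the most recent stream sequences and rejects repeats."""

    def __init__(self, window: int = 10_000) -> None:
        self._seen: "OrderedDict[int, None]" = OrderedDict()
        self._window = window

    def is_new(self, stream_seq: int) -> bool:
        """Return True the first time a sequence is seen within the window."""
        if stream_seq in self._seen:
            return False
        self._seen[stream_seq] = None
        if len(self._seen) > self._window:
            self._seen.popitem(last=False)   # forget the oldest sequence
        return True
```

The window bounds memory, at the price that a redelivery arriving after more than `window` newer messages is no longer recognized as a duplicate.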


13. Optional improvements (advanced)
• batching delivery (reduce cross-thread overhead)
• coalescing (latest-value semantics)
• adaptive batch size
• latency watchdog (detect loop freeze)
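
The first two items can be sketched together. `get_batch` and `coalesce_latest` are hypothetical helpers: the first waits for one item and then drains whatever is already queued, the second keeps only the newest item per key.

```python
import asyncio


async def get_batch(q: asyncio.Queue, max_items: int = 64) -> list:
    """Wait for one item, then take whatever else is already queued."""
    batch = [await q.get()]
    while len(batch) < max_items:
        try:
            batch.append(q.get_nowait())
        except asyncio.QueueEmpty:
            break
    return batch


def coalesce_latest(batch, key) -> dict:
    """Keep only the newest item per key (latest-value semantics)."""
    latest = {}
    for item in batch:
        latest[key(item)] = item     # later items overwrite earlier ones
    return latest
```

Used behind the cross-thread bridge, one `get_batch` call replaces up to `max_items` individual round trips.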


Final takeaway

If you want stability:
• separate the transport from user code
• ACK where you have control
• keep the queue where you have determinism

The rest is just tuning.


If you'd like, I can turn this into:
• a formal RFC (sections, diagrams, state machine)
• or a concrete code skeleton for serverish2 (ready to paste)