Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "serverish"
version = "2.0.4"
version = "2.0.5"
description = "helpers for server alike projects"
authors = ["Mikołaj Kałuszyński", "MMME team"]
readme = "README.md"
Expand Down
9 changes: 9 additions & 0 deletions serverish/base/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ class MessengerNotConnected(Exception):
class MessengerReaderStopped(Exception):
pass

class MessengerReaderConfigError(ValueError):
"""Raised when a MsgReader is misconfigured (e.g. missing required start marker).

This is a fatal error — the same configuration will never succeed against
the NATS server. The reader stops and this exception propagates out of
``async for`` loops so callers fail loudly instead of retrying forever.
"""
pass

class MessengerReaderAlreadyOpen(RuntimeError):
pass

Expand Down
25 changes: 24 additions & 1 deletion serverish/messenger/msg_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@


import nats.errors
import nats.js.errors
import param
from nats.js import JetStreamContext
from nats.js.api import DeliverPolicy, ConsumerConfig

from serverish.base import wait_for_psce
from serverish.base.exceptions import (MessengerReaderStopped, MessengerNotConnected,
from serverish.base.exceptions import (MessengerReaderStopped, MessengerReaderConfigError,
MessengerNotConnected,
MessengerReaderAlreadyOpen, MessengerRequestTimeout)
from serverish.base.fifoset import FifoSet
from serverish.messenger import Messenger
Expand Down Expand Up @@ -105,6 +107,17 @@ def __init__(self, subject, parent = None,
super().__init__(subject=subject, parent=parent,
deliver_policy=deliver_policy, opt_start_time=opt_start_time, consumer_cfg=consumer_cfg_defaults,
**kwargs)
# Validate deliver_policy ↔ start-marker consistency up front so
# callers get a clear error at construction time rather than an
# opaque NATS 400 error buried inside the read loop.
if self.deliver_policy == 'by_start_time' and self.opt_start_time is None:
raise ValueError(
"deliver_policy='by_start_time' requires opt_start_time to be set"
)
if self.deliver_policy == 'by_start_sequence' and self.consumer_cfg.get('opt_start_seq') is None:
raise ValueError(
"deliver_policy='by_start_sequence' requires opt_start_seq to be set in consumer_cfg"
)
log.debug(f"Created {self}")


Expand Down Expand Up @@ -384,6 +397,16 @@ def logput(self, msg: str) -> None:
st.logput(f'{e.task}-err')
st.error = e.error
self._last_error = e.error # Track for health monitoring
# Fatal NATS API errors (4xx): the same request will never
# succeed — stop retrying immediately and surface a clear
# MessengerReaderConfigError to the caller.
if isinstance(e.error, (nats.js.errors.BadRequestError,
nats.js.errors.NotFoundError)):
log.error(st.fmt(f"fatal NATS error (not retrying): {e.error}"))
self._stop.set()
raise MessengerReaderConfigError(
f"Fatal NATS error on {self.subject}: {e.error}"
) from e.error
Comment thread
majkelx marked this conversation as resolved.
match self.error_behavior:
case 'RAISE':
log.error(st.fmt(f"raising read_next error: {e.error}"))
Expand Down
99 changes: 99 additions & 0 deletions tests/test_delivery_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from serverish.messenger import Messenger, get_publisher, get_reader
from serverish.messenger.msg_reader import MsgReader
from serverish.base.exceptions import MessengerReaderConfigError


@pytest.mark.nats
Expand Down Expand Up @@ -310,3 +311,101 @@ async def test_consumer_cfg_nonzero_microsecond_is_unchanged():
cfg = await rdr._create_consumer_cfg()

assert cfg.opt_start_time == '2026-04-25T16:00:00.123456Z'


# ---------------------------------------------------------------------------
# Unit tests for deliver_policy validation (no NATS needed)
# ---------------------------------------------------------------------------

def test_by_start_time_without_opt_start_time_raises():
"""MsgReader must raise ValueError at construction when deliver_policy='by_start_time'
but opt_start_time is not provided."""
m = Messenger()
with pytest.raises(ValueError, match="opt_start_time"):
MsgReader('test.unit', parent=m, deliver_policy='by_start_time')


def test_by_start_time_with_opt_start_time_succeeds():
"""MsgReader must succeed when deliver_policy='by_start_time' and opt_start_time is set."""
m = Messenger()
t = datetime.datetime(2026, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
rdr = MsgReader('test.unit', parent=m, deliver_policy='by_start_time', opt_start_time=t)
assert rdr.deliver_policy == 'by_start_time'


def test_by_start_sequence_without_opt_start_seq_raises():
"""MsgReader must raise ValueError at construction when deliver_policy='by_start_sequence'
but opt_start_seq is not provided."""
m = Messenger()
with pytest.raises(ValueError, match="opt_start_seq"):
MsgReader('test.unit', parent=m, deliver_policy='by_start_sequence')


def test_by_start_sequence_with_opt_start_seq_succeeds():
"""MsgReader must succeed when deliver_policy='by_start_sequence' and opt_start_seq is set."""
m = Messenger()
rdr = MsgReader('test.unit', parent=m, deliver_policy='by_start_sequence',
consumer_cfg={'opt_start_seq': 42})
assert rdr.deliver_policy == 'by_start_sequence'


def test_messenger_get_reader_by_start_time_without_time_raises():
"""Messenger.get_reader must raise ValueError when by_start_time is used without opt_start_time."""
with pytest.raises(ValueError, match="opt_start_time"):
Messenger.get_reader('test.unit', deliver_policy='by_start_time')


# ---------------------------------------------------------------------------
# Unit test: MessengerReaderConfigError is exported from serverish.base
# ---------------------------------------------------------------------------

def test_messenger_reader_config_error_is_exported():
"""MessengerReaderConfigError must be importable from serverish.base."""
from serverish.base import MessengerReaderConfigError as Err # noqa: F401
assert issubclass(Err, ValueError)


# ---------------------------------------------------------------------------
# Unit test: fatal NATS errors (BadRequestError / NotFoundError) raise
# MessengerReaderConfigError from __anext__ (no live NATS needed)
# ---------------------------------------------------------------------------

async def test_fatal_nats_bad_request_raises_config_error():
"""When open() raises nats.js.errors.BadRequestError the read loop must
stop and raise MessengerReaderConfigError instead of retrying forever."""
import nats.js.errors

m = Messenger()
t = datetime.datetime(2026, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
rdr = MsgReader('test.unit', parent=m, deliver_policy='by_start_time',
opt_start_time=t, error_behavior='WAIT')

# Patch open() so it raises BadRequestError immediately
async def _bad_open():
raise nats.js.errors.BadRequestError()

rdr.open = _bad_open

with pytest.raises(MessengerReaderConfigError):
async for _ in rdr:
pass # pragma: no cover


async def test_fatal_nats_not_found_raises_config_error():
"""When open() raises nats.js.errors.NotFoundError the read loop must
stop and raise MessengerReaderConfigError."""
import nats.js.errors

m = Messenger()
rdr = MsgReader('test.unit', parent=m, deliver_policy='all',
error_behavior='WAIT')

async def _not_found_open():
raise nats.js.errors.NotFoundError()

rdr.open = _not_found_open

with pytest.raises(MessengerReaderConfigError):
async for _ in rdr:
pass # pragma: no cover

9 changes: 7 additions & 2 deletions tests/test_messenger_issue5.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import nats
import pytest

from serverish.base.exceptions import MessengerReaderConfigError
from serverish.messenger import Messenger, get_reader


Expand All @@ -14,7 +15,11 @@ async def test_messenger_issue5_subject_not_in_stream(messenger):
reader.error_behavior = "RAISE"
try:
cfg = await reader.read_next()
except nats.js.errors.NotFoundError:
except MessengerReaderConfigError:
# "No stream found" is a fatal configuration error — it is surfaced as
# MessengerReaderConfigError (wrapping the original NotFoundError) so
# that callers get a clear, actionable exception instead of a raw NATS
# 404 that would otherwise retry forever in WAIT mode.
pass
else:
assert False, 'Should raise NotFoundError'
assert False, 'Should raise MessengerReaderConfigError'
Loading