Skip to content

Simplify GetMessagesForReplica to List<StoredMessages> and add ignorePositions#168

Merged
stidsborg merged 1 commit into
mainfrom
simplify-get-messages-for-replica
Jun 14, 2026
Merged

Simplify GetMessagesForReplica to List<StoredMessages> and add ignorePositions#168
stidsborg merged 1 commit into
mainfrom
simplify-get-messages-for-replica

Conversation

@stidsborg

Copy link
Copy Markdown
Owner

What

Simplifies IMessageStore.GetMessagesForReplica and lays the foundation for de-duplicating watchdog message delivery.

Before

Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForReplica(ReplicaId replicaId);

After

Task<List<StoredMessages>> GetMessagesForReplica(ReplicaId replicaId, IReadOnlyList<long> ignorePositions);
  • New record StoredMessages(StoredId StoredId, List<StoredMessage> Messages) replaces the dictionary return shape.
  • ignorePositions excludes already-handled positions so they aren't re-delivered on subsequent ticks.
  • FlowsManager.Push now takes IReadOnlyList<StoredMessages>.

MessageWatchdog (version 1)

The watchdog keeps an ever-growing HashSet<long> of positions it has already pushed and passes it as ignorePositions each tick. This is a deliberate first step — it will be refined once the QueueManager reports the positions it has persisted into its effects, which will bound the set.

Store implementations

Each store filters the ignore-set with a stable, parameterized query so the plan is reused regardless of set size (no plan-cache churn from inlined literals):

Store Filter
PostgreSQL position != ALL($2) (bound array param)
SQL Server Position NOT IN (SELECT CAST(value AS BIGINT) FROM STRING_SPLIT(@IgnorePositions, ','))
MariaDB FIND_IN_SET(position, ?) = 0
InMemory HashSet lookup

The empty case is handled per dialect (Postgres empty array; SQL Server -1 sentinel in the parameter to avoid an empty-string CAST; MariaDB FIND_IN_SET(_, '') = 0 is naturally a no-op).

Tests

Adds GetMessagesForReplicaExcludesIgnoredPositions to the shared MessageStoreTests template, run across all four stores (InMemory, PostgreSQL, MariaDB, SQL Server). Covers:

  • empty ignore-set returns every message for the replica, grouped by flow and ordered by position;
  • messages owned by a different replica are never returned;
  • ignoring a position drops just that message;
  • ignoring all of a flow's positions drops the whole group.

All four pass against the real databases.

…Positions

Return a List<StoredMessages> (StoredId + its messages) instead of a
Dictionary<StoredId, List<StoredMessage>>, and accept an ignorePositions
parameter so already-pushed messages are not re-delivered.

MessageWatchdog (version 1) keeps an ever-growing set of pushed positions
and passes it each tick; this will be refined once the QueueManager reports
the positions it has persisted into its effects.

Stores filter the ignore-set via a stable, parameterized query to avoid
plan-cache churn: Postgres '!= ALL($2)', SQL Server STRING_SPLIT, MariaDB
FIND_IN_SET. Adds a cross-store test covering empty and non-empty ignore sets.
@stidsborg stidsborg merged commit 5496dd9 into main Jun 14, 2026
8 checks passed
@stidsborg stidsborg deleted the simplify-get-messages-for-replica branch June 14, 2026 09:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant