Simplify GetMessagesForReplica to List<StoredMessages> and add ignorePositions#168
Merged
Merged
Conversation
…Positions Return a List<StoredMessages> (StoredId + its messages) instead of a Dictionary<StoredId, List<StoredMessage>>, and accept an ignorePositions parameter so already-pushed messages are not re-delivered. MessageWatchdog (version 1) keeps an ever-growing set of pushed positions and passes it each tick; this will be refined once the QueueManager reports the positions it has persisted into its effects. Stores filter the ignore-set via a stable, parameterized query to avoid plan-cache churn: Postgres '!= ALL($2)', SQL Server STRING_SPLIT, MariaDB FIND_IN_SET. Adds a cross-store test covering empty and non-empty ignore sets.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Simplifies
IMessageStore.GetMessagesForReplicaand lays the foundation for de-duplicating watchdog message delivery.Before
After
record StoredMessages(StoredId StoredId, List<StoredMessage> Messages)replaces the dictionary return shape.ignorePositionsexcludes already-handled positions so they aren't re-delivered on subsequent ticks.FlowsManager.Pushnow takesIReadOnlyList<StoredMessages>.MessageWatchdog (version 1)
The watchdog keeps an ever-growing
HashSet<long>of positions it has already pushed and passes it asignorePositionseach tick. This is a deliberate first step — it will be refined once theQueueManagerreports the positions it has persisted into its effects, which will bound the set.Store implementations
Each store filters the ignore-set with a stable, parameterized query so the plan is reused regardless of set size (no plan-cache churn from inlined literals):
position != ALL($2)(bound array param)Position NOT IN (SELECT CAST(value AS BIGINT) FROM STRING_SPLIT(@IgnorePositions, ','))FIND_IN_SET(position, ?) = 0HashSetlookupThe empty case is handled per dialect (Postgres empty array; SQL Server
-1sentinel in the parameter to avoid an empty-stringCAST; MariaDBFIND_IN_SET(_, '') = 0is naturally a no-op).Tests
Adds
GetMessagesForReplicaExcludesIgnoredPositionsto the sharedMessageStoreTeststemplate, run across all four stores (InMemory, PostgreSQL, MariaDB, SQL Server). Covers:All four pass against the real databases.