From 5afe25edc937a98013366d6de0571efedd6efa06 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 14 Jun 2026 10:19:32 +0200 Subject: [PATCH 1/6] Add GetCrashedReplicaMessages and SetReplica to IMessageStore GetCrashedReplicaMessages returns the (flow, position) identifiers of undelivered messages owned by a replica that is no longer alive (not in the supplied live-replica set). SetReplica re-assigns the messages at the given positions to a new replica, guarded by an expected-replica check so a concurrent takeover is not clobbered. Implemented across the in-memory, PostgreSQL, MariaDB and SqlServer stores, with shared message-store tests covering crashed-replica fetching and guarded re-assignment. --- .../InMemoryTests/MessageStoreTests.cs | 8 +++ .../TestTemplates/MessageStoreTests.cs | 66 +++++++++++++++++++ .../Messaging/IMessageStore.cs | 16 ++++- .../Storage/InMemoryFunctionStore.cs | 28 ++++++++ .../Messaging/MessageStoreTests.cs | 8 +++ .../MariaDbMessageStore.cs | 24 +++++++ .../SqlGenerator.cs | 47 +++++++++++++ .../Messaging/MessageStoreTests.cs | 8 +++ .../PostgreSqlMessageStore.cs | 20 ++++++ .../SqlGenerator.cs | 37 +++++++++++ .../Messaging/MessageStoreTests.cs | 8 +++ .../SqlGenerator.cs | 48 ++++++++++++++ .../SqlServerMessageStore.cs | 20 ++++++ 13 files changed, 337 insertions(+), 1 deletion(-) diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs index 5e530d3e..2c05ddcc 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs @@ -117,4 +117,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded() [TestMethod] public override Task MessageReplicaIsTakenFromTargetFlowOwner() => MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task CrashedReplicaMessagesAreFetched() + => CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task MessageReplicaCanBeReassigned() + => MessageReplicaCanBeReassigned(FunctionStoreFactory.Create()); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs index ee7b06ed..00e9ad28 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs @@ -1203,4 +1203,70 @@ await messageStore.ReplaceMessage( var afterReplaceIdle = (await messageStore.GetMessages(idleFlow)).ToList(); afterReplaceIdle.Single(m => m.Position == idleMessages[1].Position).Replica.ShouldBe(publisher); } + + public abstract Task CrashedReplicaMessagesAreFetched(); + protected async Task CrashedReplicaMessagesAreFetched(Task functionStoreTask) + { + var functionStore = await functionStoreTask; + var messageStore = functionStore.MessageStore; + var stringType = typeof(string).SimpleQualifiedName().ToUtf8Bytes(); + + var liveReplica = ReplicaId.NewId(); + var crashedReplica1 = ReplicaId.NewId(); + var crashedReplica2 = ReplicaId.NewId(); + + var flow1 = TestStoredId.Create(); + var flow2 = TestStoredId.Create(); + + // owned by a live replica -> not crashed + await messageStore.AppendMessage(flow1, new StoredMessage("a".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: liveReplica)); + // owned by crashed replicas -> crashed + await messageStore.AppendMessage(flow1, new StoredMessage("b".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica1)); + await messageStore.AppendMessage(flow2, new StoredMessage("c".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica2)); + + var flow1Messages = await messageStore.GetMessages(flow1); + var flow2Messages = await messageStore.GetMessages(flow2); + var bPosition = flow1Messages.Single(m => m.Replica == crashedReplica1).Position; + var cPosition = flow2Messages.Single(m => m.Replica == crashedReplica2).Position; + + var crashed = await messageStore.GetCrashedReplicaMessages([liveReplica]); + + crashed.Count.ShouldBe(2); + crashed.ShouldContain(t => t.Item1 == flow1 && t.Item2 == bPosition); + crashed.ShouldContain(t => t.Item1 == flow2 && t.Item2 == cPosition); + } + + public abstract Task MessageReplicaCanBeReassigned(); + protected async Task MessageReplicaCanBeReassigned(Task functionStoreTask) + { + var functionStore = await functionStoreTask; + var messageStore = functionStore.MessageStore; + var stringType = typeof(string).SimpleQualifiedName().ToUtf8Bytes(); + + var crashedReplica = ReplicaId.NewId(); + var newReplica = ReplicaId.NewId(); + var otherReplica = ReplicaId.NewId(); + + var flow1 = TestStoredId.Create(); + var flow2 = TestStoredId.Create(); + + await messageStore.AppendMessage(flow1, new StoredMessage("a".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica)); + await messageStore.AppendMessage(flow2, new StoredMessage("b".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica)); + // owned by a different replica -> must not be reassigned even though its position is included + await messageStore.AppendMessage(flow2, new StoredMessage("c".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: otherReplica)); + + var aPosition = (await messageStore.GetMessages(flow1)).Single().Position; + var flow2Messages = await messageStore.GetMessages(flow2); + var bPosition = flow2Messages.Single(m => m.Replica == crashedReplica).Position; + var cPosition = flow2Messages.Single(m => m.Replica == otherReplica).Position; + + await messageStore.SetReplica([aPosition, bPosition, cPosition], newReplica, expectedReplica: crashedReplica); + + var afterFlow1 = await messageStore.GetMessages(flow1); + var afterFlow2 = await messageStore.GetMessages(flow2); + + afterFlow1.Single().Replica.ShouldBe(newReplica); + afterFlow2.Single(m => m.Position == bPosition).Replica.ShouldBe(newReplica); + afterFlow2.Single(m => m.Position == cPosition).Replica.ShouldBe(otherReplica); + } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs index 0ec5b1c2..b6917ef5 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Storage; @@ -31,4 +32,17 @@ public interface IMessageStore /// Used by the MessageWatchdog to push messages to live flows owned by this replica. /// Task>> GetMessagesForReplica(ReplicaId replicaId); + + /// + /// Returns the (flow, position) identifiers of the undelivered messages owned by a replica that is no + /// longer alive (its replica is not contained in ). + /// Used to detect messages stranded by crashed replicas so they can be re-assigned to a live replica via . + /// + Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas); + + /// + /// Re-assigns the messages at the provided positions to , + /// but only those still owned by . + /// + Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index ac40e0a9..06f35071 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -688,5 +688,33 @@ public Task>> GetMessagesForReplica(Rep } } + public Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas) + { + var live = liveReplicas.ToHashSet(); + lock (_sync) + { + var result = new List>(); + foreach (var (storedId, messages) in _messages) + foreach (var (position, message) in messages.OrderBy(kv => kv.Key)) + if (!live.Contains(message.Replica)) + result.Add(Tuple.Create(storedId, position)); + + return result.ToTask(); + } + } + + public Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + lock (_sync) + { + foreach (var position in positions) + foreach (var messages in _messages.Values) + if (messages.TryGetValue(position, out var message) && message.Replica == expectedReplica) + messages[position] = message with { Replica = newReplica }; + + return Task.CompletedTask; + } + } + #endregion } \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs index f2d1d6b0..766d2312 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs @@ -112,4 +112,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded() [TestMethod] public override Task MessageReplicaIsTakenFromTargetFlowOwner() => MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task CrashedReplicaMessagesAreFetched() + => CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task MessageReplicaCanBeReassigned() + => MessageReplicaCanBeReassigned(FunctionStoreFactory.Create()); } \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs index caae8651..713a91ff 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs @@ -212,6 +212,30 @@ public async Task>> GetMessagesForRepli return storedMessages; } + public async Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas) + { + await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); + await using var command = _sqlGenerator + .GetCrashedReplicaMessages(liveReplicas) + .ToSqlCommand(conn); + + await using var reader = await command.ExecuteReaderAsync(); + return await _sqlGenerator.ReadStoredIdAndPositions(reader); + } + + public async Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var positionsList = positions.ToList(); + if (positionsList.Count == 0) + return; + + await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); + await using var command = _sqlGenerator + .SetReplica(positionsList, newReplica, expectedReplica) + .ToSqlCommand(conn); + await command.ExecuteNonQueryAsync(); + } + public static StoredMessage ConvertToStoredMessage(byte[] content, long position, string? replica) { var arrs = BinaryPacker.Split(content, expectedPieces: 5); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs index 2da6b3a8..5cb0ae82 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs @@ -1,3 +1,4 @@ +using System; using System.Runtime.Serialization; using System.Text.Json; using Cleipnir.ResilientFunctions.Domain; @@ -671,6 +672,52 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) return StoreCommand.Create(sql, values: [ replicaId.AsGuid.ToString("N") ]); } + public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplicas) + { + var replicas = liveReplicas.Select(r => $"'{r.AsGuid:N}'").ToList(); + var notInClause = replicas.Count == 0 + ? "" + : $" AND replica NOT IN ({replicas.StringJoin(", ")})"; + var sql = @$" + SELECT id, position + FROM {tablePrefix}_messages + WHERE replica IS NOT NULL{notInClause} + ORDER BY position;"; + + return StoreCommand.Create(sql); + } + + public async Task>> ReadStoredIdAndPositions(MySqlDataReader reader) + { + var result = new List>(); + while (await reader.ReadAsync()) + { + var id = reader.GetString(0).ToGuid().ToStoredId(); + var position = reader.GetInt64(1); + result.Add(Tuple.Create(id, position)); + } + + return result; + } + + public StoreCommand SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var positionsList = positions.ToList(); + + var sql = @$" + UPDATE {tablePrefix}_messages + SET replica = ? + WHERE position IN ({string.Join(", ", positionsList.Select(_ => "?"))}) AND replica = ?"; + + var command = StoreCommand.Create(sql); + command.AddParameter(newReplica.AsGuid.ToString("N")); + foreach (var position in positionsList) + command.AddParameter(position); + command.AddParameter(expectedReplica.AsGuid.ToString("N")); + + return command; + } + public StoreCommand GetMessages(IEnumerable storedIds) { var sql = @$" diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessageStoreTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessageStoreTests.cs index ccafb1c6..594572c1 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessageStoreTests.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessageStoreTests.cs @@ -113,4 +113,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded() [TestMethod] public override Task MessageReplicaIsTakenFromTargetFlowOwner() => MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task CrashedReplicaMessagesAreFetched() + => CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task MessageReplicaCanBeReassigned() + => MessageReplicaCanBeReassigned(FunctionStoreFactory.Create()); } \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs index cc08bc5c..edd85a7b 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs @@ -211,6 +211,26 @@ public async Task>> GetMessagesForRepli return storedMessages; } + public async Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas) + { + await using var conn = await CreateConnection(); + await using var command = sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToNpgsqlCommand(conn); + + await using var reader = await command.ExecuteReaderAsync(); + return await sqlGenerator.ReadStoredIdAndPositions(reader); + } + + public async Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var positionsArray = positions.ToArray(); + if (positionsArray.Length == 0) + return; + + await using var conn = await CreateConnection(); + await using var command = sqlGenerator.SetReplica(positionsArray, newReplica, expectedReplica).ToNpgsqlCommand(conn); + await command.ExecuteNonQueryAsync(); + } + public static StoredMessage ConvertToStoredMessage(byte[] content, long position, Guid? replica) { var arrs = BinaryPacker.Split(content, expectedPieces: 5); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs index 85e6cc62..3088d14c 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs @@ -594,6 +594,43 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) return StoreCommand.Create(sql, values: [ replicaId.AsGuid ]); } + + public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplicas) + { + var sql = @$" + SELECT id, position + FROM {tablePrefix}_messages + WHERE replica IS NOT NULL AND replica != ALL($1) + ORDER BY position;"; + + return StoreCommand.Create(sql, values: [ liveReplicas.Select(r => r.AsGuid).ToArray() ]); + } + + public async Task>> ReadStoredIdAndPositions(NpgsqlDataReader reader) + { + var result = new List>(); + while (await reader.ReadAsync()) + { + var id = reader.GetGuid(0).ToStoredId(); + var position = reader.GetInt64(1); + result.Add(Tuple.Create(id, position)); + } + + return result; + } + + public StoreCommand SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var sql = @$" + UPDATE {tablePrefix}_messages + SET replica = $1 + WHERE position = ANY($2) AND replica = $3"; + + return StoreCommand.Create( + sql, + values: [ newReplica.AsGuid, positions.ToArray(), expectedReplica.AsGuid ] + ); + } public async Task>> ReadMessagesForMultipleStores(NpgsqlDataReader reader) { diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessageStoreTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessageStoreTests.cs index aab8c6c2..f2fa56fa 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessageStoreTests.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessageStoreTests.cs @@ -113,4 +113,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded() [TestMethod] public override Task MessageReplicaIsTakenFromTargetFlowOwner() => MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task CrashedReplicaMessagesAreFetched() + => CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task MessageReplicaCanBeReassigned() + => MessageReplicaCanBeReassigned(FunctionStoreFactory.Create()); } \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs index fded7cdc..d3c52f6e 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs @@ -675,6 +675,54 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) return command; } + public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplicas) + { + var replicas = liveReplicas.ToList(); + var notInClause = replicas.Count == 0 + ? "" + : $" AND Replica NOT IN ({replicas.Select((_, i) => $"@Replica{i}").StringJoin(", ")})"; + var sql = @$" + SELECT Id, Position + FROM {tablePrefix}_Messages + WHERE Replica IS NOT NULL{notInClause} + ORDER BY Position;"; + + var command = StoreCommand.Create(sql); + for (var i = 0; i < replicas.Count; i++) + command.AddParameter($"@Replica{i}", replicas[i].AsGuid); + return command; + } + + public async Task>> ReadStoredIdAndPositions(SqlDataReader reader) + { + var result = new List>(); + while (await reader.ReadAsync()) + { + var id = reader.GetGuid(0).ToStoredId(); + var position = reader.GetInt64(1); + result.Add(Tuple.Create(id, position)); + } + + return result; + } + + public StoreCommand SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var positionsList = positions.ToList(); + var sql = @$" + UPDATE {tablePrefix}_Messages + SET Replica = @NewReplica + WHERE Position IN ({positionsList.Select((_, i) => $"@Position{i}").StringJoin(", ")}) AND Replica = @ExpectedReplica"; + + var command = StoreCommand.Create(sql); + command.AddParameter("@NewReplica", newReplica.AsGuid); + for (var i = 0; i < positionsList.Count; i++) + command.AddParameter($"@Position{i}", positionsList[i]); + command.AddParameter("@ExpectedReplica", expectedReplica.AsGuid); + + return command; + } + public async Task>> ReadStoredIdsMessages(SqlDataReader reader) { var messages = new Dictionary>(); diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs index db08ca23..33852818 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs @@ -221,6 +221,26 @@ public async Task>> GetMessagesForRepli return storedMessages; } + public async Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas) + { + await using var conn = await CreateConnection(); + await using var cmd = _sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToSqlCommand(conn); + await using var reader = await cmd.ExecuteReaderAsync(); + + return await _sqlGenerator.ReadStoredIdAndPositions(reader); + } + + public async Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var positionsList = positions.ToList(); + if (positionsList.Count == 0) + return; + + await using var conn = await CreateConnection(); + await using var command = _sqlGenerator.SetReplica(positionsList, newReplica, expectedReplica).ToSqlCommand(conn); + await command.ExecuteNonQueryAsync(); + } + public static StoredMessage ConvertToStoredMessage(byte[] content, long position, Guid? replica) { var arrs = BinaryPacker.Split(content, expectedPieces: 5); From 50a5d31dbbee8c28eb8a4aea87e0d3ba8a421429 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 14 Jun 2026 10:32:20 +0200 Subject: [PATCH 2/6] Replace Tuple with StoredIdAndPosition record Introduces a named StoredIdAndPosition record (in Storage/Types.cs) for the GetCrashedReplicaMessages result instead of the anonymous Tuple, giving the (flow, position) pair meaningful member names. --- .../Messaging/TestTemplates/MessageStoreTests.cs | 4 ++-- .../Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs | 5 ++--- .../Storage/InMemoryFunctionStore.cs | 6 +++--- Core/Cleipnir.ResilientFunctions/Storage/Types.cs | 2 ++ .../MariaDbMessageStore.cs | 2 +- .../Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs | 7 +++---- .../PostgreSqlMessageStore.cs | 2 +- .../Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs | 6 +++--- .../Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs | 6 +++--- .../SqlServerMessageStore.cs | 2 +- 10 files changed, 21 insertions(+), 21 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs index 00e9ad28..7c041097 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs @@ -1232,8 +1232,8 @@ protected async Task CrashedReplicaMessagesAreFetched(Task funct var crashed = await messageStore.GetCrashedReplicaMessages([liveReplica]); crashed.Count.ShouldBe(2); - crashed.ShouldContain(t => t.Item1 == flow1 && t.Item2 == bPosition); - crashed.ShouldContain(t => t.Item1 == flow2 && t.Item2 == cPosition); + crashed.ShouldContain(t => t.StoredId == flow1 && t.Position == bPosition); + crashed.ShouldContain(t => t.StoredId == flow2 && t.Position == cPosition); } public abstract Task MessageReplicaCanBeReassigned(); diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs index b6917ef5..a4047ceb 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Storage; @@ -38,7 +37,7 @@ public interface IMessageStore /// longer alive (its replica is not contained in ). /// Used to detect messages stranded by crashed replicas so they can be re-assigned to a live replica via . /// - Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas); + Task> GetCrashedReplicaMessages(IEnumerable liveReplicas); /// /// Re-assigns the messages at the provided positions to , diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index 06f35071..8be578f0 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -688,16 +688,16 @@ public Task>> GetMessagesForReplica(Rep } } - public Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas) + public Task> GetCrashedReplicaMessages(IEnumerable liveReplicas) { var live = liveReplicas.ToHashSet(); lock (_sync) { - var result = new List>(); + var result = new List(); foreach (var (storedId, messages) in _messages) foreach (var (position, message) in messages.OrderBy(kv => kv.Key)) if (!live.Contains(message.Replica)) - result.Add(Tuple.Create(storedId, position)); + result.Add(new StoredIdAndPosition(storedId, position)); return result.ToTask(); } diff --git a/Core/Cleipnir.ResilientFunctions/Storage/Types.cs b/Core/Cleipnir.ResilientFunctions/Storage/Types.cs index 450de4e5..f88c9ec8 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/Types.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/Types.cs @@ -113,6 +113,8 @@ public record StoredException(string ExceptionMessage, string? ExceptionStackTra public record StatusAndId(StoredId StoredId, Status Status, long Expiry); +public record StoredIdAndPosition(StoredId StoredId, long Position); + public record StoredEffectChange( StoredId StoredId, EffectId EffectId, diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs index 713a91ff..a201a3ee 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs @@ -212,7 +212,7 @@ public async Task>> GetMessagesForRepli return storedMessages; } - public async Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas) + public async Task> GetCrashedReplicaMessages(IEnumerable liveReplicas) { await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); await using var command = _sqlGenerator diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs index 5cb0ae82..c1f2f00e 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs @@ -1,4 +1,3 @@ -using System; using System.Runtime.Serialization; using System.Text.Json; using Cleipnir.ResilientFunctions.Domain; @@ -687,14 +686,14 @@ WHERE replica IS NOT NULL{notInClause} return StoreCommand.Create(sql); } - public async Task>> ReadStoredIdAndPositions(MySqlDataReader reader) + public async Task> ReadStoredIdAndPositions(MySqlDataReader reader) { - var result = new List>(); + var result = new List(); while (await reader.ReadAsync()) { var id = reader.GetString(0).ToGuid().ToStoredId(); var position = reader.GetInt64(1); - result.Add(Tuple.Create(id, position)); + result.Add(new StoredIdAndPosition(id, position)); } return result; diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs index edd85a7b..420f5211 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs @@ -211,7 +211,7 @@ public async Task>> GetMessagesForRepli return storedMessages; } - public async Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas) + public async Task> GetCrashedReplicaMessages(IEnumerable liveReplicas) { await using var conn = await CreateConnection(); await using var command = sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToNpgsqlCommand(conn); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs index 3088d14c..2de5077a 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs @@ -606,14 +606,14 @@ public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplica return StoreCommand.Create(sql, values: [ liveReplicas.Select(r => r.AsGuid).ToArray() ]); } - public async Task>> ReadStoredIdAndPositions(NpgsqlDataReader reader) + public async Task> ReadStoredIdAndPositions(NpgsqlDataReader reader) { - var result = new List>(); + var result = new List(); while (await reader.ReadAsync()) { var id = reader.GetGuid(0).ToStoredId(); var position = reader.GetInt64(1); - result.Add(Tuple.Create(id, position)); + result.Add(new StoredIdAndPosition(id, position)); } return result; diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs index d3c52f6e..16e31d60 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs @@ -693,14 +693,14 @@ WHERE Replica IS NOT NULL{notInClause} return command; } - public async Task>> ReadStoredIdAndPositions(SqlDataReader reader) + public async Task> ReadStoredIdAndPositions(SqlDataReader reader) { - var result = new List>(); + var result = new List(); while (await reader.ReadAsync()) { var id = reader.GetGuid(0).ToStoredId(); var position = reader.GetInt64(1); - result.Add(Tuple.Create(id, position)); + result.Add(new StoredIdAndPosition(id, position)); } return result; diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs index 33852818..aba87473 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs @@ -221,7 +221,7 @@ public async Task>> GetMessagesForRepli return storedMessages; } - public async Task>> GetCrashedReplicaMessages(IEnumerable liveReplicas) + public async Task> GetCrashedReplicaMessages(IEnumerable liveReplicas) { await using var conn = await CreateConnection(); await using var cmd = _sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToSqlCommand(conn); From fb7fc069e56a134e1e92ae5d4d55417bd3843083 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 14 Jun 2026 10:39:52 +0200 Subject: [PATCH 3/6] Make message Replica column NOT NULL and simplify GetCrashedReplicaMessages The message replica is always populated (AppendMessage COALESCEs to the publisher replica, never null), so the column is now declared NOT NULL across all three stores. With non-null replicas the GetCrashedReplicaMessages query drops the redundant IS NOT NULL guard, the unneeded ORDER BY, and the empty-live-set conditional (the caller is always among the live replicas), leaving a plain WHERE replica NOT IN (...) / != ALL. --- .../MariaDbMessageStore.cs | 2 +- .../Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs | 6 +----- .../PostgreSqlMessageStore.cs | 2 +- .../Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs | 3 +-- .../Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs | 6 +----- .../SqlServerMessageStore.cs | 2 +- 6 files changed, 6 insertions(+), 15 deletions(-) diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs index a201a3ee..3aa4e32d 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs @@ -30,7 +30,7 @@ public async Task Initialize() CREATE TABLE IF NOT EXISTS {_tablePrefix}_messages ( position BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, id CHAR(32), - replica CHAR(32) NULL, + replica CHAR(32) NOT NULL, content LONGBLOB, INDEX {_tablePrefix}_messages_id_idx (id) );"; diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs index c1f2f00e..fe784e65 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs @@ -674,14 +674,10 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplicas) { var replicas = liveReplicas.Select(r => $"'{r.AsGuid:N}'").ToList(); - var notInClause = replicas.Count == 0 - ? "" - : $" AND replica NOT IN ({replicas.StringJoin(", ")})"; var sql = @$" SELECT id, position FROM {tablePrefix}_messages - WHERE replica IS NOT NULL{notInClause} - ORDER BY position;"; + WHERE replica NOT IN ({replicas.StringJoin(", ")})"; return StoreCommand.Create(sql); } diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs index 420f5211..ebeba7aa 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs @@ -39,7 +39,7 @@ public async Task Initialize() CREATE TABLE IF NOT EXISTS {tablePrefix}_messages ( position BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id UUID, - replica UUID NULL, + replica UUID NOT NULL, content BYTEA ); CREATE INDEX IF NOT EXISTS {tablePrefix}_messages_id_idx ON {tablePrefix}_messages (id);"; diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs index 2de5077a..aeb1445e 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs @@ -600,8 +600,7 @@ public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplica var sql = @$" SELECT id, position FROM {tablePrefix}_messages - WHERE replica IS NOT NULL AND replica != ALL($1) - ORDER BY position;"; + WHERE replica != ALL($1)"; return StoreCommand.Create(sql, values: [ liveReplicas.Select(r => r.AsGuid).ToArray() ]); } diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs index 16e31d60..87a5d430 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs @@ -678,14 +678,10 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplicas) { var replicas = liveReplicas.ToList(); - var notInClause = replicas.Count == 0 - ? "" - : $" AND Replica NOT IN ({replicas.Select((_, i) => $"@Replica{i}").StringJoin(", ")})"; var sql = @$" SELECT Id, Position FROM {tablePrefix}_Messages - WHERE Replica IS NOT NULL{notInClause} - ORDER BY Position;"; + WHERE Replica NOT IN ({replicas.Select((_, i) => $"@Replica{i}").StringJoin(", ")})"; var command = StoreCommand.Create(sql); for (var i = 0; i < replicas.Count; i++) diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs index aba87473..bf7873dd 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs @@ -33,7 +33,7 @@ public async Task Initialize() CREATE TABLE {_tablePrefix}_Messages ( Position BIGINT IDENTITY(1,1) PRIMARY KEY, Id UNIQUEIDENTIFIER, - Replica UNIQUEIDENTIFIER NULL, + Replica UNIQUEIDENTIFIER NOT NULL, Content VARBINARY(MAX) ); CREATE INDEX {_tablePrefix}_Messages_Id ON {_tablePrefix}_Messages (Id);"; From 809ffc597466747ae046c5f1ad32dc9718575912 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 14 Jun 2026 10:44:00 +0200 Subject: [PATCH 4/6] Take IReadOnlySet for GetCrashedReplicaMessages liveReplicas A set is the natural type for the membership test and matches how live replicas are already represented elsewhere (ReplicaWatchdog builds a HashSet). The in-memory store now uses the set's Contains directly instead of copying into a local HashSet. --- .../Messaging/TestTemplates/MessageStoreTests.cs | 2 +- Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs | 2 +- .../Storage/InMemoryFunctionStore.cs | 5 ++--- .../MariaDbMessageStore.cs | 2 +- .../Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs | 2 +- .../PostgreSqlMessageStore.cs | 2 +- .../Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs | 2 +- .../Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs | 2 +- .../SqlServerMessageStore.cs | 2 +- 9 files changed, 10 insertions(+), 11 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs index 7c041097..cb00436a 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs @@ -1229,7 +1229,7 @@ protected async Task CrashedReplicaMessagesAreFetched(Task funct var bPosition = flow1Messages.Single(m => m.Replica == crashedReplica1).Position; var cPosition = flow2Messages.Single(m => m.Replica == crashedReplica2).Position; - var crashed = await messageStore.GetCrashedReplicaMessages([liveReplica]); + var crashed = await messageStore.GetCrashedReplicaMessages(new HashSet { liveReplica }); crashed.Count.ShouldBe(2); crashed.ShouldContain(t => t.StoredId == flow1 && t.Position == bPosition); diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs index a4047ceb..0c493485 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs @@ -37,7 +37,7 @@ public interface IMessageStore /// longer alive (its replica is not contained in ). /// Used to detect messages stranded by crashed replicas so they can be re-assigned to a live replica via . /// - Task> GetCrashedReplicaMessages(IEnumerable liveReplicas); + Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas); /// /// Re-assigns the messages at the provided positions to , diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index 8be578f0..6d8f438b 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -688,15 +688,14 @@ public Task>> GetMessagesForReplica(Rep } } - public Task> GetCrashedReplicaMessages(IEnumerable liveReplicas) + public Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas) { - var live = liveReplicas.ToHashSet(); lock (_sync) { var result = new List(); foreach (var (storedId, messages) in _messages) foreach (var (position, message) in messages.OrderBy(kv => kv.Key)) - if (!live.Contains(message.Replica)) + if (!liveReplicas.Contains(message.Replica)) result.Add(new StoredIdAndPosition(storedId, position)); return result.ToTask(); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs index 3aa4e32d..5e4d80f8 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs @@ -212,7 +212,7 @@ public async Task>> GetMessagesForRepli return storedMessages; } - public async Task> GetCrashedReplicaMessages(IEnumerable liveReplicas) + public async Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas) { await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); await using var command = _sqlGenerator diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs index fe784e65..853fe6d6 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs @@ -671,7 +671,7 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) return StoreCommand.Create(sql, values: [ replicaId.AsGuid.ToString("N") ]); } - public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplicas) + public StoreCommand GetCrashedReplicaMessages(IReadOnlySet liveReplicas) { var replicas = liveReplicas.Select(r => $"'{r.AsGuid:N}'").ToList(); var sql = @$" diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs index ebeba7aa..cde6e5c9 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs @@ -211,7 +211,7 @@ public async Task>> GetMessagesForRepli return storedMessages; } - public async Task> GetCrashedReplicaMessages(IEnumerable liveReplicas) + public async Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas) { await using var conn = await CreateConnection(); await using var command = sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToNpgsqlCommand(conn); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs index aeb1445e..4325ec67 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs @@ -595,7 +595,7 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) return StoreCommand.Create(sql, values: [ replicaId.AsGuid ]); } - public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplicas) + public StoreCommand GetCrashedReplicaMessages(IReadOnlySet liveReplicas) { var sql = @$" SELECT id, position diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs index 87a5d430..fd749f8e 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs @@ -675,7 +675,7 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) return command; } - public StoreCommand GetCrashedReplicaMessages(IEnumerable liveReplicas) + public StoreCommand GetCrashedReplicaMessages(IReadOnlySet liveReplicas) { var replicas = liveReplicas.ToList(); var sql = @$" diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs index bf7873dd..9897834f 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs @@ -221,7 +221,7 @@ public async Task>> GetMessagesForRepli return storedMessages; } - public async Task> GetCrashedReplicaMessages(IEnumerable liveReplicas) + public async Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas) { await using var conn = await CreateConnection(); await using var cmd = _sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToSqlCommand(conn); From 8d999802eaf9af389837c9065324820342376173 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 14 Jun 2026 10:47:49 +0200 Subject: [PATCH 5/6] Use STRING_SPLIT for SqlServer GetCrashedReplicaMessages live-replica filter Collapses the generated @Replica0, @Replica1, ... parameters into a single comma-separated @Replicas parameter split server-side, matching the STRING_SPLIT convention already used for Id IN (...) across the store. --- .../Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs index fd749f8e..2bbc7ecb 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs @@ -677,15 +677,13 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) public StoreCommand GetCrashedReplicaMessages(IReadOnlySet liveReplicas) { - var replicas = liveReplicas.ToList(); var sql = @$" SELECT Id, Position FROM {tablePrefix}_Messages - WHERE Replica NOT IN ({replicas.Select((_, i) => $"@Replica{i}").StringJoin(", ")})"; + WHERE Replica NOT IN (SELECT CAST(value AS UNIQUEIDENTIFIER) FROM STRING_SPLIT(@Replicas, ','))"; var command = StoreCommand.Create(sql); - for (var i = 0; i < replicas.Count; i++) - command.AddParameter($"@Replica{i}", replicas[i].AsGuid); + command.AddParameter("@Replicas", string.Join(",", liveReplicas.Select(r => r.AsGuid))); return command; } From d0ad047818e22988c42bfb74141825b5c6da0709 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 14 Jun 2026 10:57:28 +0200 Subject: [PATCH 6/6] Use STRING_SPLIT for SqlServer SetReplica positions filter Replaces the generated @Position0, @Position1, ... parameters with a single comma-separated @Positions parameter split server-side (CAST AS BIGINT), matching the STRING_SPLIT convention used elsewhere in the store. The message-store wrapper already short-circuits on an empty position list, so STRING_SPLIT never receives an empty string. --- .../Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs index 2bbc7ecb..2a6a6ad6 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs @@ -702,16 +702,14 @@ public async Task> ReadStoredIdAndPositions(SqlDataRea public StoreCommand SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) { - var positionsList = positions.ToList(); var sql = @$" UPDATE {tablePrefix}_Messages SET Replica = @NewReplica - WHERE Position IN ({positionsList.Select((_, i) => $"@Position{i}").StringJoin(", ")}) AND Replica = @ExpectedReplica"; + WHERE Position IN (SELECT CAST(value AS BIGINT) FROM STRING_SPLIT(@Positions, ',')) AND Replica = @ExpectedReplica"; var command = StoreCommand.Create(sql); command.AddParameter("@NewReplica", newReplica.AsGuid); - for (var i = 0; i < positionsList.Count; i++) - command.AddParameter($"@Position{i}", positionsList[i]); + command.AddParameter("@Positions", string.Join(",", positions)); command.AddParameter("@ExpectedReplica", expectedReplica.AsGuid); return command;