diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs index 5e530d3e..2c05ddcc 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageStoreTests.cs @@ -117,4 +117,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded() [TestMethod] public override Task MessageReplicaIsTakenFromTargetFlowOwner() => MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task CrashedReplicaMessagesAreFetched() + => CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task MessageReplicaCanBeReassigned() + => MessageReplicaCanBeReassigned(FunctionStoreFactory.Create()); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs index ee7b06ed..cb00436a 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs @@ -1203,4 +1203,70 @@ await messageStore.ReplaceMessage( var afterReplaceIdle = (await messageStore.GetMessages(idleFlow)).ToList(); afterReplaceIdle.Single(m => m.Position == idleMessages[1].Position).Replica.ShouldBe(publisher); } + + public abstract Task CrashedReplicaMessagesAreFetched(); + protected async Task CrashedReplicaMessagesAreFetched(Task functionStoreTask) + { + var functionStore = await functionStoreTask; + var messageStore = functionStore.MessageStore; + var stringType = typeof(string).SimpleQualifiedName().ToUtf8Bytes(); + + var liveReplica = ReplicaId.NewId(); + var crashedReplica1 = ReplicaId.NewId(); + var crashedReplica2 = ReplicaId.NewId(); + + var flow1 = TestStoredId.Create(); + var flow2 = TestStoredId.Create(); + + // owned by a live replica -> not crashed + await messageStore.AppendMessage(flow1, new StoredMessage("a".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: liveReplica)); + // owned by crashed replicas -> crashed + await messageStore.AppendMessage(flow1, new StoredMessage("b".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica1)); + await messageStore.AppendMessage(flow2, new StoredMessage("c".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica2)); + + var flow1Messages = await messageStore.GetMessages(flow1); + var flow2Messages = await messageStore.GetMessages(flow2); + var bPosition = flow1Messages.Single(m => m.Replica == crashedReplica1).Position; + var cPosition = flow2Messages.Single(m => m.Replica == crashedReplica2).Position; + + var crashed = await messageStore.GetCrashedReplicaMessages(new HashSet { liveReplica }); + + crashed.Count.ShouldBe(2); + crashed.ShouldContain(t => t.StoredId == flow1 && t.Position == bPosition); + crashed.ShouldContain(t => t.StoredId == flow2 && t.Position == cPosition); + } + + public abstract Task MessageReplicaCanBeReassigned(); + protected async Task MessageReplicaCanBeReassigned(Task functionStoreTask) + { + var functionStore = await functionStoreTask; + var messageStore = functionStore.MessageStore; + var stringType = typeof(string).SimpleQualifiedName().ToUtf8Bytes(); + + var crashedReplica = ReplicaId.NewId(); + var newReplica = ReplicaId.NewId(); + var otherReplica = ReplicaId.NewId(); + + var flow1 = TestStoredId.Create(); + var flow2 = TestStoredId.Create(); + + await messageStore.AppendMessage(flow1, new StoredMessage("a".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica)); + await messageStore.AppendMessage(flow2, new StoredMessage("b".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica)); + // owned by a different replica -> must not be reassigned even though its position is included + await messageStore.AppendMessage(flow2, new StoredMessage("c".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: otherReplica)); + + var aPosition = (await messageStore.GetMessages(flow1)).Single().Position; + var flow2Messages = await messageStore.GetMessages(flow2); + var bPosition = flow2Messages.Single(m => m.Replica == crashedReplica).Position; + var cPosition = flow2Messages.Single(m => m.Replica == otherReplica).Position; + + await messageStore.SetReplica([aPosition, bPosition, cPosition], newReplica, expectedReplica: crashedReplica); + + var afterFlow1 = await messageStore.GetMessages(flow1); + var afterFlow2 = await messageStore.GetMessages(flow2); + + afterFlow1.Single().Replica.ShouldBe(newReplica); + afterFlow2.Single(m => m.Position == bPosition).Replica.ShouldBe(newReplica); + afterFlow2.Single(m => m.Position == cPosition).Replica.ShouldBe(otherReplica); + } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs index 0ec5b1c2..0c493485 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs @@ -31,4 +31,17 @@ public interface IMessageStore /// Used by the MessageWatchdog to push messages to live flows owned by this replica. /// Task>> GetMessagesForReplica(ReplicaId replicaId); + + /// + /// Returns the (flow, position) identifiers of the undelivered messages owned by a replica that is no + /// longer alive (its replica is not contained in ). + /// Used to detect messages stranded by crashed replicas so they can be re-assigned to a live replica via . + /// + Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas); + + /// + /// Re-assigns the messages at the provided positions to , + /// but only those still owned by . + /// + Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index ac40e0a9..6d8f438b 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -688,5 +688,32 @@ public Task>> GetMessagesForReplica(Rep } } + public Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas) + { + lock (_sync) + { + var result = new List(); + foreach (var (storedId, messages) in _messages) + foreach (var (position, message) in messages.OrderBy(kv => kv.Key)) + if (!liveReplicas.Contains(message.Replica)) + result.Add(new StoredIdAndPosition(storedId, position)); + + return result.ToTask(); + } + } + + public Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + lock (_sync) + { + foreach (var position in positions) + foreach (var messages in _messages.Values) + if (messages.TryGetValue(position, out var message) && message.Replica == expectedReplica) + messages[position] = message with { Replica = newReplica }; + + return Task.CompletedTask; + } + } + #endregion } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/Types.cs b/Core/Cleipnir.ResilientFunctions/Storage/Types.cs index 450de4e5..f88c9ec8 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/Types.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/Types.cs @@ -113,6 +113,8 @@ public record StoredException(string ExceptionMessage, string? ExceptionStackTra public record StatusAndId(StoredId StoredId, Status Status, long Expiry); +public record StoredIdAndPosition(StoredId StoredId, long Position); + public record StoredEffectChange( StoredId StoredId, EffectId EffectId, diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs index f2d1d6b0..766d2312 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/Messaging/MessageStoreTests.cs @@ -112,4 +112,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded() [TestMethod] public override Task MessageReplicaIsTakenFromTargetFlowOwner() => MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task CrashedReplicaMessagesAreFetched() + => CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task MessageReplicaCanBeReassigned() + => MessageReplicaCanBeReassigned(FunctionStoreFactory.Create()); } \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs index caae8651..5e4d80f8 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs @@ -30,7 +30,7 @@ public async Task Initialize() CREATE TABLE IF NOT EXISTS {_tablePrefix}_messages ( position BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, id CHAR(32), - replica CHAR(32) NULL, + replica CHAR(32) NOT NULL, content LONGBLOB, INDEX {_tablePrefix}_messages_id_idx (id) );"; @@ -212,6 +212,30 @@ public async Task>> GetMessagesForRepli return storedMessages; } + public async Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas) + { + await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); + await using var command = _sqlGenerator + .GetCrashedReplicaMessages(liveReplicas) + .ToSqlCommand(conn); + + await using var reader = await command.ExecuteReaderAsync(); + return await _sqlGenerator.ReadStoredIdAndPositions(reader); + } + + public async Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var positionsList = positions.ToList(); + if (positionsList.Count == 0) + return; + + await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); + await using var command = _sqlGenerator + .SetReplica(positionsList, newReplica, expectedReplica) + .ToSqlCommand(conn); + await command.ExecuteNonQueryAsync(); + } + public static StoredMessage ConvertToStoredMessage(byte[] content, long position, string? replica) { var arrs = BinaryPacker.Split(content, expectedPieces: 5); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs index 2da6b3a8..853fe6d6 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs @@ -671,6 +671,48 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) return StoreCommand.Create(sql, values: [ replicaId.AsGuid.ToString("N") ]); } + public StoreCommand GetCrashedReplicaMessages(IReadOnlySet liveReplicas) + { + var replicas = liveReplicas.Select(r => $"'{r.AsGuid:N}'").ToList(); + var sql = @$" + SELECT id, position + FROM {tablePrefix}_messages + WHERE replica NOT IN ({replicas.StringJoin(", ")})"; + + return StoreCommand.Create(sql); + } + + public async Task> ReadStoredIdAndPositions(MySqlDataReader reader) + { + var result = new List(); + while (await reader.ReadAsync()) + { + var id = reader.GetString(0).ToGuid().ToStoredId(); + var position = reader.GetInt64(1); + result.Add(new StoredIdAndPosition(id, position)); + } + + return result; + } + + public StoreCommand SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var positionsList = positions.ToList(); + + var sql = @$" + UPDATE {tablePrefix}_messages + SET replica = ? + WHERE position IN ({string.Join(", ", positionsList.Select(_ => "?"))}) AND replica = ?"; + + var command = StoreCommand.Create(sql); + command.AddParameter(newReplica.AsGuid.ToString("N")); + foreach (var position in positionsList) + command.AddParameter(position); + command.AddParameter(expectedReplica.AsGuid.ToString("N")); + + return command; + } + public StoreCommand GetMessages(IEnumerable storedIds) { var sql = @$" diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessageStoreTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessageStoreTests.cs index ccafb1c6..594572c1 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessageStoreTests.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/Messaging/MessageStoreTests.cs @@ -113,4 +113,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded() [TestMethod] public override Task MessageReplicaIsTakenFromTargetFlowOwner() => MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task CrashedReplicaMessagesAreFetched() + => CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task MessageReplicaCanBeReassigned() + => MessageReplicaCanBeReassigned(FunctionStoreFactory.Create()); } \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs index cc08bc5c..cde6e5c9 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs @@ -39,7 +39,7 @@ public async Task Initialize() CREATE TABLE IF NOT EXISTS {tablePrefix}_messages ( position BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id UUID, - replica UUID NULL, + replica UUID NOT NULL, content BYTEA ); CREATE INDEX IF NOT EXISTS {tablePrefix}_messages_id_idx ON {tablePrefix}_messages (id);"; @@ -211,6 +211,26 @@ public async Task>> GetMessagesForRepli return storedMessages; } + public async Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas) + { + await using var conn = await CreateConnection(); + await using var command = sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToNpgsqlCommand(conn); + + await using var reader = await command.ExecuteReaderAsync(); + return await sqlGenerator.ReadStoredIdAndPositions(reader); + } + + public async Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var positionsArray = positions.ToArray(); + if (positionsArray.Length == 0) + return; + + await using var conn = await CreateConnection(); + await using var command = sqlGenerator.SetReplica(positionsArray, newReplica, expectedReplica).ToNpgsqlCommand(conn); + await command.ExecuteNonQueryAsync(); + } + public static StoredMessage ConvertToStoredMessage(byte[] content, long position, Guid? replica) { var arrs = BinaryPacker.Split(content, expectedPieces: 5); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs index 85e6cc62..4325ec67 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs @@ -594,6 +594,42 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) return StoreCommand.Create(sql, values: [ replicaId.AsGuid ]); } + + public StoreCommand GetCrashedReplicaMessages(IReadOnlySet liveReplicas) + { + var sql = @$" + SELECT id, position + FROM {tablePrefix}_messages + WHERE replica != ALL($1)"; + + return StoreCommand.Create(sql, values: [ liveReplicas.Select(r => r.AsGuid).ToArray() ]); + } + + public async Task> ReadStoredIdAndPositions(NpgsqlDataReader reader) + { + var result = new List(); + while (await reader.ReadAsync()) + { + var id = reader.GetGuid(0).ToStoredId(); + var position = reader.GetInt64(1); + result.Add(new StoredIdAndPosition(id, position)); + } + + return result; + } + + public StoreCommand SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var sql = @$" + UPDATE {tablePrefix}_messages + SET replica = $1 + WHERE position = ANY($2) AND replica = $3"; + + return StoreCommand.Create( + sql, + values: [ newReplica.AsGuid, positions.ToArray(), expectedReplica.AsGuid ] + ); + } public async Task>> ReadMessagesForMultipleStores(NpgsqlDataReader reader) { diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessageStoreTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessageStoreTests.cs index aab8c6c2..f2fa56fa 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessageStoreTests.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/Messaging/MessageStoreTests.cs @@ -113,4 +113,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded() [TestMethod] public override Task MessageReplicaIsTakenFromTargetFlowOwner() => MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task CrashedReplicaMessagesAreFetched() + => CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create()); + + [TestMethod] + public override Task MessageReplicaCanBeReassigned() + => MessageReplicaCanBeReassigned(FunctionStoreFactory.Create()); } \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs index fded7cdc..2a6a6ad6 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs @@ -675,6 +675,46 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId) return command; } + public StoreCommand GetCrashedReplicaMessages(IReadOnlySet liveReplicas) + { + var sql = @$" + SELECT Id, Position + FROM {tablePrefix}_Messages + WHERE Replica NOT IN (SELECT CAST(value AS UNIQUEIDENTIFIER) FROM STRING_SPLIT(@Replicas, ','))"; + + var command = StoreCommand.Create(sql); + command.AddParameter("@Replicas", string.Join(",", liveReplicas.Select(r => r.AsGuid))); + return command; + } + + public async Task> ReadStoredIdAndPositions(SqlDataReader reader) + { + var result = new List(); + while (await reader.ReadAsync()) + { + var id = reader.GetGuid(0).ToStoredId(); + var position = reader.GetInt64(1); + result.Add(new StoredIdAndPosition(id, position)); + } + + return result; + } + + public StoreCommand SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var sql = @$" + UPDATE {tablePrefix}_Messages + SET Replica = @NewReplica + WHERE Position IN (SELECT CAST(value AS BIGINT) FROM STRING_SPLIT(@Positions, ',')) AND Replica = @ExpectedReplica"; + + var command = StoreCommand.Create(sql); + command.AddParameter("@NewReplica", newReplica.AsGuid); + command.AddParameter("@Positions", string.Join(",", positions)); + command.AddParameter("@ExpectedReplica", expectedReplica.AsGuid); + + return command; + } + public async Task>> ReadStoredIdsMessages(SqlDataReader reader) { var messages = new Dictionary>(); diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs index db08ca23..9897834f 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs @@ -33,7 +33,7 @@ public async Task Initialize() CREATE TABLE {_tablePrefix}_Messages ( Position BIGINT IDENTITY(1,1) PRIMARY KEY, Id UNIQUEIDENTIFIER, - Replica UNIQUEIDENTIFIER NULL, + Replica UNIQUEIDENTIFIER NOT NULL, Content VARBINARY(MAX) ); CREATE INDEX {_tablePrefix}_Messages_Id ON {_tablePrefix}_Messages (Id);"; @@ -221,6 +221,26 @@ public async Task>> GetMessagesForRepli return storedMessages; } + public async Task> GetCrashedReplicaMessages(IReadOnlySet liveReplicas) + { + await using var conn = await CreateConnection(); + await using var cmd = _sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToSqlCommand(conn); + await using var reader = await cmd.ExecuteReaderAsync(); + + return await _sqlGenerator.ReadStoredIdAndPositions(reader); + } + + public async Task SetReplica(IEnumerable positions, ReplicaId newReplica, ReplicaId expectedReplica) + { + var positionsList = positions.ToList(); + if (positionsList.Count == 0) + return; + + await using var conn = await CreateConnection(); + await using var command = _sqlGenerator.SetReplica(positionsList, newReplica, expectedReplica).ToSqlCommand(conn); + await command.ExecuteNonQueryAsync(); + } + public static StoredMessage ConvertToStoredMessage(byte[] content, long position, Guid? replica) { var arrs = BinaryPacker.Split(content, expectedPieces: 5);