Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded()
[TestMethod]
public override Task MessageReplicaIsTakenFromTargetFlowOwner()
=> MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create());

[TestMethod]
public override Task CrashedReplicaMessagesAreFetched()
=> CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create());

[TestMethod]
public override Task MessageReplicaCanBeReassigned()
=> MessageReplicaCanBeReassigned(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
Expand Up @@ -1203,4 +1203,70 @@ await messageStore.ReplaceMessage(
var afterReplaceIdle = (await messageStore.GetMessages(idleFlow)).ToList();
afterReplaceIdle.Single(m => m.Position == idleMessages[1].Position).Replica.ShouldBe(publisher);
}

public abstract Task CrashedReplicaMessagesAreFetched();
protected async Task CrashedReplicaMessagesAreFetched(Task<IFunctionStore> functionStoreTask)
{
var functionStore = await functionStoreTask;
var messageStore = functionStore.MessageStore;
var stringType = typeof(string).SimpleQualifiedName().ToUtf8Bytes();

var liveReplica = ReplicaId.NewId();
var crashedReplica1 = ReplicaId.NewId();
var crashedReplica2 = ReplicaId.NewId();

var flow1 = TestStoredId.Create();
var flow2 = TestStoredId.Create();

// owned by a live replica -> not crashed
await messageStore.AppendMessage(flow1, new StoredMessage("a".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: liveReplica));
// owned by crashed replicas -> crashed
await messageStore.AppendMessage(flow1, new StoredMessage("b".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica1));
await messageStore.AppendMessage(flow2, new StoredMessage("c".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica2));

var flow1Messages = await messageStore.GetMessages(flow1);
var flow2Messages = await messageStore.GetMessages(flow2);
var bPosition = flow1Messages.Single(m => m.Replica == crashedReplica1).Position;
var cPosition = flow2Messages.Single(m => m.Replica == crashedReplica2).Position;

var crashed = await messageStore.GetCrashedReplicaMessages(new HashSet<ReplicaId> { liveReplica });

crashed.Count.ShouldBe(2);
crashed.ShouldContain(t => t.StoredId == flow1 && t.Position == bPosition);
crashed.ShouldContain(t => t.StoredId == flow2 && t.Position == cPosition);
}

public abstract Task MessageReplicaCanBeReassigned();
protected async Task MessageReplicaCanBeReassigned(Task<IFunctionStore> functionStoreTask)
{
var functionStore = await functionStoreTask;
var messageStore = functionStore.MessageStore;
var stringType = typeof(string).SimpleQualifiedName().ToUtf8Bytes();

var crashedReplica = ReplicaId.NewId();
var newReplica = ReplicaId.NewId();
var otherReplica = ReplicaId.NewId();

var flow1 = TestStoredId.Create();
var flow2 = TestStoredId.Create();

await messageStore.AppendMessage(flow1, new StoredMessage("a".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica));
await messageStore.AppendMessage(flow2, new StoredMessage("b".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: crashedReplica));
// owned by a different replica -> must not be reassigned even though its position is included
await messageStore.AppendMessage(flow2, new StoredMessage("c".ToJson().ToUtf8Bytes(), stringType, Position: 0, Replica: otherReplica));

var aPosition = (await messageStore.GetMessages(flow1)).Single().Position;
var flow2Messages = await messageStore.GetMessages(flow2);
var bPosition = flow2Messages.Single(m => m.Replica == crashedReplica).Position;
var cPosition = flow2Messages.Single(m => m.Replica == otherReplica).Position;

await messageStore.SetReplica([aPosition, bPosition, cPosition], newReplica, expectedReplica: crashedReplica);

var afterFlow1 = await messageStore.GetMessages(flow1);
var afterFlow2 = await messageStore.GetMessages(flow2);

afterFlow1.Single().Replica.ShouldBe(newReplica);
afterFlow2.Single(m => m.Position == bPosition).Replica.ShouldBe(newReplica);
afterFlow2.Single(m => m.Position == cPosition).Replica.ShouldBe(otherReplica);
}
}
13 changes: 13 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,17 @@ public interface IMessageStore
/// Used by the MessageWatchdog to push messages to live flows owned by this replica.
/// </summary>
Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForReplica(ReplicaId replicaId);

/// <summary>
/// Returns the (flow, position) identifiers of the undelivered messages owned by a replica that is no
/// longer alive (its replica is not contained in <paramref name="liveReplicas"/>).
/// Used to detect messages stranded by crashed replicas so they can be re-assigned to a live replica via <see cref="SetReplica"/>.
/// </summary>
Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas);

/// <summary>
/// Re-assigns the messages at the provided positions to <paramref name="newReplica"/>,
/// but only those still owned by <paramref name="expectedReplica"/>.
/// </summary>
Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica);
}
27 changes: 27 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -688,5 +688,32 @@ public Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForReplica(Rep
}
}

public Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
{
lock (_sync)
{
var result = new List<StoredIdAndPosition>();
foreach (var (storedId, messages) in _messages)
foreach (var (position, message) in messages.OrderBy(kv => kv.Key))
if (!liveReplicas.Contains(message.Replica))
result.Add(new StoredIdAndPosition(storedId, position));

return result.ToTask();
}
}

public Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
lock (_sync)
{
foreach (var position in positions)
foreach (var messages in _messages.Values)
if (messages.TryGetValue(position, out var message) && message.Replica == expectedReplica)
messages[position] = message with { Replica = newReplica };

return Task.CompletedTask;
}
}

#endregion
}
2 changes: 2 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Storage/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public record StoredException(string ExceptionMessage, string? ExceptionStackTra

public record StatusAndId(StoredId StoredId, Status Status, long Expiry);

public record StoredIdAndPosition(StoredId StoredId, long Position);

public record StoredEffectChange(
StoredId StoredId,
EffectId EffectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded()
[TestMethod]
public override Task MessageReplicaIsTakenFromTargetFlowOwner()
=> MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create());

[TestMethod]
public override Task CrashedReplicaMessagesAreFetched()
=> CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create());

[TestMethod]
public override Task MessageReplicaCanBeReassigned()
=> MessageReplicaCanBeReassigned(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public async Task Initialize()
CREATE TABLE IF NOT EXISTS {_tablePrefix}_messages (
position BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
id CHAR(32),
replica CHAR(32) NULL,
replica CHAR(32) NOT NULL,
content LONGBLOB,
INDEX {_tablePrefix}_messages_id_idx (id)
);";
Expand Down Expand Up @@ -212,6 +212,30 @@ public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForRepli
return storedMessages;
}

public async Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
{
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
await using var command = _sqlGenerator
.GetCrashedReplicaMessages(liveReplicas)
.ToSqlCommand(conn);

await using var reader = await command.ExecuteReaderAsync();
return await _sqlGenerator.ReadStoredIdAndPositions(reader);
}

public async Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
var positionsList = positions.ToList();
if (positionsList.Count == 0)
return;

await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
await using var command = _sqlGenerator
.SetReplica(positionsList, newReplica, expectedReplica)
.ToSqlCommand(conn);
await command.ExecuteNonQueryAsync();
}

public static StoredMessage ConvertToStoredMessage(byte[] content, long position, string? replica)
{
var arrs = BinaryPacker.Split(content, expectedPieces: 5);
Expand Down
42 changes: 42 additions & 0 deletions Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,48 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId)
return StoreCommand.Create(sql, values: [ replicaId.AsGuid.ToString("N") ]);
}

public StoreCommand GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
{
var replicas = liveReplicas.Select(r => $"'{r.AsGuid:N}'").ToList();
var sql = @$"
SELECT id, position
FROM {tablePrefix}_messages
WHERE replica NOT IN ({replicas.StringJoin(", ")})";

return StoreCommand.Create(sql);
}

public async Task<List<StoredIdAndPosition>> ReadStoredIdAndPositions(MySqlDataReader reader)
{
var result = new List<StoredIdAndPosition>();
while (await reader.ReadAsync())
{
var id = reader.GetString(0).ToGuid().ToStoredId();
var position = reader.GetInt64(1);
result.Add(new StoredIdAndPosition(id, position));
}

return result;
}

public StoreCommand SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
var positionsList = positions.ToList();

var sql = @$"
UPDATE {tablePrefix}_messages
SET replica = ?
WHERE position IN ({string.Join(", ", positionsList.Select(_ => "?"))}) AND replica = ?";

var command = StoreCommand.Create(sql);
command.AddParameter(newReplica.AsGuid.ToString("N"));
foreach (var position in positionsList)
command.AddParameter(position);
command.AddParameter(expectedReplica.AsGuid.ToString("N"));

return command;
}

public StoreCommand GetMessages(IEnumerable<StoredId> storedIds)
{
var sql = @$"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded()
[TestMethod]
public override Task MessageReplicaIsTakenFromTargetFlowOwner()
=> MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create());

[TestMethod]
public override Task CrashedReplicaMessagesAreFetched()
=> CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create());

[TestMethod]
public override Task MessageReplicaCanBeReassigned()
=> MessageReplicaCanBeReassigned(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public async Task Initialize()
CREATE TABLE IF NOT EXISTS {tablePrefix}_messages (
position BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
id UUID,
replica UUID NULL,
replica UUID NOT NULL,
content BYTEA
);
CREATE INDEX IF NOT EXISTS {tablePrefix}_messages_id_idx ON {tablePrefix}_messages (id);";
Expand Down Expand Up @@ -211,6 +211,26 @@ public async Task<Dictionary<StoredId, List<StoredMessage>>> GetMessagesForRepli
return storedMessages;
}

public async Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
{
await using var conn = await CreateConnection();
await using var command = sqlGenerator.GetCrashedReplicaMessages(liveReplicas).ToNpgsqlCommand(conn);

await using var reader = await command.ExecuteReaderAsync();
return await sqlGenerator.ReadStoredIdAndPositions(reader);
}

public async Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
var positionsArray = positions.ToArray();
if (positionsArray.Length == 0)
return;

await using var conn = await CreateConnection();
await using var command = sqlGenerator.SetReplica(positionsArray, newReplica, expectedReplica).ToNpgsqlCommand(conn);
await command.ExecuteNonQueryAsync();
}

public static StoredMessage ConvertToStoredMessage(byte[] content, long position, Guid? replica)
{
var arrs = BinaryPacker.Split(content, expectedPieces: 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,42 @@ public StoreCommand GetMessagesForReplica(ReplicaId replicaId)

return StoreCommand.Create(sql, values: [ replicaId.AsGuid ]);
}

public StoreCommand GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas)
{
var sql = @$"
SELECT id, position
FROM {tablePrefix}_messages
WHERE replica != ALL($1)";

return StoreCommand.Create(sql, values: [ liveReplicas.Select(r => r.AsGuid).ToArray() ]);
}

public async Task<List<StoredIdAndPosition>> ReadStoredIdAndPositions(NpgsqlDataReader reader)
{
var result = new List<StoredIdAndPosition>();
while (await reader.ReadAsync())
{
var id = reader.GetGuid(0).ToStoredId();
var position = reader.GetInt64(1);
result.Add(new StoredIdAndPosition(id, position));
}

return result;
}

public StoreCommand SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica)
{
var sql = @$"
UPDATE {tablePrefix}_messages
SET replica = $1
WHERE position = ANY($2) AND replica = $3";

return StoreCommand.Create(
sql,
values: [ newReplica.AsGuid, positions.ToArray(), expectedReplica.AsGuid ]
);
}

public async Task<Dictionary<StoredId, IReadOnlyList<StoredMessage>>> ReadMessagesForMultipleStores(NpgsqlDataReader reader)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,12 @@ public override Task ConcurrentBatchedMessagesToSameStoredIdAreAllAdded()
[TestMethod]
public override Task MessageReplicaIsTakenFromTargetFlowOwner()
=> MessageReplicaIsTakenFromTargetFlowOwner(FunctionStoreFactory.Create());

[TestMethod]
public override Task CrashedReplicaMessagesAreFetched()
=> CrashedReplicaMessagesAreFetched(FunctionStoreFactory.Create());

[TestMethod]
public override Task MessageReplicaCanBeReassigned()
=> MessageReplicaCanBeReassigned(FunctionStoreFactory.Create());
}
Loading