diff --git a/api.go b/api.go index eef03e5a..318a84e6 100644 --- a/api.go +++ b/api.go @@ -90,6 +90,22 @@ type Block interface { // Verify verifies the block by speculatively executing it on top of its ancestor. Verify(ctx context.Context) (VerifiedBlock, error) + + // non nil only for sealing blocks & first ever simplex block + SealingBlockInfo() *SealingBlockInfo +} + +type SealingBlockInfo struct { + // the new epoch number(aka the seq of the block this is in) + Epoch uint64 + // ValidatorSet of the new epoch + ValidatorSet Nodes + // PrevSealingBlockHash is the hash of the previous sealing block + PrevSealingBlockHash Digest +} + +func (s *SealingBlockInfo) String() string { + return fmt.Sprintf("Info: Epoch %d. Num Validators %d, PrevHash %s", s.Epoch, len(s.ValidatorSet), s.PrevSealingBlockHash) } type VerifiedBlock interface { @@ -100,6 +116,9 @@ type VerifiedBlock interface { // Bytes returns a byte encoding of the block Bytes() ([]byte, error) + + // non nil only for sealing blocks & first ever simplex block + SealingBlockInfo() *SealingBlockInfo } // BlockDeserializer deserializes blocks according to formatting diff --git a/block_scheduler.go b/block_scheduler.go index 6434d5bd..a57ccc74 100644 --- a/block_scheduler.go +++ b/block_scheduler.go @@ -121,6 +121,19 @@ func (bs *BlockDependencyManager) ExecuteEmptyRoundDependents(emptyRound uint64) bs.dependencies = remainingDeps } +func (bs *BlockDependencyManager) IsSequenceScheduled(seq uint64) bool { + bs.lock.Lock() + defer bs.lock.Unlock() + + for _, dep := range bs.dependencies { + if dep.blockSeq == seq { + return true + } + } + + return false +} + func (bs *BlockDependencyManager) ScheduleTaskWithDependencies(task Task, blockSeq uint64, prev *Digest, emptyRounds []uint64) error { bs.lock.Lock() defer bs.lock.Unlock() @@ -147,7 +160,7 @@ func (bs *BlockDependencyManager) ScheduleTaskWithDependencies(task Task, blockS return nil } - bs.logger.Debug("Adding block verification task with dependencies", zap.Any("prevBlock", prev), zap.Uint64s("emptyRounds", emptyRounds)) + bs.logger.Debug("Adding block verification task with dependencies", zap.Any("prevBlock", prev), zap.Uint64s("emptyRounds", emptyRounds), zap.Uint64("blockSeq", blockSeq)) emptyRoundsSet := make(map[uint64]struct{}) for _, round := range emptyRounds { emptyRoundsSet[round] = struct{}{} diff --git a/epoch.go b/epoch.go index 55d4a26e..b8f6e558 100644 --- a/epoch.go +++ b/epoch.go @@ -262,7 +262,7 @@ func (e *Epoch) maybeAssignDefaultConfig() error { e.MaxRebroadcastWait = DefaultEmptyVoteRebroadcastTimeout } if e.RandomSource == nil { - source, err := newRandomSource() + source, err := NewRandomSource() if err != nil { return err } @@ -3437,12 +3437,16 @@ func LeaderForRound(nodes []NodeID, r uint64) NodeID { } func Quorum(n int) int { - f := (n - 1) / 3 + f := F(n) // Obtained from the equation: // Quorum * 2 = N + F + 1 return (n+f)/2 + 1 } +func F(n int) int { + return (n - 1) / 3 +} + // messagesFromNode maps nodeIds to the messages it sent in a given round. type messagesFromNode map[string]map[uint64]*messagesForRound diff --git a/msg.go b/msg.go index 0456de2b..31b5d86f 100644 --- a/msg.go +++ b/msg.go @@ -320,17 +320,13 @@ func (q *QuorumRound) VerifyQCConsistentWithBlock() error { // String returns a string representation of the QuorumRound. // It is meant as a debugging aid for logs. -func (q *QuorumRound) String() string { - if q != nil { - err := q.IsWellFormed() - if err != nil { - return fmt.Sprintf("QuorumRound{Error: %s}", err) - } else { - return fmt.Sprintf("QuorumRound{Round: %d, Seq: %d, Finalized: %t}", q.GetRound(), q.GetSequence(), q.Finalization != nil) - } +func (q QuorumRound) String() string { + err := q.IsWellFormed() + if err != nil { + return fmt.Sprintf("QuorumRound{Error: %s}", err) + } else { + return fmt.Sprintf("QuorumRound{Round: %d, Seq: %d, Finalized: %t}", q.GetRound(), q.GetSequence(), q.Finalization != nil) } - - return "QuorumRound{nil}" } type VerifiedQuorumRound struct { diff --git a/nonvalidator/chain_helpers_test.go b/nonvalidator/chain_helpers_test.go new file mode 100644 index 00000000..daa494e6 --- /dev/null +++ b/nonvalidator/chain_helpers_test.go @@ -0,0 +1,222 @@ +package nonvalidator + +import ( + "context" + "fmt" + "testing" + + "github.com/ava-labs/simplex" + "github.com/ava-labs/simplex/testutil" + "github.com/stretchr/testify/require" +) + +var testNodes = simplex.Nodes{ + {Node: simplex.NodeID{1}, Weight: 1}, + {Node: simplex.NodeID{2}, Weight: 1}, + {Node: simplex.NodeID{3}, Weight: 1}, + {Node: simplex.NodeID{4}, Weight: 1}, +} + +var genesis = testutil.NewTestBlock(simplex.ProtocolMetadata{ + Seq: 0, + Round: 0, + Epoch: 0, +}, simplex.Blacklist{}) + +type messageInfo struct { + msg *simplex.Message + from simplex.NodeID +} + +// send is a helper that calls HandleMessage on `nv` with `m`. +func (m *messageInfo) send(nv *NonValidator) error { + return nv.HandleMessage(m.msg, m.from) +} + +// testChain is a helper that book-keeps the current chain-tip, alongside any +// blocks and finalizations indexed on the chain. It allows easy creation of block and sealing blocks. +// It also manages validator sets, and the SignatureAggregatorCreator required by non-validators to verify finalizations. +type testChain struct { + *testutil.InMemStorage + t *testing.T + + // seq, epoch, prev, sealingBlockHash defines the current tip of testChain. + // the next block uses seq + 1, epoch, prevDigest = digest, etc... + seq uint64 + epoch uint64 + digest simplex.Digest + sealingBlockHash simplex.Digest + + validatorSets map[uint64]simplex.Nodes +} + +func (c *testChain) String() string { + return fmt.Sprintf("TestChain: Current Epoch: %d, Current Seq: %d", c.epoch, c.seq) +} + +// newSeededChain returns a testChain whose storage is indexed through seq=lastSeq: +// +// seq 0 — genesis (epoch 0) +// seq 1 — sealing block opening epoch 1 (validatorSet = testNodes) +// seq 2..lastSeq — epoch 1 blocks +func newSeededChain(t *testing.T, nodes simplex.Nodes, lastSeq uint64) *testChain { + require.GreaterOrEqual(t, lastSeq, uint64(1), "lastSeq must be >= 1 (0 is genesis, 1 is the first epoch's sealing block)") + + tc := &testChain{ + InMemStorage: testutil.NewInMemStorage(), + t: t, + digest: genesis.Digest, + validatorSets: make(map[uint64]simplex.Nodes), + seq: 0, // set for clarity + epoch: 0, + } + require.NoError(t, tc.Index(context.Background(), genesis, simplex.Finalization{})) + tc.validatorSets[0] = nodes + + // firstSimplex comes after genesis + sealingBlock := tc.appendFirstSimplexAfterGenesis(nodes) + finalization := tc.newFinalization(sealingBlock) + require.NoError(t, tc.Index(context.Background(), sealingBlock, finalization)) + + for tc.seq < lastSeq { + b := tc.appendBlock() + finalization := tc.newFinalization(b) + + require.NoError(t, tc.Index(context.Background(), b, finalization)) + } + + return tc +} + +// newSnowToSimplexChain returns a testChain whose storage is indexed through seq=lastSnowSeq: +func newSnowToSimplexChain(t *testing.T, lastSnowSeq uint64) *testChain { + require.GreaterOrEqual(t, lastSnowSeq, uint64(1), "genesis must be indexed") + + tc := &testChain{ + InMemStorage: testutil.NewInMemStorage(), + t: t, + digest: genesis.Digest, + validatorSets: make(map[uint64]simplex.Nodes), + seq: 0, + epoch: 0, + } + + // genesis + require.NoError(t, tc.Index(context.Background(), genesis, simplex.Finalization{})) + + for tc.seq < lastSnowSeq { + b := tc.appendBlock() + require.NoError(t, tc.Index(context.Background(), b, simplex.Finalization{})) + } + + require.Equal(t, tc.seq, lastSnowSeq) + return tc +} + +// appendBlock advances the chain by one non-sealing block in the current +// epoch. The block is constructed but NOT indexed. +func (tc *testChain) appendBlock() *testutil.TestBlock { + tc.seq++ + block := newBlock(tc.seq, tc.epoch, tc.digest) + tc.digest = block.Digest + return block +} + +func (tc *testChain) newFinalization(b simplex.VerifiedBlock) simplex.Finalization { + nodes, ok := tc.validatorSets[b.BlockHeader().Epoch] + require.True(tc.t, ok, "Validator set expected to have before creating a new finalization", b.BlockHeader().Epoch, b.BlockHeader().Seq) + sigAgg := tc.signatureAggregatorCreator(nodes) + finalization, _ := testutil.NewFinalizationRecord(tc.t, sigAgg, b, nodes.NodeIDs()) + + return finalization +} + +// appendSealing advances the chain by one sealing block announcing nextEpoch +// and validatorSet. The sealing block's metadata.Epoch is the current epoch; +// subsequent appendBlock calls live in nextEpoch. Not indexed. +func (tc *testChain) appendSealing(validatorSet simplex.Nodes) *sealingTestBlock { + tc.seq++ + + block := newSealingTestBlock(tc.seq, tc.epoch, tc.digest, &simplex.SealingBlockInfo{ + Epoch: tc.seq, + ValidatorSet: validatorSet, + PrevSealingBlockHash: tc.sealingBlockHash, + }) + tc.digest = block.Digest + tc.epoch = tc.seq + tc.validatorSets[tc.epoch] = validatorSet + tc.sealingBlockHash = block.Digest + return block +} + +// the first simplex block must return sealing block information +func (tc *testChain) appendFirstSimplexAfterGenesis(validatorSet simplex.Nodes) *sealingTestBlock { + lastBlock, _, err := tc.Retrieve(tc.seq) + tc.seq++ + firstEverEpoch := tc.seq + require.NoError(tc.t, err) + + block := newSealingTestBlock(tc.seq, firstEverEpoch, tc.digest, &simplex.SealingBlockInfo{ + Epoch: tc.seq, + ValidatorSet: validatorSet, + PrevSealingBlockHash: lastBlock.BlockHeader().Digest, + }) + tc.digest = block.Digest + tc.epoch = firstEverEpoch + tc.sealingBlockHash = block.Digest + tc.validatorSets[firstEverEpoch] = validatorSet + return block +} + +func (tc *testChain) signatureAggregatorCreator(nodes []simplex.Node) simplex.SignatureAggregator { + isQuorumFunc := func(signatures []simplex.NodeID) bool { + count := 0 + nodeSet := make(map[string]struct{}) + for _, node := range nodes { + nodeSet[node.Node.String()] = struct{}{} + } + + for _, sig := range signatures { + if _, ok := nodeSet[sig.String()]; ok { + count++ + } + } + + return count >= simplex.Quorum(len(nodes)) + } + return &testutil.TestSignatureAggregator{ + IsQuorumFunc: isQuorumFunc, + N: len(nodes), + } +} + +// addEpochs adds sealing blocks at epochs, and normal blocks in between +func (tc *testChain) addEpochs(epochs ...uint64) { + // ensure that the new epoch we are adding is not already indexed + require.Greater(tc.t, epochs[0], tc.seq) + + for _, epoch := range epochs { + for tc.seq < epoch-1 { + b := tc.appendBlock() + finalization := tc.newFinalization(b) + require.NoError(tc.t, tc.Index(context.Background(), b, finalization)) + } + validatorSet, ok := tc.validatorSets[tc.epoch] + require.True(tc.t, ok) + + newNodes := append(validatorSet, simplex.Node{ + Node: simplex.NodeID{byte(epoch)}, + Weight: 1, + }) + sealing := tc.appendSealing(newNodes) + finalization := tc.newFinalization(sealing) + require.NoError(tc.t, tc.Index(context.Background(), sealing, finalization)) + } +} + +func (tc *testChain) nodes() simplex.Nodes { + latestValidatorSet, ok := tc.validatorSets[tc.epoch] + require.True(tc.t, ok) + + return latestValidatorSet +} diff --git a/nonvalidator/comm_test.go b/nonvalidator/comm_test.go new file mode 100644 index 00000000..86acde51 --- /dev/null +++ b/nonvalidator/comm_test.go @@ -0,0 +1,137 @@ +package nonvalidator + +import ( + "bytes" + "sync" + "testing" + + "github.com/ava-labs/simplex" +) + +// nonValidatorResponderComm implements simplex.Communication and is used during tests +// to create responses to any requests a node sends or broadcasts. +type nonValidatorResponderComm struct { + t *testing.T + + // storage is used to create the responses + storage simplex.Storage + + // nodes should contain the validator set of the highest epoch + nodes simplex.Nodes + + // ID is the NodeID of the non-validator using this comm. Broadcasts + // pick the first node in `nodes` that is not equal to ID as the + // simulated responder. + ID simplex.NodeID + + // responses is the queue of synthesized responses. Tests will pop messages + // and feed entries into NonValidator.HandleMessage. + responsesLock sync.Mutex + responses []*messageInfo +} + +func newTestResponder(t *testing.T, myNodeID simplex.NodeID, tc *testChain) *nonValidatorResponderComm { + return &nonValidatorResponderComm{ + nodes: tc.nodes(), + t: t, + ID: myNodeID, + storage: tc, + } +} + +func (r *nonValidatorResponderComm) Nodes() simplex.Nodes { return r.nodes } + +// Enqueues a response coming from `destination`. +func (r *nonValidatorResponderComm) Send(msg *simplex.Message, destination simplex.NodeID) { + r.handle(msg, destination) +} + +// Enqueues responses coming from all other nodes in the network. +func (r *nonValidatorResponderComm) Broadcast(msg *simplex.Message) { + + for _, n := range r.nodes { + if bytes.Equal(n.Node, r.ID) { + continue + } + + r.handle(msg, n.Node) + } +} + +func (r *nonValidatorResponderComm) handle(msg *simplex.Message, from simplex.NodeID) { + switch { + case msg.BlockDigestRequest != nil: + r.respondToDigestRequest(msg.BlockDigestRequest, from) + case msg.ReplicationRequest != nil: + r.respondToReplicationRequest(msg.ReplicationRequest, from) + } +} + +func (r *nonValidatorResponderComm) clearResponses() { + r.responsesLock.Lock() + defer r.responsesLock.Unlock() + r.responses = []*messageInfo{} +} + +func (r *nonValidatorResponderComm) respondToDigestRequest(req *simplex.BlockDigestRequest, from simplex.NodeID) { + block, fin, err := r.storage.Retrieve(req.Seq) + if err != nil { + return + } + + // Empty digest should mean "send any block at this seq" + if req.Digest != (simplex.Digest{}) && block.BlockHeader().Digest != req.Digest { + return + } + + resp := &simplex.ReplicationResponse{ + Data: []simplex.QuorumRound{{Block: block.(simplex.Block), Finalization: &fin}}, + } + + r.enqueue(&messageInfo{msg: &simplex.Message{ReplicationResponse: resp}, from: from}) +} + +func (r *nonValidatorResponderComm) respondToReplicationRequest(req *simplex.ReplicationRequest, from simplex.NodeID) { + + resp := &simplex.ReplicationResponse{} + + for _, seq := range req.Seqs { + block, fin, err := r.storage.Retrieve(seq) + if err == nil { + resp.Data = append(resp.Data, simplex.QuorumRound{Block: block.(simplex.Block), Finalization: &fin}) + } + } + + if req.LatestFinalizedSeq > 0 && r.storage.NumBlocks() > 0 { + numBlocks := r.storage.NumBlocks() + if req.LatestFinalizedSeq < numBlocks-1 { + block, fin, err := r.storage.Retrieve(numBlocks - 1) + if err == nil { + resp.LatestSeq = &simplex.QuorumRound{Block: block.(simplex.Block), Finalization: &fin} + } + } + } + + if len(resp.Data) == 0 && resp.LatestSeq == nil { + return + } + + r.enqueue(&messageInfo{msg: &simplex.Message{ReplicationResponse: resp}, from: from}) +} + +func (r *nonValidatorResponderComm) enqueue(m *messageInfo) { + r.responsesLock.Lock() + defer r.responsesLock.Unlock() + r.responses = append(r.responses, m) +} + +func (r *nonValidatorResponderComm) popResponse() (*messageInfo, bool) { + r.responsesLock.Lock() + defer r.responsesLock.Unlock() + if len(r.responses) == 0 { + return nil, false + } + m := r.responses[0] + r.responses = r.responses[1:] + return m, true +} diff --git a/nonvalidator/epoch_replicator.go b/nonvalidator/epoch_replicator.go new file mode 100644 index 00000000..bc5e5226 --- /dev/null +++ b/nonvalidator/epoch_replicator.go @@ -0,0 +1,80 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package nonvalidator + +import ( + "github.com/ava-labs/simplex" + "go.uber.org/zap" +) + +type latestValidatorSetRetriever interface { + Nodes() simplex.Nodes +} + +// TODO: garbage collect map +type epochReplicator struct { + logger simplex.Logger + + // receivedSealingBlocks stores sealing blocks that have been received by our non-validator. + // It maps the epoch they are creating, to the NodeIds that have sent them to us(and which digest they sent). + // Once we have collected f+1 messages for a finalization, the sealing block for that epoch is validated. + sealingBlockResponses map[uint64]map[string]simplex.Digest + + // digests stores the metadata associated with a sealing block. + digests map[string]*simplex.QuorumRound + + // latestValidatorSetRetriever is used to calculate the threshold of votes needed to validate an epoch + latestValidatorSetRetriever latestValidatorSetRetriever +} + +func newEpochReplicator(logger simplex.Logger, validatorSetRetriever latestValidatorSetRetriever) *epochReplicator { + return &epochReplicator{ + digests: make(map[string]*simplex.QuorumRound), + sealingBlockResponses: make(map[uint64]map[string]simplex.Digest), + logger: logger, + latestValidatorSetRetriever: validatorSetRetriever, + } +} + +// collectedQuorumRound notes that an quorum round for an unknown epoch was received. If that quorum round +// was a sealing block we mark it and return if a threshold of responses was reached. otherwise, return false. +func (e *epochReplicator) collectedQuorumRound(qr *simplex.QuorumRound, from simplex.NodeID) bool { + if qr.Block.SealingBlockInfo() == nil { + return false + } + e.logger.Debug("Collected a quorum round with a sealing block", zap.Stringer("QR", qr), zap.Stringer("From", from)) + + sealingInfo := qr.Block.SealingBlockInfo() + threshold := simplex.F(len(e.latestValidatorSetRetriever.Nodes())) + 1 + epochResponses, ok := e.sealingBlockResponses[sealingInfo.Epoch] + digest := qr.Block.BlockHeader().Digest + if !ok { + digests := make(map[string]simplex.Digest) + digests[from.String()] = digest + e.sealingBlockResponses[sealingInfo.Epoch] = digests + } else { + epochResponses[string(from)] = digest + } + + e.digests[digest.String()] = qr + + // check if we have a threshold of responses + counts := make(map[string]uint64) + for _, digest := range epochResponses { + sDigest := digest.String() + _, ok := counts[sDigest] + if !ok { + counts[sDigest] = 1 + } else { + counts[sDigest] += 1 + } + + if counts[sDigest] >= uint64(threshold) { + e.logger.Info("We received enough messages to validate a higher epoch", zap.Stringer("EpochInfo", sealingInfo), zap.Int("Threshold", threshold), zap.Uint64("Responses", counts[sDigest])) + return true + } + } + + return false +} diff --git a/nonvalidator/epoch_replicator_test.go b/nonvalidator/epoch_replicator_test.go new file mode 100644 index 00000000..9335d2e9 --- /dev/null +++ b/nonvalidator/epoch_replicator_test.go @@ -0,0 +1,73 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package nonvalidator + +import ( + "testing" + + "github.com/ava-labs/simplex" + "github.com/ava-labs/simplex/testutil" + "github.com/stretchr/testify/require" +) + +func newSealingQuorumRound(epoch uint64, numValidators int) *simplex.QuorumRound { + validatorSet := make(simplex.Nodes, numValidators) + for i := range validatorSet { + validatorSet[i] = simplex.Node{Node: simplex.NodeID{byte(i + 1)}, Weight: 1} + } + + return &simplex.QuorumRound{ + Block: newSealingTestBlock(1, epoch, simplex.Digest{}, &simplex.SealingBlockInfo{ + Epoch: epoch, + ValidatorSet: validatorSet, + }), + } +} + +type testValidatorSetRetriever struct { + nodes simplex.Nodes +} + +func (v *testValidatorSetRetriever) Nodes() simplex.Nodes { + return v.nodes +} + +// TestCollectedQuorumRound feeds an epochReplicator a sealing-block quorum round +// for an unknown epoch and asserts collectedQuorumRound only confirms the epoch +// once a threshold of distinct validators have voted for the same digest. +func TestCollectedQuorumRound(t *testing.T) { + tests := []struct { + name string + qr *simplex.QuorumRound + }{ + { + name: "4 validator nodes", + qr: newSealingQuorumRound(1, 4), + }, + { + name: "16 validator nodes", + qr: newSealingQuorumRound(1, 16), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + voters := tt.qr.Block.SealingBlockInfo().ValidatorSet + // votes required to confirm the epoch. + threshold := simplex.F(len(voters)) + 1 + require.GreaterOrEqual(t, len(voters), threshold, "need at least threshold validators to vote with") + e := newEpochReplicator(testutil.MakeLogger(t, 1), &testValidatorSetRetriever{ + nodes: voters, + }) + + // Each distinct vote below the threshold leaves the epoch unconfirmed. + for i := 0; i < threshold-1; i++ { + require.False(t, e.collectedQuorumRound(tt.qr, voters[i].Node)) + } + + // The threshold-th distinct vote for the same digest confirms it. + require.True(t, e.collectedQuorumRound(tt.qr, voters[threshold-1].Node)) + }) + } +} diff --git a/nonvalidator/epochs.go b/nonvalidator/epochs.go new file mode 100644 index 00000000..64ae0c85 --- /dev/null +++ b/nonvalidator/epochs.go @@ -0,0 +1,119 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package nonvalidator + +import ( + "bytes" + "errors" + + "github.com/ava-labs/simplex" +) + +var errNoGenesis = errors.New("No Genesis Found") + +type epochMetadata struct { + nodes simplex.Nodes + nodeLookup map[string]struct{} + epoch uint64 + signatureAggregator simplex.SignatureAggregator + prevSealingBlockHash simplex.Digest +} + +type epochs map[uint64]*epochMetadata + +func newEpochMetadata(sealingMetadata *simplex.SealingBlockInfo, sigCreator simplex.SignatureAggregatorCreator) *epochMetadata { + if sealingMetadata == nil { + return nil + } + + nodes := sealingMetadata.ValidatorSet + lookup := make(map[string]struct{}, len(nodes)) + for _, node := range nodes { + lookup[string(node.Node)] = struct{}{} + } + + return &epochMetadata{ + nodes: nodes, + nodeLookup: lookup, + epoch: sealingMetadata.Epoch, + signatureAggregator: sigCreator(nodes), + prevSealingBlockHash: sealingMetadata.PrevSealingBlockHash, + } +} + +// newEpochs creates a mapping of epoch numbers -> epoch metadata. The epoch metadata is used for verifying +// blocks and finalizations, and should only contain epochMetadata that we have validated. +func newEpochs(storage simplex.Storage, sigAggCreator simplex.SignatureAggregatorCreator) (epochs, error) { + lastBlockHeight := storage.NumBlocks() + if lastBlockHeight == 0 { + return nil, errNoGenesis + } + + lastBlock, _, err := storage.Retrieve(lastBlockHeight - 1) + if err != nil { + return nil, err + } + + epochs := make(map[uint64]*epochMetadata) + + // A zero Epoch means this is before the first ever Simplex block(ex. Genesis or Last Snowman Block) + if lastBlock.BlockHeader().Epoch == 0 { + return epochs, nil + } + + sealingBlock := lastBlock + if sealingBlock.SealingBlockInfo() == nil { + sealingBlock, _, err = storage.Retrieve(lastBlock.BlockHeader().Epoch) + if err != nil { + return nil, err + } + } + + lastAcceptedEpoch := newEpochMetadata(sealingBlock.SealingBlockInfo(), sigAggCreator) + epochs[lastAcceptedEpoch.epoch] = lastAcceptedEpoch + return epochs, nil +} + +func (e epochs) highestEpoch() (uint64, simplex.Nodes) { + highest := uint64(0) + nodes := []simplex.Node{} + for epoch, info := range e { + if epoch > highest { + highest = epoch + nodes = info.nodes + } + } + + return highest, nodes +} + +// removeOldEpochs deletes all epochs strictly less than startEpoch. +func (e epochs) removeOldEpochs(minEpochToKeep uint64) { + for epoch := range e { + if epoch < minEpochToKeep { + delete(e, epoch) + } + } +} + +func (e epochs) canValidate(block simplex.Block) bool { + if block.SealingBlockInfo() == nil { + return false + } + + _, ok := e[block.SealingBlockInfo().Epoch] + if ok { + // cannot validate twice + return false + } + + digest := block.BlockHeader().Digest + for _, md := range e { + if bytes.Equal(md.prevSealingBlockHash[:], digest[:]) { + // We have validated the next epoch, and the next epoch has a backward pointer to this one + return true + } + } + return false +} diff --git a/nonvalidator/epochs_test.go b/nonvalidator/epochs_test.go new file mode 100644 index 00000000..d7afe2e2 --- /dev/null +++ b/nonvalidator/epochs_test.go @@ -0,0 +1,228 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package nonvalidator + +import ( + "context" + "testing" + + "github.com/ava-labs/simplex" + "github.com/ava-labs/simplex/testutil" + "github.com/stretchr/testify/require" +) + +// sealingTestBlock wraps testutil.TestBlock so tests can control whether the +// block reports itself as a sealing block. +type sealingTestBlock struct { + *testutil.TestBlock + sealingInfo *simplex.SealingBlockInfo +} + +func (b *sealingTestBlock) SealingBlockInfo() *simplex.SealingBlockInfo { + return b.sealingInfo +} + +func (b *sealingTestBlock) Verify(_ context.Context) (simplex.VerifiedBlock, error) { + return b, nil +} + +func newSealingTestBlock(seq, epoch uint64, prev simplex.Digest, sealingInfo *simplex.SealingBlockInfo) *sealingTestBlock { + return &sealingTestBlock{ + TestBlock: testutil.NewTestBlock(simplex.ProtocolMetadata{ + Seq: seq, + Round: seq, + Epoch: epoch, + Prev: prev, + }, simplex.Blacklist{}), + sealingInfo: sealingInfo, + } +} + +type indexedBlock struct { + seq uint64 + round uint64 + epoch uint64 + sealingInfo *simplex.SealingBlockInfo +} + +func TestNewEpochs(t *testing.T) { + // nonSimplexBlock represents a block from before the first ever Simplex block — + // i.e., genesis(epoch 0) or the last block produced by the prior consensus (Snowman). + // Its epoch is 0 and it has no sealing info. + var nonSimplexBlock = indexedBlock{seq: 0, round: 0, epoch: 0, sealingInfo: nil} + + nodes := simplex.Nodes{ + {Node: simplex.NodeID{1}}, + {Node: simplex.NodeID{2}}, + {Node: simplex.NodeID{3}}, + {Node: simplex.NodeID{4}}, + } + + sigAggCreator := func(n []simplex.Node) simplex.SignatureAggregator { + return &testutil.TestSignatureAggregator{N: len(n)} + } + + tests := []struct { + name string + blocks []indexedBlock + expectedErr error + expectedEpoch uint64 + expectEmpty bool + expectedLen int + }{ + { + name: "no last accepted", + blocks: nil, + expectedErr: errNoGenesis, + }, + { + // genesis is the only block — pre-Simplex, epoch 0. + name: "last accepted is genesis", + blocks: []indexedBlock{ + nonSimplexBlock, + }, + expectEmpty: true, + }, + { + // the latest block is itself a sealing block at seq 2. + name: "last accepted is sealing", + blocks: []indexedBlock{ + nonSimplexBlock, + {seq: 1, round: 1, epoch: 1, sealingInfo: &simplex.SealingBlockInfo{Epoch: 1, ValidatorSet: nodes}}, + {seq: 2, round: 2, epoch: 2, sealingInfo: &simplex.SealingBlockInfo{Epoch: 2, ValidatorSet: nodes}}, + }, + expectedEpoch: 2, + expectedLen: 1, + }, + { + // the latest block is not a sealing block; epoch field points back to + // the sealing block at seq 1. + name: "last accepted is not sealing", + blocks: []indexedBlock{ + nonSimplexBlock, + {seq: 1, round: 1, epoch: 1, sealingInfo: &simplex.SealingBlockInfo{Epoch: 1, ValidatorSet: nodes}}, + {seq: 2, round: 2, epoch: 1, sealingInfo: nil}, + {seq: 3, round: 3, epoch: 1, sealingInfo: nil}, + }, + expectedEpoch: 1, + expectedLen: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + storage := testutil.NewInMemStorage() + for _, b := range tt.blocks { + block := newSealingTestBlock(b.seq, b.epoch, simplex.Digest{}, b.sealingInfo) + require.NoError(t, storage.Index(context.Background(), block, simplex.Finalization{})) + } + + epochs, err := newEpochs(storage, sigAggCreator) + require.ErrorIs(t, err, tt.expectedErr) + if err != nil { + return + } + + if tt.expectEmpty { + require.Empty(t, epochs) + return + } + + require.Len(t, epochs, tt.expectedLen) + meta, ok := epochs[tt.expectedEpoch] + require.True(t, ok) + require.Equal(t, tt.expectedEpoch, meta.epoch) + require.Equal(t, nodes, meta.nodes) + require.Len(t, meta.nodeLookup, len(nodes)) + for _, n := range nodes { + _, ok := meta.nodeLookup[string(n.Node)] + require.True(t, ok) + } + require.NotNil(t, meta.signatureAggregator) + }) + } +} + +func TestRemoveOldEpochs(t *testing.T) { + newEpochsMap := func() epochs { + return epochs{ + 1: &epochMetadata{epoch: 1}, + 2: &epochMetadata{epoch: 2}, + 3: &epochMetadata{epoch: 3}, + 4: &epochMetadata{epoch: 4}, + } + } + + tests := []struct { + name string + startEpoch uint64 + expectedEpochs []uint64 + }{ + { + name: "remove none", + startEpoch: 0, + expectedEpochs: []uint64{1, 2, 3, 4}, + }, + { + name: "remove some", + startEpoch: 3, + expectedEpochs: []uint64{3, 4}, + }, + { + name: "remove all", + startEpoch: 5, + expectedEpochs: []uint64{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := newEpochsMap() + e.removeOldEpochs(tt.startEpoch) + + require.Len(t, e, len(tt.expectedEpochs)) + for _, epoch := range tt.expectedEpochs { + _, ok := e[epoch] + require.True(t, ok, "expected epoch %d to remain", epoch) + } + }) + } +} + +// TestCanValidate checks whether the epochs struct properly validates higher epochs +// by keeping track of received sealing blocks. +func TestCanValidate(t *testing.T) { + tc := newSeededChain(t, testNodes, 2) + // create our epochs + e3, e4, e5 := tc.appendSealing(testNodes), tc.appendSealing(testNodes), tc.appendSealing(testNodes) + b6 := tc.appendBlock() + + sigAggCreator := func(n []simplex.Node) simplex.SignatureAggregator { + return &testutil.TestSignatureAggregator{N: len(n)} + } + + epochs, err := newEpochs(tc, sigAggCreator) + require.NoError(t, err) + + // should not be able to validate anything yet + require.False(t, epochs.canValidate(b6)) + require.False(t, epochs.canValidate(e5)) + require.False(t, epochs.canValidate(e4)) + require.False(t, epochs.canValidate(e3)) + + // say epoch 5 has been seen f + 1 times + epochs[e5.sealingInfo.Epoch] = newEpochMetadata(e5.SealingBlockInfo(), sigAggCreator) + + // we should be able to validate backwards now + require.False(t, epochs.canValidate(b6)) + require.False(t, epochs.canValidate(e5)) + require.True(t, epochs.canValidate(e4)) + require.False(t, epochs.canValidate(e3)) + + epochs[e4.sealingInfo.Epoch] = newEpochMetadata(e4.SealingBlockInfo(), sigAggCreator) + require.False(t, epochs.canValidate(b6)) + require.False(t, epochs.canValidate(e5)) + require.False(t, epochs.canValidate(e4)) // cannot validate twice + require.True(t, epochs.canValidate(e3)) +} diff --git a/nonvalidator/non_validator.go b/nonvalidator/non_validator.go new file mode 100644 index 00000000..e843cb9a --- /dev/null +++ b/nonvalidator/non_validator.go @@ -0,0 +1,571 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package nonvalidator + +import ( + "bytes" + "context" + "fmt" + "math/rand/v2" + "sync" + "time" + + "github.com/ava-labs/simplex" + "go.uber.org/zap" +) + +type finalizedSeq struct { + block simplex.Block + finalization *simplex.Finalization +} + +func (f *finalizedSeq) String() string { + seq := uint64(0) + digest := simplex.Digest{} + if f.block != nil { + seq = f.block.BlockHeader().Seq + digest = f.block.BlockHeader().Digest + } + if f.finalization != nil { + seq = f.finalization.Finalization.Seq + digest = f.finalization.Finalization.Digest + } + + return fmt.Sprintf("FinalizedSeq {BlockDigest: %s, Seq: %d, BlockExists %t, FinalizationExists %t}", digest, seq, f.block != nil, f.finalization != nil) +} + +type Config struct { + Storage simplex.Storage + Comm simplex.Communication + SignatureAggregatorCreator simplex.SignatureAggregatorCreator + + Logger simplex.Logger + + // How many sequences we allow to look past our next sequence to commit + MaxSequenceWindow uint64 + + // our node ID + ID simplex.NodeID + + StartTime time.Time + + // RandomSource is used by the replication state to pick which nodes to + // request sequences from. If nil, a cryptographically secure source is used. + RandomSource *rand.Rand +} + +type NonValidator struct { + Config + + lock *sync.Mutex + ctx context.Context + cancelCtx context.CancelFunc + haltedError error + + // ensures we only verify the same block one time + oneTimeVerifier *simplex.OneTimeVerifier + + // incompleteSequences stores sequences that we have not collected + // both a block and finalization for. Once both have been received, they are verified & indexed. + incompleteSequences map[uint64]*finalizedSeq + + // highestEpochCollector + highestEpochCollector *epochReplicator + + // sequence replication state + sequenceReplicator *simplex.ReplicationState + + // epochs contain a map of all epochs that have their validator set verified. + epochs epochs + + verifier *simplex.BlockDependencyManager +} + +// NewNonValidator creates a NonValidator with the given `config`. +func NewNonValidator(config Config) (*NonValidator, error) { + epochs, err := newEpochs(config.Storage, config.SignatureAggregatorCreator) + if err != nil { + return nil, err + } + + randomSource := config.RandomSource + if randomSource == nil { + randomSource, err = simplex.NewRandomSource() + if err != nil { + return nil, err + } + } + + ctx, cancelFunc := context.WithCancel(context.Background()) + scheduler := simplex.NewScheduler(config.Logger, simplex.DefaultProcessingBlocks) + + lock := &sync.Mutex{} + + replicator := simplex.NewReplicationState(config.Logger, config.Comm, config.ID, config.MaxSequenceWindow, true, config.StartTime, lock, randomSource) + + return &NonValidator{ + Config: config, + incompleteSequences: make(map[uint64]*finalizedSeq), + ctx: ctx, + cancelCtx: cancelFunc, + epochs: epochs, + verifier: simplex.NewBlockVerificationScheduler(config.Logger, simplex.DefaultProcessingBlocks, scheduler), + lock: lock, + highestEpochCollector: newEpochReplicator(config.Logger, config.Comm), + oneTimeVerifier: simplex.NewOneTimeVerifier(config.Logger), + sequenceReplicator: replicator, + }, nil +} + +func (n *NonValidator) Start() { + n.Logger.Info("Starting non-validator", zap.Stringer("ID", n.ID)) + n.broadcastLatestEpoch() +} + +func (n *NonValidator) Stop() { + n.Logger.Info("Shutting down non-validator", zap.Stringer("ID", n.ID)) + n.cancelCtx() + n.sequenceReplicator.Close() + n.verifier.Close() +} + +func (n *NonValidator) AdvanceTime(t time.Time) { + n.sequenceReplicator.AdvanceTime(t) +} + +// this function should be ran under a lock? +func (n *NonValidator) HandleMessage(msg *simplex.Message, from simplex.NodeID) error { + n.lock.Lock() + defer n.lock.Unlock() + + // A closed context means we have shut down. + if n.ctx.Err() != nil { + return nil + } + + if n.haltedError != nil { + return n.haltedError + } + + switch { + case msg.BlockDigestRequest != nil: + // TODO: it seems reasonable for our non-validator to be able to process these messages and send out responses. + return nil + case msg.BlockMessage != nil && msg.BlockMessage.Block != nil: + return n.handleBlock(msg.BlockMessage.Block, from) + case msg.Finalization != nil: + return n.handleFinalization(msg.Finalization, from) + case msg.ReplicationResponse != nil: + n.handleReplicationResponse(msg.ReplicationResponse, from) + return nil + default: + n.Logger.Debug("Received unexpected message", zap.Any("Message", msg), zap.Stringer("from", from)) + return nil + } + +} + +// handleBlock handles a block message. BlockMessages are sent when the leader proposes a block for its round. +// We only process blocks if they are from the leader and for the current epoch. +// Otherwise, we wait to process blocks until we receive a finalization. +func (n *NonValidator) handleBlock(block simplex.Block, from simplex.NodeID) error { + bh := block.BlockHeader() + n.Logger.Debug("Received a block message", zap.Uint64("Sequence", bh.Seq), zap.Stringer("From", from)) + + epoch, ok := n.epochs[bh.Epoch] + if !ok { + n.Logger.Debug("Received a block from an epoch we do not have", zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) + return nil + } + + if bh.Seq > n.MaxSequenceWindow+n.Storage.NumBlocks() { + n.Logger.Debug("Received a block from a sequence too far ahead", zap.Uint64("Num Blocks", n.Storage.NumBlocks()), zap.Uint64("Block Sequence", bh.Seq), zap.Stringer("From", from)) + return nil + } + + if !bytes.Equal(simplex.LeaderForRound(epoch.nodes.NodeIDs(), bh.Round), from) { + n.Logger.Debug("Received a block not from the leader of that round", zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) + return nil + } + + // If we have already verified the block discard it + if n.isAccepted(bh.Seq) { + n.Logger.Debug("Already accepted a block from this round") + return nil + } + + incomplete, ok := n.incompleteSequences[bh.Seq] + // we have not received any blocks or finalizations for this sequence + if !ok { + incompleteSeq := &finalizedSeq{ + block: block, + } + n.incompleteSequences[bh.Seq] = incompleteSeq + n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Sequence", incompleteSeq)) + return nil + } + + // Duplicate block, or finalization not yet received. + if incomplete.block != nil || incomplete.finalization == nil { + return nil + } + + if !bytes.Equal(incomplete.finalization.Finalization.Digest[:], bh.Digest[:]) { + n.Logger.Info( + "Received a block from the leader of a round whose digest mismatches the finalization", + zap.Stringer("Finalization Digest", incomplete.finalization.Finalization.Digest), + zap.Stringer("Block digest", bh.Digest), + zap.Stringer("From", from), + ) + return nil + } + + // add test that ensure this is here. otherwise i think an adversarial node can have us schedule many tasks + incomplete.block = block + + n.maybeValidateNextEpoch(block) + return n.scheduleNewFinalizedBlockTask(block, incomplete.finalization) +} + +func (n *NonValidator) isAccepted(seq uint64) bool { + return n.Storage.NumBlocks() > seq +} + +// newFinalizedBlockTask verifies and indexes the nextSeqToCommit. +// This task should only get executed when `block` is next to be verified and indexed. +func (n *NonValidator) newFinalizedBlockTask(block simplex.Block, finalization *simplex.Finalization) func() simplex.Digest { + return func() simplex.Digest { + md := block.BlockHeader() + n.Logger.Debug("Block verification started", zap.Uint64("sequence", md.Seq)) + start := time.Now() + defer func() { + elapsed := time.Since(start) + n.Logger.Debug("Block verification ended", zap.Uint64("sequence", md.Seq), zap.Duration("elapsed", elapsed)) + }() + + verifiedBlock, err := block.Verify(n.ctx) + // We have failed verifying a finalized block + if err != nil { + n.lock.Lock() + defer n.lock.Unlock() + + // defensive check in case we scheduled multiple finalized block verification tasks + if n.Storage.NumBlocks() != md.Seq { + n.Logger.Debug("Received finalized block that is not the next sequence to commit", + zap.Uint64("seq", md.Seq), zap.Uint64("height", n.Storage.NumBlocks())) + return md.Digest + } + + n.Logger.Info("Failed verifying a block that has a finalization", zap.Uint64("Block Seq", md.Seq), zap.Stringer("Block Digest", md.Digest), zap.Error(err)) + + n.sequenceReplicator.ResendFinalizationRequest(md.Seq, finalization.QC.Signers()) + return md.Digest + } + + n.lock.Lock() + defer n.lock.Unlock() + + // defensive check in case we scheduled multiple finalized block verification tasks + if n.Storage.NumBlocks() != md.Seq { + n.Logger.Debug("Received finalized block that is not the next sequence to commit", + zap.Uint64("seq", md.Seq), zap.Uint64("height", n.Storage.NumBlocks())) + return md.Digest + } + + if err := n.Storage.Index(n.ctx, verifiedBlock, *finalization); err != nil { + n.haltedError = err + n.Logger.Info("Failed indexing a block and finalization", zap.Uint64("Block Seq", md.Seq), zap.Stringer("Block Digest", md.Digest), zap.Error(err)) + return md.Digest + } + + n.Logger.Info("Verified and Indexed Block", zap.Uint64("Block Seq", md.Seq), zap.Stringer("Block Digest", md.Digest)) + + n.removeOldSequencesAndEpochs(md.Seq, md.Epoch) + + // in case we need to queue up any more tasks + if err := n.processReplicationState(); err != nil { + n.haltedError = err + n.Logger.Info("Failed calling process replication state", zap.Error(err)) + return md.Digest + } + + return md.Digest + } +} + +func (n *NonValidator) maybeValidateNextEpoch(block simplex.Block) { + sealingInfo := block.SealingBlockInfo() + if sealingInfo == nil { + return + } + _, alreadyValidated := n.epochs[sealingInfo.Epoch] + if alreadyValidated { + n.Logger.Info("Already validated.", zap.Uint64("Epoch", sealingInfo.Epoch)) + return + } + + n.Logger.Info("We have a valid sealing block, messages for that epoch can be processed.", zap.Uint64("Epoch", sealingInfo.Epoch)) + n.epochs[sealingInfo.Epoch] = newEpochMetadata(sealingInfo, n.SignatureAggregatorCreator) +} + +func (n *NonValidator) removeOldSequencesAndEpochs(lastCommittedSeq, minEpochToKeep uint64) { + for seq := range n.incompleteSequences { + if seq <= lastCommittedSeq { + delete(n.incompleteSequences, seq) + } + } + + n.epochs.removeOldEpochs(minEpochToKeep) +} + +// handleFinalization process a finalization message. If its for a future epoch, it will forward the finalization +// to the replication handler. +func (n *NonValidator) handleFinalization(finalization *simplex.Finalization, from simplex.NodeID) error { + bh := finalization.Finalization.BlockHeader + + n.Logger.Debug("Received a finalization", zap.Uint64("Seq", bh.Seq), zap.Stringer("From", from)) + + if n.isAccepted(bh.Seq) { + n.Logger.Debug("Received a stale finalization", zap.Uint64("Seq", bh.Seq), zap.Stringer("From", from)) + return nil + } + + epoch, ok := n.epochs[bh.Epoch] + if !ok { + // This finalization is after our lastAcceptedEpoch and is for an unknown Epoch, request that node to send us the sealing block. + n.Logger.Debug("Received a finalization from an unknown epoch", zap.Uint64("Unknown Epoch", bh.Epoch), zap.Stringer("From", from)) + n.sendRequest(bh.Epoch, from) + return nil + } + + if err := simplex.VerifyQC(finalization.QC, n.Logger, "Finalization", epoch.signatureAggregator.IsQuorum, epoch.nodeLookup, finalization, from); err != nil { + n.Logger.Debug("Received an invalid finalization", + zap.Int("round", int(bh.Round)), + zap.Stringer("NodeID", from)) + return nil + } + + // Don't store finalization in memory if it's too far ahead + if bh.Seq > n.MaxSequenceWindow+n.Storage.NumBlocks() { + n.Logger.Debug("Received a finalization from a sequence too far ahead", zap.Uint64("Num Blocks", n.Storage.NumBlocks()), zap.Uint64("Block Sequence", bh.Seq), zap.Stringer("From", from)) + n.sequenceReplicator.ReceivedFutureFinalization(finalization, n.nextSeqToCommit()) + return nil + } + + incomplete, ok := n.incompleteSequences[bh.Seq] + if !ok { + // we have not received anything for this sequence + incompleteSeq := &finalizedSeq{ + finalization: finalization, + } + n.incompleteSequences[bh.Seq] = incompleteSeq + n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Sequence", incompleteSeq)) + n.sequenceReplicator.ReceivedFutureFinalization(finalization, n.nextSeqToCommit()) + return nil + } + + // Duplicate finalization received. + if incomplete.finalization != nil { + // sanity check: should never happen. + if !bytes.Equal(incomplete.finalization.Finalization.Bytes(), finalization.Finalization.Bytes()) { + n.Logger.Warn( + "Mismatching finalizations", + zap.Uint64("Incoming Sequence", finalization.Finalization.Seq), + zap.Uint64("Stored sequence", incomplete.finalization.Finalization.Seq), + ) + n.haltedError = fmt.Errorf("Conflicting finalizations") + return fmt.Errorf("Conflicting finalizations") + } + + n.sequenceReplicator.ReceivedFutureFinalization(finalization, n.nextSeqToCommit()) + return nil + } + + incomplete.finalization = finalization + + // No block received yet for this sequence. + if incomplete.block == nil { + n.sequenceReplicator.ReceivedFutureFinalization(finalization, n.nextSeqToCommit()) + return nil + } + + digest := incomplete.block.BlockHeader().Digest + if !bytes.Equal(bh.Digest[:], digest[:]) { + n.Logger.Info( + "Received a block from the leader of a round whose digest mismatches the finalization", + zap.Stringer("Finalization Digest", bh.Digest), + zap.Stringer("Block digest", digest), + zap.Stringer("From", from), + ) + + n.sequenceReplicator.ReceivedFutureFinalization(finalization, n.nextSeqToCommit()) + return nil + } + + n.maybeValidateNextEpoch(incomplete.block) + return n.scheduleNewFinalizedBlockTask(incomplete.block, incomplete.finalization) +} + +func (n *NonValidator) scheduleNewFinalizedBlockTask(block simplex.Block, finalization *simplex.Finalization) error { + bh := finalization.Finalization.BlockHeader + if n.verifier.IsSequenceScheduled(bh.Seq) { + // Avoid scheduling more than one task. + // If verification during the task fails, we will try and reschedule. + return nil + } + + finalizedBlockTask := n.newFinalizedBlockTask(n.oneTimeVerifier.Wrap(block), finalization) + + var prev *simplex.Digest + if bh.Seq > 0 && !n.isAccepted(bh.Seq-1) { + prev = &bh.Prev + } + return n.verifier.ScheduleTaskWithDependencies(finalizedBlockTask, bh.Seq, prev, []uint64{}) +} + +func (n *NonValidator) handleReplicationResponse(resp *simplex.ReplicationResponse, from simplex.NodeID) error { + n.Logger.Debug("Received replication response", zap.Stringer("from", from), zap.Int("num seqs", len(resp.Data)), zap.Stringer("latest seq", resp.LatestSeq), zap.Stringer("From", from), zap.Stringers("Data", resp.Data)) + + for _, qr := range resp.Data { + if err := n.processQuorumRound(&qr, from); err != nil { + n.Logger.Debug("Failed processing quorum round", zap.Stringer("QR", qr), zap.Error(err)) + } + } + + if err := n.processQuorumRound(resp.LatestSeq, from); err != nil { + n.Logger.Debug("Failed processing latest seq", zap.Stringer("QR", resp.LatestSeq), zap.Error(err)) + } + + return n.processReplicationState() +} + +func (n *NonValidator) processReplicationState() error { + nextSeqToCommit := n.nextSeqToCommit() + n.sequenceReplicator.MaybeAdvanceState(nextSeqToCommit, 0, 0) + + // first we check if we can commit the next sequence, it is ok to try and commit the next sequence + // directly, since if there are any empty notarizations, `indexFinalization` will + // increment the round properly. + block, finalization, exists := n.sequenceReplicator.GetFinalizedBlockForSequence(nextSeqToCommit) + if !exists { + return nil + } + + // verify the finalization + epoch, ok := n.epochs[block.BlockHeader().Epoch] + if !ok { + return fmt.Errorf("expected epoch to have been validated: %d", block.BlockHeader().Epoch) + } + + err := simplex.VerifyQC(finalization.QC, n.Logger, "Finalization", epoch.signatureAggregator.IsQuorum, epoch.nodeLookup, finalization, simplex.NodeID{}) + if err != nil { + // We fetch from comm.Nodes instead of the nodes given in the finalization, because this node may give us an adversarial node list. + n.sequenceReplicator.ResendFinalizationRequest(block.BlockHeader().Seq, n.Comm.Nodes().NodeIDs()) + return nil + } + + n.sequenceReplicator.DeleteSeq(nextSeqToCommit) + + return n.scheduleNewFinalizedBlockTask(block, finalization) +} + +// This should only validate and store in the state. +func (n *NonValidator) processQuorumRound(qr *simplex.QuorumRound, from simplex.NodeID) error { + if qr == nil { + return nil + } + + if err := qr.VerifyQCConsistentWithBlock(); err != nil { + return err + } + + block := qr.Block + finalization := qr.Finalization + + // Non validators only process quorum rounds with finalizations + if finalization == nil { + return nil + } + + if n.isAccepted(block.BlockHeader().Seq) { + return fmt.Errorf("processing quorum round for a block we already indexed") + } + + epoch, ok := n.epochs[block.BlockHeader().Epoch] + if !ok { + n.Logger.Debug("Received a QR from an Epoch that we have not validated", zap.Uint64("Epoch", block.BlockHeader().Epoch), zap.Uint64("Block Seq", block.BlockHeader().Seq), zap.Stringer("Block digest", block.BlockHeader().Digest)) + n.sendRequest(qr.Block.BlockHeader().Epoch, from) + + // This block is in an epoch that we do not have. Therefore, we cannot verify its finalization. + // However, if it is a sealing block we may be able to validate the epoch if its part of the sealing block hash-chain. + if n.epochs.canValidate(block) { + n.Logger.Debug("We can validate an epoch block as we have validated the one after it.", zap.Stringer("Info", block.SealingBlockInfo())) + n.maybeValidateNextEpoch(block) + + // We are storing a quorum round with a finalization we have not yet verified. + // We do this to tell the replicator a valid sequence exists and to begin replication if necessary. + // We will check the validity when we process this round. + n.sequenceReplicator.StoreQuorumRound(qr) + return nil + } + + if n.highestEpochCollector.collectedQuorumRound(qr, from) { + n.Logger.Debug("We can validate an epoch because we have received a threshold of messages of it.", zap.Stringer("Info", block.SealingBlockInfo())) + n.maybeValidateNextEpoch(block) + + // We are storing a quorum round with a finalization we have not yet verified. + // We do this to tell the replicator a valid sequence exists and to begin replication if necessary. + // We will check the validity when we process this round. + n.sequenceReplicator.StoreQuorumRound(qr) + } + + return nil + } + + err := simplex.VerifyQC(qr.Finalization.QC, n.Logger, "Finalization", epoch.signatureAggregator.IsQuorum, epoch.nodeLookup, qr.Finalization, from) + if err != nil { + return fmt.Errorf("could not verify quorum round QC: %w", err) + } + + // This block could be a sealing block, validate the next epoch if so. + n.maybeValidateNextEpoch(block) + n.sequenceReplicator.StoreQuorumRound(qr) + return nil +} + +// TODO: add a re-broadcast timeout task until we have validated an epoch. +func (n *NonValidator) broadcastLatestEpoch() { + highestEpoch, _ := n.epochs.highestEpoch() + + // Sending a LatestFinalizedSeq of 0 gets ignored by validators. + if highestEpoch == 0 { + highestEpoch = 1 + } + + request := &simplex.ReplicationRequest{ + LatestFinalizedSeq: highestEpoch, + } + + n.Comm.Broadcast(&simplex.Message{ + ReplicationRequest: request, + }) +} + +// sendRequest sends a simplex.BlockDigestRequest for a given sequence to a node. +func (n *NonValidator) sendRequest(seq uint64, to simplex.NodeID) { + digestRequest := simplex.BlockDigestRequest{ + Seq: seq, + Digest: simplex.Digest{}, // TODO: In the epoch code, update how we process digests to not drop the request given an empty digest. + } + + n.Logger.Debug("Broadcasting sealing block request", zap.Uint64("Requesting Seq", seq)) + + n.Config.Comm.Send(&simplex.Message{ + BlockDigestRequest: &digestRequest, + }, to) +} + +func (n *NonValidator) nextSeqToCommit() uint64 { + return n.Storage.NumBlocks() +} diff --git a/nonvalidator/non_validator_test.go b/nonvalidator/non_validator_test.go new file mode 100644 index 00000000..9d821e29 --- /dev/null +++ b/nonvalidator/non_validator_test.go @@ -0,0 +1,642 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package nonvalidator + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/ava-labs/simplex" + "github.com/ava-labs/simplex/testutil" + "github.com/stretchr/testify/require" +) + +// errQC is a QuorumCertificate whose Verify always fails. Used to drive +// the "qc does not verify" code path without mutating message contents. +type errQC struct { + simplex.QuorumCertificate +} + +func (errQC) Verify([]byte) error { + return errors.New("qc verification failed") +} + +type TestConfig struct { + nodes simplex.Nodes + storage simplex.Storage + comm simplex.Communication + sigCreator simplex.SignatureAggregatorCreator +} + +func blockMessage(t *testing.T, block simplex.Block, from simplex.NodeID) *simplex.Message { + vote, err := testutil.NewTestVote(block, from) + require.NoError(t, err) + return &simplex.Message{ + BlockMessage: &simplex.BlockMessage{ + Block: block, + Vote: *vote, + }, + } +} + +func newBlock(seq, epoch uint64, prev simplex.Digest) *testutil.TestBlock { + return testutil.NewTestBlock(simplex.ProtocolMetadata{ + Round: seq, + Seq: seq, + Epoch: epoch, + Prev: prev, + }, simplex.Blacklist{}) +} + +// blockMsg wraps b in a messageInfo sent by the round leader. +func blockMsg(t *testing.T, b simplex.Block, nodes simplex.Nodes) *messageInfo { + leader := simplex.LeaderForRound(nodes.NodeIDs(), b.BlockHeader().Round) + return &messageInfo{ + msg: blockMessage(t, b, leader), + from: leader, + } +} + +// finalizationMsg mints a fresh Finalization for b signed by every node in +// nodes, and wraps it in a messageInfo sent by nodes[0]. It takes the block +// (not a pre-built finalization) so callers can chain straight from +// chain.appendBlock without keeping a separate finalization variable. +func finalizationMsg(t *testing.T, b simplex.VerifiedBlock, nodes simplex.Nodes) *messageInfo { + fin, _ := testutil.NewFinalizationRecord(t, &testutil.TestSignatureAggregator{}, b, nodes.NodeIDs()) + return &messageInfo{ + msg: &simplex.Message{Finalization: &fin}, + from: nodes.NodeIDs()[0], + } +} + +// TestHandleMessages drives the non-validator with blocks and finalizations +// arriving in various orders and asserts it verifies and indexes them up to +// the expected height, including across epoch transitions. +func TestHandleMessages(t *testing.T) { + // Each case starts from a chain seeded with seq 0 (genesis), seq 1 + // (sealing block opening epoch 1), and seq 2 (an epoch-1 block). + tests := []struct { + name string + setup func(t *testing.T) (*testChain, []*messageInfo) + expectedHeight uint64 + }{ + { + name: "blocks and finalizations out of order", + setup: func(t *testing.T) (*testChain, []*messageInfo) { + tc := newSeededChain(t, testNodes, 2) + b3, b4, b5 := tc.appendBlock(), tc.appendBlock(), tc.appendBlock() + + // finalization for seq 3 arrives last and should kick off + // verification for seqs 3-5. + return tc, []*messageInfo{ + blockMsg(t, b4, testNodes), + blockMsg(t, b3, testNodes), + finalizationMsg(t, b4, testNodes), + finalizationMsg(t, b5, testNodes), + blockMsg(t, b5, testNodes), + finalizationMsg(t, b3, testNodes), + } + }, + expectedHeight: 6, + }, + { + name: "nil block doesn't panic", + setup: func(t *testing.T) (*testChain, []*messageInfo) { + tc := newSeededChain(t, testNodes, 2) + b3 := tc.appendBlock() + msg := blockMsg(t, b3, testNodes) + msg.msg.BlockMessage.Block = nil + + return tc, []*messageInfo{msg} + }, + expectedHeight: 3, + }, + { + name: "same block doesn't get verified or indexed twice", + setup: func(t *testing.T) (*testChain, []*messageInfo) { + tc := newSeededChain(t, testNodes, 2) + b3, b4, b5 := tc.appendBlock(), tc.appendBlock(), tc.appendBlock() + + // sending a second b4 message will try and schedule the verification task for seq 4 again + // If the task is scheduled twice, the test will fail since storage will panic if we index the same seq twice. + return tc, []*messageInfo{ + blockMsg(t, b4, testNodes), + finalizationMsg(t, b4, testNodes), + blockMsg(t, b4, testNodes), + blockMsg(t, b3, testNodes), + finalizationMsg(t, b5, testNodes), + blockMsg(t, b5, testNodes), + finalizationMsg(t, b3, testNodes), + } + }, + expectedHeight: 6, + }, + { + name: "next block followed by finalization", + setup: func(t *testing.T) (*testChain, []*messageInfo) { + tc := newSeededChain(t, testNodes, 2) + b3 := tc.appendBlock() + + return tc, []*messageInfo{ + blockMsg(t, b3, testNodes), + finalizationMsg(t, b3, testNodes), + } + }, + expectedHeight: 4, + }, + { + name: "block message received from non leader", + setup: func(t *testing.T) (*testChain, []*messageInfo) { + tc := newSeededChain(t, testNodes, 2) + b3 := tc.appendBlock() + + blockMsg := blockMsg(t, b3, testNodes) + blockMsg.from = testNodes.NodeIDs()[0] + return tc, []*messageInfo{ + blockMsg, + finalizationMsg(t, b3, testNodes), + } + }, + expectedHeight: 3, + }, + { + name: "block digest mismatch with finalization", + setup: func(t *testing.T) (*testChain, []*messageInfo) { + tc := newSeededChain(t, testNodes, 2) + b3 := tc.appendBlock() + f3 := finalizationMsg(t, b3, testNodes) + + b3.Digest = simplex.Digest{} + blockMsg := blockMsg(t, b3, testNodes) + return tc, []*messageInfo{ + blockMsg, + f3, + } + }, + expectedHeight: 3, + }, + { + name: "qc does not verify", + setup: func(t *testing.T) (*testChain, []*messageInfo) { + tc := newSeededChain(t, testNodes, 2) + b3 := tc.appendBlock() + f3 := finalizationMsg(t, b3, testNodes) + f3.msg.Finalization.QC = errQC{QuorumCertificate: f3.msg.Finalization.QC} + + return tc, []*messageInfo{ + blockMsg(t, b3, testNodes), + f3, + } + }, + expectedHeight: 3, + }, + { + name: "multiple epochs", + setup: func(t *testing.T) (*testChain, []*messageInfo) { + tc := newSeededChain(t, testNodes, 2) + + var epoch3Nodes = simplex.Nodes{ + {Node: simplex.NodeID{1}, Weight: 1}, + {Node: simplex.NodeID{2}, Weight: 1}, + {Node: simplex.NodeID{3}, Weight: 1}, + {Node: simplex.NodeID{4}, Weight: 1}, + {Node: simplex.NodeID{5}, Weight: 1}, + } + + // Send the sealing block + finalization that transitions to epoch 3. + b3 := tc.appendSealing(epoch3Nodes) + // Blocks 4 and 5 live in epoch 3 and are finalized by the new validator set. + b4, b5 := tc.appendBlock(), tc.appendBlock() + // Block 6 is another sealing epoch + b6 := tc.appendSealing(testNodes) + // Block 7 is part of the new epoch + b7 := tc.appendBlock() + + return tc, []*messageInfo{ + blockMsg(t, b3, testNodes), + finalizationMsg(t, b3, testNodes), + finalizationMsg(t, b4, epoch3Nodes), + finalizationMsg(t, b5, epoch3Nodes), + blockMsg(t, b4, epoch3Nodes), + blockMsg(t, b5, epoch3Nodes), + blockMsg(t, b6, epoch3Nodes), + finalizationMsg(t, b6, epoch3Nodes), + blockMsg(t, b7, testNodes), + finalizationMsg(t, b7, testNodes), + } + }, + expectedHeight: 8, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc, msgs := tt.setup(t) + require.Equal(t, uint64(3), tc.NumBlocks()) + nv, err := NewNonValidator( + Config{ + Storage: tc, + Comm: testutil.NewNoopComm(tc.nodes().NodeIDs()), + Logger: testutil.MakeLogger(t, 1), + SignatureAggregatorCreator: tc.signatureAggregatorCreator, + MaxSequenceWindow: simplex.DefaultMaxRoundWindow, + ID: testNodes[0].Node, + }, + ) + require.NoError(t, err) + + for _, m := range msgs { + require.NoError(t, m.send(nv)) + } + + tc.WaitForBlockCommit(tt.expectedHeight - 1) + require.Equal(t, tt.expectedHeight, nv.Storage.NumBlocks()) + }) + } +} + +func TestNonValidator_StopsGracefully(t *testing.T) { + tc := newSeededChain(t, testNodes, 2) + nv, err := NewNonValidator( + Config{ + Storage: tc, + Comm: testutil.NewNoopComm(tc.nodes().NodeIDs()), + Logger: testutil.MakeLogger(t, 1), + SignatureAggregatorCreator: tc.signatureAggregatorCreator, + MaxSequenceWindow: simplex.DefaultMaxRoundWindow, + ID: testNodes[0].Node, + }, + ) + require.NoError(t, err) + + nv.Start() + nv.Stop() + + b3 := tc.appendBlock() + require.NoError(t, blockMsg(t, b3, testNodes).send(nv)) + require.NoError(t, finalizationMsg(t, b3, testNodes).send(nv)) + + require.Never(t, + func() bool { + return nv.Storage.NumBlocks() > 3 + }, + 2*time.Second, + 50*time.Millisecond, + ) +} + +// TestHandleMessages_DuplicateBlock tests that when a duplicate block is received, the block is verify & indexed only once +func TestHandleMessages_DuplicateBlock(t *testing.T) { + tc := newSeededChain(t, testNodes, 2) + nv, err := NewNonValidator( + Config{ + Storage: tc, + Comm: testutil.NewNoopComm(tc.nodes().NodeIDs()), + Logger: testutil.MakeLogger(t, 1), + SignatureAggregatorCreator: tc.signatureAggregatorCreator, + MaxSequenceWindow: simplex.DefaultMaxRoundWindow, + ID: testNodes[0].Node, + }, + ) + require.NoError(t, err) + + nv.Start() + defer nv.Stop() + + // Send the sealing block + finalization that transitions to epoch 3. + b3 := tc.appendBlock() + require.NoError(t, blockMsg(t, b3, testNodes).send(nv)) + require.NoError(t, finalizationMsg(t, b3, testNodes).send(nv)) + + tc.WaitForBlockCommit(3) + + // Storage will panic if we try indexing the same block twice + require.NoError(t, blockMsg(t, b3, testNodes).send(nv)) + require.NoError(t, finalizationMsg(t, b3, testNodes).send(nv)) +} + +// TestNonValidator_RequestHighestEpochOnStart verifies that a non-validator +// starting behind the network issues a replication request for the highest +// epoch on startup. +func TestNonValidator_RequestHighestEpochOnStart(t *testing.T) { + tc := newSeededChain(t, testNodes, 2) + tc.addEpochs(4, 8) + responder := newTestResponder(t, testNodes.NodeIDs()[0], tc) + + nvStorage := tc.CloneUntil(2) + nv, err := NewNonValidator( + Config{ + Storage: nvStorage, + Comm: responder, + Logger: testutil.MakeLogger(t, 1), + SignatureAggregatorCreator: tc.signatureAggregatorCreator, + MaxSequenceWindow: simplex.DefaultMaxRoundWindow, + ID: responder.ID, + }, + ) + + require.NoError(t, err) + + nv.Start() + defer nv.Stop() + + msg, ok := responder.popResponse() + require.True(t, ok) + require.NotNil(t, msg.msg.ReplicationResponse) + require.NotNil(t, msg.msg.ReplicationResponse.LatestSeq) +} + +// TestNonValidator_Bootstrap ensures a non-validator can replicate sequences given different states of the chain. +func TestNonValidator_Bootstrap(t *testing.T) { + tests := []struct { + name string + // setup builds the full network chain the non-validator replicates from. + setup func(t *testing.T) *testChain + maxSequenceWindow uint64 + // initialHeight is the number of blocks the non-validator's storage is + // seeded with before replication begins. + initialHeight uint64 + lastSeq uint64 + }{ + { + // Replicates multiple epochs within a single replication window. + name: "replicates epochs", + setup: func(t *testing.T) *testChain { + tc := newSeededChain(t, testNodes, 2) + tc.addEpochs(5, 10, 20, 30, 40) + return tc + }, + maxSequenceWindow: 50, + initialHeight: 2, + lastSeq: 40, + }, + { + // A small window forces replication well past the max round window. + name: "past max round window", + setup: func(t *testing.T) *testChain { + tc := newSeededChain(t, testNodes, 2) + tc.addEpochs(5, 10, 20, 30, 40, 50, 60, 80, 100) + return tc + }, + maxSequenceWindow: 5, // significantly lower + initialHeight: 2, + lastSeq: 100, + }, + { + // Storage starts with only the genesis block. + name: "from genesis", + setup: func(t *testing.T) *testChain { + tc := newSeededChain(t, testNodes, 2) + tc.addEpochs(5, 10, 20) + return tc + }, + maxSequenceWindow: 50, + initialHeight: 1, // genesis is the only block + lastSeq: 20, + }, + { + // Replicates a chain converted from snowman to simplex. + name: "converted simplex chain", + setup: func(t *testing.T) *testChain { + tc := newSnowToSimplexChain(t, 10) + firstBlock := tc.appendFirstSimplexAfterGenesis(testNodes) + tc.Index(context.Background(), firstBlock, tc.newFinalization(firstBlock)) + return tc + }, + maxSequenceWindow: 50, + initialHeight: 11, // lastSnowmanSeq + 1 + lastSeq: 11, + }, + { + // Converted chain that then spans several simplex epochs. + name: "converted simplex chain many epochs", + setup: func(t *testing.T) *testChain { + tc := newSnowToSimplexChain(t, 10) + firstBlock := tc.appendFirstSimplexAfterGenesis(testNodes) + tc.Index(context.Background(), firstBlock, tc.newFinalization(firstBlock)) + tc.addEpochs(20, 30) + return tc + }, + maxSequenceWindow: 50, + initialHeight: 11, // lastSnowmanSeq + 1 + lastSeq: 30, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc := tt.setup(t) + responder := newTestResponder(t, testNodes.NodeIDs()[0], tc) + nvStorage := tc.CloneUntil(tt.initialHeight) + require.Equal(t, tt.initialHeight, nvStorage.NumBlocks()) + + nv, err := NewNonValidator( + Config{ + Storage: nvStorage, + Comm: responder, + Logger: testutil.MakeLogger(t, 1), + SignatureAggregatorCreator: tc.signatureAggregatorCreator, + MaxSequenceWindow: tt.maxSequenceWindow, + ID: responder.ID, + StartTime: time.Now(), + }, + ) + require.NoError(t, err) + + nv.Start() + defer nv.Stop() + + advanceUntil(nv, responder, tt.lastSeq) + }) + } +} + +func TestNonValidator_ReplicationRequests(t *testing.T) { + tc := newSeededChain(t, testNodes, 2) + lastSeq := uint64(40) + initialHeight := uint64(2) + tc.addEpochs(5, 10, 20, 30, lastSeq) + responder := newTestResponder(t, testNodes.NodeIDs()[0], tc) + nvStorage := tc.CloneUntil(initialHeight) + startTime := time.Now() + nv, err := NewNonValidator( + Config{ + Storage: nvStorage, + Comm: responder, + Logger: testutil.MakeLogger(t, 1), + SignatureAggregatorCreator: tc.signatureAggregatorCreator, + MaxSequenceWindow: 50, + ID: responder.ID, + StartTime: startTime, + }, + ) + require.NoError(t, err) + + nv.Start() + defer nv.Stop() + + count := 0 + for { + // Send any requests as responses back to the node + for msg, ok := responder.popResponse(); ok; { + // drop every other message + if count%2 == 0 { + require.NoError(t, msg.send(nv)) + } + count += 1 + msg, ok = responder.popResponse() + } + + // check if storage has indexed all + if lastSeq == nvStorage.NumBlocks()-1 { + break + } + + // update the time + startTime = startTime.Add(simplex.DefaultReplicationRequestTimeout) + nv.AdvanceTime(startTime) + } + + // clear in flight responses + startTime = startTime.Add(simplex.DefaultReplicationRequestTimeout) + nv.AdvanceTime(startTime) + responder.clearResponses() + + // ensure all timeout tasks were removed + count = 0 + for { + startTime = startTime.Add(simplex.DefaultReplicationRequestTimeout) + nv.AdvanceTime(startTime) + msg, ok := responder.popResponse() + require.False(t, ok, fmt.Sprintf("all replication request tasks should be finished %v", msg)) + + if count > 3 { + break + } + count += 1 + } +} + +// We optimistically store quorum rounds for sequences that validate an epoch(even though we have not verified the finalization) +// This test ensures, we verify the finalization when it is time to process it. If the finalization is incorrect, we need to re-request +// from the replicator. +func TestNonValidator_VerifiesFinalizationDuringReplication(t *testing.T) { + tc := newSeededChain(t, testNodes, 2) + epoch3Nodes := append(testNodes, simplex.Node{ + Node: simplex.NodeID{byte(10)}, + Weight: 1, + }) + epoch4Nodes := append(epoch3Nodes, simplex.Node{ + Node: simplex.NodeID{byte(11)}, + Weight: 1, + }) + + startTime := time.Now() + storage := testutil.NewInMemStorage() + require.NoError(t, storage.Index(context.Background(), genesis, simplex.Finalization{})) + + nv, err := NewNonValidator( + Config{ + Storage: storage, + Comm: testutil.NewNoopComm(epoch4Nodes.NodeIDs()), + Logger: testutil.MakeLogger(t, 1), + SignatureAggregatorCreator: tc.signatureAggregatorCreator, + MaxSequenceWindow: 5, // significantly lower the max round window + ID: testNodes.NodeIDs()[0], + StartTime: startTime, + }, + ) + + require.NoError(t, err) + nv.Start() + defer nv.Stop() + + s3, s4 := tc.appendSealing(epoch3Nodes), tc.appendSealing(epoch4Nodes) + f3 := tc.newFinalization(s3) + brokenFinalization := tc.newFinalization(s4) + brokenFinalization.Finalization.Epoch = 9000 + brokenFinalization.QC = errQC{QuorumCertificate: brokenFinalization.QC} + + brokenQR := simplex.QuorumRound{ + Block: s4, + Finalization: &brokenFinalization, + } + brokenReplicationResponse := &simplex.ReplicationResponse{ + LatestSeq: &brokenQR, + } + + // With 6 nodes, the threshold votes for validating the highest epoch is 2. Because F + 1 = 2. + require.NoError(t, nv.HandleMessage( + &simplex.Message{ + ReplicationResponse: brokenReplicationResponse, + }, + testNodes.NodeIDs()[2], + )) + require.NoError(t, nv.HandleMessage( + &simplex.Message{ + ReplicationResponse: brokenReplicationResponse, + }, + testNodes.NodeIDs()[3], + )) + + // send seq 1, 2, 3 + s1, f1, err := tc.Retrieve(1) + require.NoError(t, err) + s2, f2, err := tc.Retrieve(2) + require.NoError(t, err) + + require.NoError(t, nv.HandleMessage( + &simplex.Message{ + ReplicationResponse: &simplex.ReplicationResponse{ + Data: []simplex.QuorumRound{ + { + Block: s3, + Finalization: &f3, + }, + { + Block: s1.(simplex.Block), + Finalization: &f1, + }, + { + Block: s2.(simplex.Block), + Finalization: &f2, + }, + }, + }, + }, + testNodes.NodeIDs()[3], + )) + + require.Never(t, + func() bool { + return storage.NumBlocks() >= 5 + }, + 2*time.Second, + 50*time.Millisecond, + ) +} + +func advanceUntil(nv *NonValidator, responder *nonValidatorResponderComm, seq uint64) { + startTime := nv.StartTime + for { + // Send any requests as responses back to the node + for msg, ok := responder.popResponse(); ok; { + // drop every other message + require.NoError(responder.t, msg.send(nv)) + msg, ok = responder.popResponse() + } + + // check if storage has indexed all + if seq == nv.Storage.NumBlocks()-1 { + break + } + + // update the time + startTime = startTime.Add(simplex.DefaultReplicationRequestTimeout) + nv.AdvanceTime(startTime) + } +} diff --git a/testutil/block.go b/testutil/block.go index 176e39ee..bc832e9f 100644 --- a/testutil/block.go +++ b/testutil/block.go @@ -58,6 +58,10 @@ func (tb *TestBlock) Verify(context.Context) (simplex.VerifiedBlock, error) { return tb, nil } +func (tb *TestBlock) SealingBlockInfo() *simplex.SealingBlockInfo { + return nil +} + func (tb *TestBlock) ComputeDigest() { var bb bytes.Buffer tbBytes, err := tb.Bytes() diff --git a/testutil/random_network/block.go b/testutil/random_network/block.go index 59e3f3f9..535b8ff0 100644 --- a/testutil/random_network/block.go +++ b/testutil/random_network/block.go @@ -55,6 +55,10 @@ func (b *Block) BlockHeader() simplex.BlockHeader { } } +func (b *Block) SealingBlockInfo() *simplex.SealingBlockInfo { + return nil +} + type encodedBlock struct { ProtocolMetadata []byte TXs []asn1TX diff --git a/testutil/storage.go b/testutil/storage.go index dc8af15d..9018ebb3 100644 --- a/testutil/storage.go +++ b/testutil/storage.go @@ -46,9 +46,14 @@ func (mem *InMemStorage) NumBlocks() uint64 { } func (mem *InMemStorage) Clone() *InMemStorage { - clone := NewInMemStorage() height := mem.NumBlocks() - for seq := uint64(0); seq < height; seq++ { + return mem.CloneUntil(height) +} + +// CloneUntil will copy the storage up until a given sequence(not including). +func (mem *InMemStorage) CloneUntil(stopSeq uint64) *InMemStorage { + clone := NewInMemStorage() + for seq := uint64(0); seq < stopSeq; seq++ { block, finalization, err := mem.Retrieve(seq) if err != nil { panic(fmt.Sprintf("failed retrieving block %d: %v", seq, err)) diff --git a/util.go b/util.go index 45bdf2da..1ee3629d 100644 --- a/util.go +++ b/util.go @@ -376,7 +376,7 @@ func (t *walRound) String() string { return fmt.Sprintf("walRound{round: %d, hasEmptyNotarization: %t, hasEmptyVote: %t, hasNotarization: %t, hasFinalization: %t, hasBlock: %t}", t.round, hasEmptyNotarization, hasEmptyVote, hasNotarization, hasFinalization, hasBlock) } -func newRandomSource() (*rand.Rand, error) { +func NewRandomSource() (*rand.Rand, error) { var seedBytes [32]byte if _, err := cryptoRand.Read(seedBytes[:]); err != nil {