From 19298519df75de547c7d51a8940dea50a72f4688 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 16 Apr 2026 19:38:33 +0200 Subject: [PATCH 1/3] feat: add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead --- block/internal/da/subscriber.go | 17 +++ block/internal/da/subscriber_test.go | 38 ++++++ block/internal/syncing/da_follower.go | 87 ++++++++++-- block/internal/syncing/da_follower_test.go | 152 +++++++++++++++++++++ block/internal/syncing/syncer.go | 13 ++ 5 files changed, 294 insertions(+), 13 deletions(-) diff --git a/block/internal/da/subscriber.go b/block/internal/da/subscriber.go index 8ff46773ce..e332af7a62 100644 --- a/block/internal/da/subscriber.go +++ b/block/internal/da/subscriber.go @@ -159,6 +159,23 @@ func (s *Subscriber) HasReachedHead() bool { return s.headReached.Load() } +// RewindTo sets localDAHeight back to the given height and signals the catchup +// loop so that DA heights are re-fetched. This is used when the primary source +// (P2P) stalls and DA needs to take over for the missing range. +func (s *Subscriber) RewindTo(daHeight uint64) { + for { + cur := s.localDAHeight.Load() + if daHeight >= cur { + return + } + if s.localDAHeight.CompareAndSwap(cur, daHeight) { + s.headReached.Store(false) + s.signalCatchup() + return + } + } +} + // signalCatchup sends a non-blocking signal to wake catchupLoop. func (s *Subscriber) signalCatchup() { select { diff --git a/block/internal/da/subscriber_test.go b/block/internal/da/subscriber_test.go index 2ed80886de..38d5f4edc0 100644 --- a/block/internal/da/subscriber_test.go +++ b/block/internal/da/subscriber_test.go @@ -101,6 +101,44 @@ func TestSubscriber_RunCatchup(t *testing.T) { }) } +func TestSubscriber_RewindTo(t *testing.T) { + t.Run("no_op_when_target_is_equal_or_higher", func(t *testing.T) { + sub := NewSubscriber(SubscriberConfig{ + Client: testmocks.NewMockClient(t), + Logger: zerolog.Nop(), + Handler: new(MockSubscriberHandler), + Namespaces: [][]byte{[]byte("ns")}, + StartHeight: 100, + DABlockTime: time.Millisecond, + }) + sub.localDAHeight.Store(100) + + sub.RewindTo(100) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) + + sub.RewindTo(200) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) + }) + + t.Run("rewinds_local_height_and_clears_head", func(t *testing.T) { + sub := NewSubscriber(SubscriberConfig{ + Client: testmocks.NewMockClient(t), + Logger: zerolog.Nop(), + Handler: new(MockSubscriberHandler), + Namespaces: [][]byte{[]byte("ns")}, + StartHeight: 100, + DABlockTime: time.Millisecond, + }) + sub.localDAHeight.Store(150) + sub.headReached.Store(true) + + sub.RewindTo(120) + + assert.Equal(t, uint64(120), sub.LocalDAHeight()) + assert.False(t, sub.HasReachedHead()) + }) +} + func TestSubscriber_RunSubscription_InlineDoesNotPrematurelyReachHead(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go index 443fb876ae..8366371d7f 100644 --- a/block/internal/syncing/da_follower.go +++ b/block/internal/syncing/da_follower.go @@ -5,6 +5,7 @@ import ( "errors" "slices" "sync" + "sync/atomic" "time" "github.com/rs/zerolog" @@ -26,10 +27,19 @@ type DAFollower interface { // daFollower is the concrete implementation of DAFollower. type daFollower struct { - subscriber *da.Subscriber - retriever DARetriever - eventSink common.EventSink - logger zerolog.Logger + subscriber *da.Subscriber + retriever DARetriever + eventSink common.EventSink + logger zerolog.Logger + nodeHeightFn func() uint64 + p2pStalledFn func() bool + startDAHeight uint64 + + // walkbackActive is set when the follower detects a gap between the + // DA events it just processed and the node's current block height. + // While active, every DA height (even empty ones) triggers a rewind + // so the subscriber walks backwards until the gap is filled. + walkbackActive atomic.Bool // Priority queue for P2P hint heights (absorbed from DARetriever refactoring #2). priorityMu sync.Mutex @@ -48,6 +58,12 @@ type DAFollowerConfig struct { DataNamespace []byte // may be nil or equal to Namespace StartDAHeight uint64 DABlockTime time.Duration + // NodeHeight returns the node's current block height. Used together + // with P2PStalled to detect gaps that need a DA walkback. + NodeHeight func() uint64 + // P2PStalled returns true when the P2P sync worker has failed to + // deliver blocks. The follower only walks back when P2P is stalled. + P2PStalled func() bool } // NewDAFollower creates a new daFollower. @@ -61,6 +77,9 @@ func NewDAFollower(cfg DAFollowerConfig) DAFollower { retriever: cfg.Retriever, eventSink: cfg.EventSink, logger: cfg.Logger.With().Str("component", "da_follower").Logger(), + nodeHeightFn: cfg.NodeHeight, + p2pStalledFn: cfg.P2PStalled, + startDAHeight: cfg.StartDAHeight, priorityHeights: make([]uint64, 0), } @@ -123,6 +142,13 @@ func (f *daFollower) HandleEvent(ctx context.Context, ev datypes.SubscriptionEve // HandleCatchup retrieves events at a single DA height and pipes them // to the event sink. Checks priority heights first. +// +// When a node-height callback is configured, HandleCatchup detects gaps +// between the block heights it just fetched and the node's current height. +// If the smallest block height is above nodeHeight+1 the subscriber is +// rewound by one DA height so it re-fetches the previous height on the +// next iteration. This "walk-back" continues automatically through empty +// DA heights until blocks contiguous with the node are found. func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { // 1. Drain stale or future priority heights from P2P hints for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() { @@ -134,23 +160,58 @@ func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { Uint64("da_height", priorityHeight). Msg("fetching priority DA height from P2P hint") - if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil { + if _, err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil { if errors.Is(err, datypes.ErrHeightFromFuture) { - // Priority hint points to a future height — silently ignore. f.logger.Debug().Uint64("priority_da_height", priorityHeight). Msg("priority hint is from future, ignoring") continue } - // Roll back so daHeight is attempted again next cycle after backoff. return err } break // continue with daHeight } // 2. Normal sequential fetch - if err := f.fetchAndPipeHeight(ctx, daHeight); err != nil { + events, err := f.fetchAndPipeHeight(ctx, daHeight) + if err != nil { return err } + + // 3. Self-correction: walk back when P2P has stalled and DA blocks skip + // past the node height. Only active when P2P is confirmed stalled to + // avoid unnecessary rewinds during normal DA catchup. + p2pStalled := f.p2pStalledFn != nil && f.p2pStalledFn() + if p2pStalled && f.nodeHeightFn != nil && daHeight > f.startDAHeight { + nodeHeight := f.nodeHeightFn() + + needsWalkback := f.walkbackActive.Load() + if len(events) > 0 { + minHeight := events[0].Header.Height() + for _, e := range events[1:] { + if e.Header.Height() < minHeight { + minHeight = e.Header.Height() + } + } + if minHeight <= nodeHeight+1 { + f.walkbackActive.Store(false) + return nil + } + needsWalkback = true + } + + if needsWalkback { + f.walkbackActive.Store(true) + f.logger.Info(). + Uint64("da_height", daHeight). + Uint64("node_height", nodeHeight). + Int("events", len(events)). + Msg("P2P stalled with gap between DA blocks and node height, walking DA follower back") + f.subscriber.RewindTo(daHeight - 1) + } + } else if !p2pStalled { + f.walkbackActive.Store(false) + } + return nil } @@ -158,22 +219,22 @@ func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { // It does NOT handle ErrHeightFromFuture — callers must decide how to react // because the correct response depends on whether this is a normal sequential // catchup or a priority-hint fetch. -func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) error { +func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { events, err := f.retriever.RetrieveFromDA(ctx, daHeight) if err != nil { if errors.Is(err, datypes.ErrBlobNotFound) { - return nil + return nil, nil } - return err + return nil, err } for _, event := range events { if err := f.eventSink.PipeEvent(ctx, event); err != nil { - return err + return nil, err } } - return nil + return events, nil } // QueuePriorityHeight queues a DA height for priority retrieval. diff --git a/block/internal/syncing/da_follower_test.go b/block/internal/syncing/da_follower_test.go index 710d2d81d0..2793ad8072 100644 --- a/block/internal/syncing/da_follower_test.go +++ b/block/internal/syncing/da_follower_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -11,7 +12,9 @@ import ( "github.com/stretchr/testify/require" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" datypes "github.com/evstack/ev-node/pkg/da/types" + "github.com/evstack/ev-node/types" ) func TestDAFollower_HandleEvent(t *testing.T) { @@ -251,3 +254,152 @@ func makeRange(start, end uint64) []uint64 { } return out } + +type mockSubHandler struct { + mock.Mock +} + +func (m *mockSubHandler) HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error { + return m.Called(ctx, ev, isInline).Error(0) +} + +func (m *mockSubHandler) HandleCatchup(ctx context.Context, height uint64) error { + return m.Called(ctx, height).Error(0) +} + +func newTestSubscriber(startHeight uint64) *da.Subscriber { + return da.NewSubscriber(da.SubscriberConfig{ + Client: nil, + Logger: zerolog.Nop(), + Handler: &mockSubHandler{}, + Namespaces: nil, + StartHeight: startHeight, + DABlockTime: time.Millisecond, + }) +} + +func makeHeader(height uint64) *types.SignedHeader { + return &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: height}}} +} + +func TestDAFollower_HandleCatchup_SelfCorrectingWalkback(t *testing.T) { + t.Run("rewinds_when_gap_detected", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). + Return([]common.DAHeightEvent{{Header: makeHeader(50)}}, nil).Once() + + sub := newTestSubscriber(100) + sub.LocalDAHeight() // ensure initialized + + var nodeHeight uint64 = 40 + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + nodeHeightFn: func() uint64 { return nodeHeight }, + startDAHeight: 1, + } + + err := follower.HandleCatchup(t.Context(), 100) + require.NoError(t, err) + assert.True(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(99), sub.LocalDAHeight()) + }) + + t.Run("keeps_walking_back_on_empty_height", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(99)). + Return(nil, datypes.ErrBlobNotFound).Once() + + sub := newTestSubscriber(100) + + var nodeHeight uint64 = 40 + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + nodeHeightFn: func() uint64 { return nodeHeight }, + startDAHeight: 1, + } + follower.walkbackActive.Store(true) + + err := follower.HandleCatchup(t.Context(), 99) + require.NoError(t, err) + assert.True(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(98), sub.LocalDAHeight()) + }) + + t.Run("stops_walkback_when_contiguous", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(95)). + Return([]common.DAHeightEvent{{Header: makeHeader(41)}}, nil).Once() + + sub := newTestSubscriber(100) + + var nodeHeight uint64 = 40 + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + nodeHeightFn: func() uint64 { return nodeHeight }, + startDAHeight: 1, + } + follower.walkbackActive.Store(true) + + err := follower.HandleCatchup(t.Context(), 95) + require.NoError(t, err) + assert.False(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) // no rewind + }) + + t.Run("no_walkback_without_nodeHeightFn", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). + Return([]common.DAHeightEvent{{Header: makeHeader(50)}}, nil).Once() + + sub := newTestSubscriber(100) + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + startDAHeight: 1, + } + + err := follower.HandleCatchup(t.Context(), 100) + require.NoError(t, err) + assert.False(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) // no rewind + }) + + t.Run("no_walkback_at_startDAHeight", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + daRetriever.On("RetrieveFromDA", mock.Anything, uint64(1)). + Return([]common.DAHeightEvent{{Header: makeHeader(50)}}, nil).Once() + + sub := newTestSubscriber(1) + + var nodeHeight uint64 = 40 + + follower := &daFollower{ + subscriber: sub, + retriever: daRetriever, + eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), + logger: zerolog.Nop(), + nodeHeightFn: func() uint64 { return nodeHeight }, + startDAHeight: 1, + } + + err := follower.HandleCatchup(t.Context(), 1) + require.NoError(t, err) + assert.False(t, follower.walkbackActive.Load()) + assert.Equal(t, uint64(1), sub.LocalDAHeight()) // no rewind + }) +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 4d2cbb4afe..a97767b25b 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -82,6 +82,11 @@ type Syncer struct { daFollower DAFollower + // p2pStalled is set by p2pWorkerLoop when P2P genuinely fails (not + // cancelled by a DA event). The DA follower reads it to decide whether + // to walk back and fill the gap from DA. + p2pStalled atomic.Bool + // Forced inclusion tracking forcedInclusionMu sync.RWMutex seenBlockTxs map[string]struct{} // SHA-256 hex of every tx seen in a DA-sourced block @@ -220,6 +225,11 @@ func (s *Syncer) Start(ctx context.Context) (err error) { DataNamespace: s.daClient.GetDataNamespace(), StartDAHeight: s.daRetrieverHeight.Load(), DABlockTime: s.config.DA.BlockTime.Duration, + NodeHeight: func() uint64 { + h, _ := s.store.Height(s.ctx) + return h + }, + P2PStalled: s.p2pStalled.Load, }) if err = s.daFollower.Start(ctx); err != nil { return fmt.Errorf("failed to start DA follower: %w", err) @@ -488,6 +498,7 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { } if waitCtx.Err() == nil { + s.p2pStalled.Store(true) logger.Warn().Err(err).Uint64("height", targetHeight).Msg("P2P handler failed to process height") } @@ -497,6 +508,8 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { continue } + s.p2pStalled.Store(false) + if err := s.waitForStoreHeight(ctx, targetHeight); err != nil { if errors.Is(err, context.Canceled) { return From 9b7ecceef7b5d561c5ad5db45eb754f831795b64 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 16 Apr 2026 19:40:04 +0200 Subject: [PATCH 2/3] add cl --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 99026c6e04..63f1d97e68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes +- Add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead [#3262](https://github.com/evstack/ev-node/pull/3262) - Make it easier to override `DefaultMaxBlobSize` by ldflags [#3235](https://github.com/evstack/ev-node/pull/3235) - Add solo sequencer (simple in memory single sequencer without force inclusion) [#3235](https://github.com/evstack/ev-node/pull/3235) - Improve reaper to sustain txs burst better [#3236](https://github.com/evstack/ev-node/pull/3236) From 31ceb936da53dbd0d1fd871cf3631a7085b259cb Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Sun, 19 Apr 2026 21:07:47 +0200 Subject: [PATCH 3/3] refactor: walk back in syncer --- block/internal/da/async_block_retriever.go | 13 +- block/internal/da/subscriber.go | 31 ++++- block/internal/da/subscriber_test.go | 13 +- block/internal/syncing/da_follower.go | 94 ++++--------- block/internal/syncing/da_follower_test.go | 152 +-------------------- block/internal/syncing/syncer.go | 81 +++++++++-- block/internal/syncing/syncer_test.go | 119 ++++++++++++++++ 7 files changed, 251 insertions(+), 252 deletions(-) diff --git a/block/internal/da/async_block_retriever.go b/block/internal/da/async_block_retriever.go index 2230df87c4..5c1ac156f2 100644 --- a/block/internal/da/async_block_retriever.go +++ b/block/internal/da/async_block_retriever.go @@ -13,6 +13,7 @@ import ( "github.com/rs/zerolog" "google.golang.org/protobuf/proto" + "github.com/evstack/ev-node/block/internal/common" datypes "github.com/evstack/ev-node/pkg/da/types" pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) @@ -186,31 +187,31 @@ func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.Subscr // HandleCatchup fetches a single height via Retrieve and caches it. // Also applies the prefetch window for speculative forward fetching. -func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64) error { +func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { if err := ctx.Err(); err != nil { - return err + return nil, err } if _, err := f.cache.Get(ctx, newBlockDataKey(daHeight)); err != nil { if err := f.fetchAndCacheBlock(ctx, daHeight); err != nil { - return err + return nil, err } } // Speculatively prefetch ahead. target := daHeight + f.prefetchWindow for h := daHeight + 1; h <= target; h++ { if err := ctx.Err(); err != nil { - return err + return nil, err } if _, err := f.cache.Get(ctx, newBlockDataKey(h)); err == nil { continue // Already cached. } if err := f.fetchAndCacheBlock(ctx, h); err != nil { - return err + return nil, err } } - return nil + return nil, nil } // fetchAndCacheBlock fetches a block via Retrieve and caches it. diff --git a/block/internal/da/subscriber.go b/block/internal/da/subscriber.go index e332af7a62..d56a5f5952 100644 --- a/block/internal/da/subscriber.go +++ b/block/internal/da/subscriber.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog" + "github.com/evstack/ev-node/block/internal/common" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -23,9 +24,13 @@ type SubscriberHandler interface { HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error // HandleCatchup is called for each height during sequential catchup. - // The subscriber advances localDAHeight only after this returns (true, nil). + // The subscriber advances localDAHeight only after this returns nil. // Returning an error rolls back localDAHeight and triggers a backoff retry. - HandleCatchup(ctx context.Context, height uint64) error + // The returned events are the DAHeightEvents produced for this height + // (may be nil/empty). The subscriber does not interpret them; they are + // returned so that higher-level callers (via the Subscriber) can inspect + // the results without coupling to the handler's internals. + HandleCatchup(ctx context.Context, height uint64) ([]common.DAHeightEvent, error) } // SubscriberConfig holds configuration for creating a Subscriber. @@ -39,6 +44,14 @@ type SubscriberConfig struct { FetchBlockTimestamp bool // the timestamp comes with an extra api call before Celestia v0.29.1-mocha. StartHeight uint64 // initial localDAHeight + + // WalkbackChecker is an optional callback invoked after each successful + // HandleCatchup call. It receives the DA height just processed and the + // events returned by the handler. If it returns a non-zero DA height, + // the subscriber rewinds to that height so it re-fetches on the next + // iteration. Return 0 to continue normally. + // This is nil for subscribers that don't need walkback (e.g. async block retriever). + WalkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64 } // Subscriber is a shared DA subscription primitive that encapsulates the @@ -48,9 +61,10 @@ type SubscriberConfig struct { // // Used by both DAFollower (syncing) and asyncBlockRetriever (forced inclusion). type Subscriber struct { - client Client - logger zerolog.Logger - handler SubscriberHandler + client Client + logger zerolog.Logger + handler SubscriberHandler + walkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64 // namespaces to subscribe on. When multiple, they are merged. namespaces [][]byte @@ -91,6 +105,7 @@ func NewSubscriber(cfg SubscriberConfig) *Subscriber { client: cfg.Client, logger: cfg.Logger, handler: cfg.Handler, + walkbackChecker: cfg.WalkbackChecker, namespaces: cfg.Namespaces, catchupSignal: make(chan struct{}, 1), daBlockTime: cfg.DABlockTime, @@ -373,7 +388,7 @@ func (s *Subscriber) runCatchup(ctx context.Context) { continue } - if err := s.handler.HandleCatchup(ctx, local); err != nil { + if events, err := s.handler.HandleCatchup(ctx, local); err != nil { // Roll back so we can retry after backoff. s.localDAHeight.Store(local) if errors.Is(err, datypes.ErrHeightFromFuture) && local >= highest { @@ -383,6 +398,10 @@ func (s *Subscriber) runCatchup(ctx context.Context) { if !s.shouldContinueCatchup(ctx, err, local) { return } + } else if s.walkbackChecker != nil { + if rewindTo := s.walkbackChecker(local, events); rewindTo > 0 { + s.RewindTo(rewindTo) + } } } } diff --git a/block/internal/da/subscriber_test.go b/block/internal/da/subscriber_test.go index 38d5f4edc0..2b873977b1 100644 --- a/block/internal/da/subscriber_test.go +++ b/block/internal/da/subscriber_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/evstack/ev-node/block/internal/common" datypes "github.com/evstack/ev-node/pkg/da/types" testmocks "github.com/evstack/ev-node/test/mocks" ) @@ -24,9 +25,9 @@ func (m *MockSubscriberHandler) HandleEvent(ctx context.Context, ev datypes.Subs return args.Error(0) } -func (m *MockSubscriberHandler) HandleCatchup(ctx context.Context, height uint64) error { +func (m *MockSubscriberHandler) HandleCatchup(ctx context.Context, height uint64) ([]common.DAHeightEvent, error) { args := m.Called(ctx, height) - return args.Error(0) + return args.Get(0).([]common.DAHeightEvent), args.Error(1) } func TestSubscriber_RunCatchup(t *testing.T) { @@ -49,8 +50,8 @@ func TestSubscriber_RunCatchup(t *testing.T) { // It should process observed heights [100..101] then stop when local passes highestSeen. sub.updateHighest(101) sub.seenSubscriptionEvent.Store(true) - mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).Return(nil).Once() - mockHandler.On("HandleCatchup", mock.Anything, uint64(101)).Return(nil).Once() + mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).Return([]common.DAHeightEvent(nil), nil).Once() + mockHandler.On("HandleCatchup", mock.Anything, uint64(101)).Return([]common.DAHeightEvent(nil), nil).Once() sub.runCatchup(ctx) @@ -84,13 +85,13 @@ func TestSubscriber_RunCatchup(t *testing.T) { Run(func(args mock.Arguments) { callCount++ }). - Return(errors.New("network failure")).Once() + Return([]common.DAHeightEvent(nil), errors.New("network failure")).Once() mockHandler.On("HandleCatchup", mock.Anything, uint64(100)). Run(func(args mock.Arguments) { callCount++ }). - Return(nil).Once() + Return([]common.DAHeightEvent(nil), nil).Once() sub.runCatchup(ctx) diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go index 8366371d7f..70ce946570 100644 --- a/block/internal/syncing/da_follower.go +++ b/block/internal/syncing/da_follower.go @@ -5,7 +5,6 @@ import ( "errors" "slices" "sync" - "sync/atomic" "time" "github.com/rs/zerolog" @@ -23,6 +22,10 @@ type DAFollower interface { HasReachedHead() bool // QueuePriorityHeight queues a DA height for priority retrieval (from P2P hints). QueuePriorityHeight(daHeight uint64) + // RewindTo moves the subscriber back to a lower DA height so it + // re-fetches from there. Used by the syncer for walkback when a gap + // is detected between the node height and the DA events. + RewindTo(daHeight uint64) } // daFollower is the concrete implementation of DAFollower. @@ -31,16 +34,8 @@ type daFollower struct { retriever DARetriever eventSink common.EventSink logger zerolog.Logger - nodeHeightFn func() uint64 - p2pStalledFn func() bool startDAHeight uint64 - // walkbackActive is set when the follower detects a gap between the - // DA events it just processed and the node's current block height. - // While active, every DA height (even empty ones) triggers a rewind - // so the subscriber walks backwards until the gap is filled. - walkbackActive atomic.Bool - // Priority queue for P2P hint heights (absorbed from DARetriever refactoring #2). priorityMu sync.Mutex priorityHeights []uint64 @@ -58,12 +53,9 @@ type DAFollowerConfig struct { DataNamespace []byte // may be nil or equal to Namespace StartDAHeight uint64 DABlockTime time.Duration - // NodeHeight returns the node's current block height. Used together - // with P2PStalled to detect gaps that need a DA walkback. - NodeHeight func() uint64 - // P2PStalled returns true when the P2P sync worker has failed to - // deliver blocks. The follower only walks back when P2P is stalled. - P2PStalled func() bool + // WalkbackChecker is forwarded to the underlying Subscriber. + // See da.SubscriberConfig.WalkbackChecker for details. + WalkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64 } // NewDAFollower creates a new daFollower. @@ -77,19 +69,18 @@ func NewDAFollower(cfg DAFollowerConfig) DAFollower { retriever: cfg.Retriever, eventSink: cfg.EventSink, logger: cfg.Logger.With().Str("component", "da_follower").Logger(), - nodeHeightFn: cfg.NodeHeight, - p2pStalledFn: cfg.P2PStalled, startDAHeight: cfg.StartDAHeight, priorityHeights: make([]uint64, 0), } f.subscriber = da.NewSubscriber(da.SubscriberConfig{ - Client: cfg.Client, - Logger: cfg.Logger, - Namespaces: [][]byte{cfg.Namespace, dataNs}, - DABlockTime: cfg.DABlockTime, - Handler: f, - StartHeight: cfg.StartDAHeight, + Client: cfg.Client, + Logger: cfg.Logger, + Namespaces: [][]byte{cfg.Namespace, dataNs}, + DABlockTime: cfg.DABlockTime, + Handler: f, + StartHeight: cfg.StartDAHeight, + WalkbackChecker: cfg.WalkbackChecker, }) return f @@ -110,6 +101,11 @@ func (f *daFollower) HasReachedHead() bool { return f.subscriber.HasReachedHead() } +// RewindTo moves the subscriber back to a lower DA height for re-fetching. +func (f *daFollower) RewindTo(daHeight uint64) { + f.subscriber.RewindTo(daHeight) +} + // HandleEvent processes a subscription event. When the follower is // caught up (ev.Height == localDAHeight) and blobs are available, it processes // them inline — avoiding a DA re-fetch round trip. Otherwise, it just lets @@ -142,14 +138,7 @@ func (f *daFollower) HandleEvent(ctx context.Context, ev datypes.SubscriptionEve // HandleCatchup retrieves events at a single DA height and pipes them // to the event sink. Checks priority heights first. -// -// When a node-height callback is configured, HandleCatchup detects gaps -// between the block heights it just fetched and the node's current height. -// If the smallest block height is above nodeHeight+1 the subscriber is -// rewound by one DA height so it re-fetches the previous height on the -// next iteration. This "walk-back" continues automatically through empty -// DA heights until blocks contiguous with the node are found. -func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { +func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { // 1. Drain stale or future priority heights from P2P hints for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() { if priorityHeight < daHeight { @@ -166,53 +155,18 @@ func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { Msg("priority hint is from future, ignoring") continue } - return err + return nil, err } - break // continue with daHeight + break } // 2. Normal sequential fetch events, err := f.fetchAndPipeHeight(ctx, daHeight) if err != nil { - return err - } - - // 3. Self-correction: walk back when P2P has stalled and DA blocks skip - // past the node height. Only active when P2P is confirmed stalled to - // avoid unnecessary rewinds during normal DA catchup. - p2pStalled := f.p2pStalledFn != nil && f.p2pStalledFn() - if p2pStalled && f.nodeHeightFn != nil && daHeight > f.startDAHeight { - nodeHeight := f.nodeHeightFn() - - needsWalkback := f.walkbackActive.Load() - if len(events) > 0 { - minHeight := events[0].Header.Height() - for _, e := range events[1:] { - if e.Header.Height() < minHeight { - minHeight = e.Header.Height() - } - } - if minHeight <= nodeHeight+1 { - f.walkbackActive.Store(false) - return nil - } - needsWalkback = true - } - - if needsWalkback { - f.walkbackActive.Store(true) - f.logger.Info(). - Uint64("da_height", daHeight). - Uint64("node_height", nodeHeight). - Int("events", len(events)). - Msg("P2P stalled with gap between DA blocks and node height, walking DA follower back") - f.subscriber.RewindTo(daHeight - 1) - } - } else if !p2pStalled { - f.walkbackActive.Store(false) + return nil, err } - return nil + return events, nil } // fetchAndPipeHeight retrieves events at a single DA height and pipes them. diff --git a/block/internal/syncing/da_follower_test.go b/block/internal/syncing/da_follower_test.go index 2793ad8072..802b685c5d 100644 --- a/block/internal/syncing/da_follower_test.go +++ b/block/internal/syncing/da_follower_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "testing" - "time" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -12,7 +11,6 @@ import ( "github.com/stretchr/testify/require" "github.com/evstack/ev-node/block/internal/common" - "github.com/evstack/ev-node/block/internal/da" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/types" ) @@ -167,7 +165,6 @@ func TestDAFollower_HandleCatchup(t *testing.T) { initialPriorityHeights: []uint64{99}, wantPipedHeights: []uint64{100}, setupMock: func(m *MockDARetriever) { - // stale priority hint (< daHeight) is discarded; only sequential height is fetched m.On("RetrieveFromDA", mock.Anything, uint64(100)). Return([]common.DAHeightEvent{{DaHeight: 100}}, nil).Once() }, @@ -182,7 +179,7 @@ func TestDAFollower_HandleCatchup(t *testing.T) { } follower, getPipedEvents := newFollower(t, s, daRetriever) - err := follower.HandleCatchup(t.Context(), s.daHeight) + events, err := follower.HandleCatchup(t.Context(), s.daHeight) if s.wantErrIs != nil { require.ErrorIs(t, err, s.wantErrIs) @@ -206,6 +203,8 @@ func TestDAFollower_HandleCatchup(t *testing.T) { } else { assert.Empty(t, follower.priorityHeights) } + + _ = events }) } } @@ -255,151 +254,6 @@ func makeRange(start, end uint64) []uint64 { return out } -type mockSubHandler struct { - mock.Mock -} - -func (m *mockSubHandler) HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error { - return m.Called(ctx, ev, isInline).Error(0) -} - -func (m *mockSubHandler) HandleCatchup(ctx context.Context, height uint64) error { - return m.Called(ctx, height).Error(0) -} - -func newTestSubscriber(startHeight uint64) *da.Subscriber { - return da.NewSubscriber(da.SubscriberConfig{ - Client: nil, - Logger: zerolog.Nop(), - Handler: &mockSubHandler{}, - Namespaces: nil, - StartHeight: startHeight, - DABlockTime: time.Millisecond, - }) -} - func makeHeader(height uint64) *types.SignedHeader { return &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: height}}} } - -func TestDAFollower_HandleCatchup_SelfCorrectingWalkback(t *testing.T) { - t.Run("rewinds_when_gap_detected", func(t *testing.T) { - daRetriever := NewMockDARetriever(t) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Return([]common.DAHeightEvent{{Header: makeHeader(50)}}, nil).Once() - - sub := newTestSubscriber(100) - sub.LocalDAHeight() // ensure initialized - - var nodeHeight uint64 = 40 - - follower := &daFollower{ - subscriber: sub, - retriever: daRetriever, - eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), - logger: zerolog.Nop(), - nodeHeightFn: func() uint64 { return nodeHeight }, - startDAHeight: 1, - } - - err := follower.HandleCatchup(t.Context(), 100) - require.NoError(t, err) - assert.True(t, follower.walkbackActive.Load()) - assert.Equal(t, uint64(99), sub.LocalDAHeight()) - }) - - t.Run("keeps_walking_back_on_empty_height", func(t *testing.T) { - daRetriever := NewMockDARetriever(t) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(99)). - Return(nil, datypes.ErrBlobNotFound).Once() - - sub := newTestSubscriber(100) - - var nodeHeight uint64 = 40 - - follower := &daFollower{ - subscriber: sub, - retriever: daRetriever, - eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), - logger: zerolog.Nop(), - nodeHeightFn: func() uint64 { return nodeHeight }, - startDAHeight: 1, - } - follower.walkbackActive.Store(true) - - err := follower.HandleCatchup(t.Context(), 99) - require.NoError(t, err) - assert.True(t, follower.walkbackActive.Load()) - assert.Equal(t, uint64(98), sub.LocalDAHeight()) - }) - - t.Run("stops_walkback_when_contiguous", func(t *testing.T) { - daRetriever := NewMockDARetriever(t) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(95)). - Return([]common.DAHeightEvent{{Header: makeHeader(41)}}, nil).Once() - - sub := newTestSubscriber(100) - - var nodeHeight uint64 = 40 - - follower := &daFollower{ - subscriber: sub, - retriever: daRetriever, - eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), - logger: zerolog.Nop(), - nodeHeightFn: func() uint64 { return nodeHeight }, - startDAHeight: 1, - } - follower.walkbackActive.Store(true) - - err := follower.HandleCatchup(t.Context(), 95) - require.NoError(t, err) - assert.False(t, follower.walkbackActive.Load()) - assert.Equal(t, uint64(100), sub.LocalDAHeight()) // no rewind - }) - - t.Run("no_walkback_without_nodeHeightFn", func(t *testing.T) { - daRetriever := NewMockDARetriever(t) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Return([]common.DAHeightEvent{{Header: makeHeader(50)}}, nil).Once() - - sub := newTestSubscriber(100) - - follower := &daFollower{ - subscriber: sub, - retriever: daRetriever, - eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), - logger: zerolog.Nop(), - startDAHeight: 1, - } - - err := follower.HandleCatchup(t.Context(), 100) - require.NoError(t, err) - assert.False(t, follower.walkbackActive.Load()) - assert.Equal(t, uint64(100), sub.LocalDAHeight()) // no rewind - }) - - t.Run("no_walkback_at_startDAHeight", func(t *testing.T) { - daRetriever := NewMockDARetriever(t) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(1)). - Return([]common.DAHeightEvent{{Header: makeHeader(50)}}, nil).Once() - - sub := newTestSubscriber(1) - - var nodeHeight uint64 = 40 - - follower := &daFollower{ - subscriber: sub, - retriever: daRetriever, - eventSink: common.EventSinkFunc(func(_ context.Context, _ common.DAHeightEvent) error { return nil }), - logger: zerolog.Nop(), - nodeHeightFn: func() uint64 { return nodeHeight }, - startDAHeight: 1, - } - - err := follower.HandleCatchup(t.Context(), 1) - require.NoError(t, err) - assert.False(t, follower.walkbackActive.Load()) - assert.Equal(t, uint64(1), sub.LocalDAHeight()) // no rewind - }) -} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 21c5d2bf25..16c13807d5 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -83,10 +83,16 @@ type Syncer struct { daFollower DAFollower // p2pStalled is set by p2pWorkerLoop when P2P genuinely fails (not - // cancelled by a DA event). The DA follower reads it to decide whether - // to walk back and fill the gap from DA. + // cancelled by a DA event). Used by the walkback check to decide + // whether to rewind the DA follower. p2pStalled atomic.Bool + // walkbackActive is set when the syncer detects a gap between the + // node's block height and the DA events being processed. While active, + // the DA follower is rewound one height at a time until the gap is + // filled. + walkbackActive atomic.Bool + // Forced inclusion tracking forcedInclusionMu sync.RWMutex seenBlockTxs map[string]struct{} // SHA-256 hex of every tx seen in a DA-sourced block @@ -217,19 +223,15 @@ func (s *Syncer) Start(ctx context.Context) (err error) { // Start the DA follower (subscribe + catchup) and other workers s.daFollower = NewDAFollower(DAFollowerConfig{ - Client: s.daClient, - Retriever: s.daRetriever, - Logger: s.logger, - EventSink: s, - Namespace: s.daClient.GetHeaderNamespace(), - DataNamespace: s.daClient.GetDataNamespace(), - StartDAHeight: s.daRetrieverHeight.Load(), - DABlockTime: s.config.DA.BlockTime.Duration, - NodeHeight: func() uint64 { - h, _ := s.store.Height(s.ctx) - return h - }, - P2PStalled: s.p2pStalled.Load, + Client: s.daClient, + Retriever: s.daRetriever, + Logger: s.logger, + EventSink: s, + Namespace: s.daClient.GetHeaderNamespace(), + DataNamespace: s.daClient.GetDataNamespace(), + StartDAHeight: s.daRetrieverHeight.Load(), + DABlockTime: s.config.DA.BlockTime.Duration, + WalkbackChecker: s.walkbackCheck, }) if err = s.daFollower.Start(ctx); err != nil { return fmt.Errorf("failed to start DA follower: %w", err) @@ -519,6 +521,55 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { } } +// walkbackCheck is the WalkbackChecker callback for the DA follower's +// subscriber. It decides whether the subscriber should rewind to re-fetch +// previous DA heights, based on the gap between the node's block height +// and the block heights found in the DA events. +// +// Returns a DA height to rewind to, or 0 to continue normally. +func (s *Syncer) walkbackCheck(daHeight uint64, events []common.DAHeightEvent) uint64 { + if !s.p2pStalled.Load() { + s.walkbackActive.Store(false) + return 0 + } + + if daHeight <= s.daRetrieverHeight.Load() { + return 0 + } + + nodeHeight, err := s.store.Height(s.ctx) + if err != nil { + return 0 + } + + needsWalkback := s.walkbackActive.Load() + if len(events) > 0 { + minHeight := events[0].Header.Height() + for _, e := range events[1:] { + if e.Header.Height() < minHeight { + minHeight = e.Header.Height() + } + } + if minHeight <= nodeHeight+1 { + s.walkbackActive.Store(false) + return 0 + } + needsWalkback = true + } + + if needsWalkback { + s.walkbackActive.Store(true) + s.logger.Info(). + Uint64("da_height", daHeight). + Uint64("node_height", nodeHeight). + Int("events", len(events)). + Msg("P2P stalled with gap between DA blocks and node height, walking DA follower back") + return daHeight - 1 + } + + return 0 +} + func (s *Syncer) waitForGenesis() bool { if delay := time.Until(s.genesis.StartTime); delay > 0 { timer := time.NewTimer(delay) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 1ff2ad35fc..c8b1b48b90 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -1454,3 +1454,122 @@ func TestSyncer_Stop_DrainWorksWithoutCriticalError(t *testing.T) { mockExec.AssertExpectations(t) }) } + +func TestSyncer_walkbackCheck(t *testing.T) { + makeEvents := func(heights ...uint64) []common.DAHeightEvent { + events := make([]common.DAHeightEvent, len(heights)) + for i, h := range heights { + events[i] = common.DAHeightEvent{Header: makeHeader(h)} + } + return events + } + + t.Run("returns_zero_when_p2p_not_stalled", func(t *testing.T) { + s := &Syncer{ + ctx: t.Context(), + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + + got := s.walkbackCheck(100, makeEvents(50)) + assert.Equal(t, uint64(0), got) + assert.False(t, s.walkbackActive.Load()) + }) + + t.Run("returns_zero_at_startDAHeight", func(t *testing.T) { + s := &Syncer{ + ctx: t.Context(), + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.p2pStalled.Store(true) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + s.store = st + s.cache = cm + + got := s.walkbackCheck(1, makeEvents(50)) + assert.Equal(t, uint64(0), got) + }) + + t.Run("activates_walkback_when_gap_detected", func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SetHeight(40)) + require.NoError(t, batch.Commit()) + + s := &Syncer{ + ctx: t.Context(), + store: st, + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.p2pStalled.Store(true) + + got := s.walkbackCheck(100, makeEvents(50)) + assert.Equal(t, uint64(99), got) + assert.True(t, s.walkbackActive.Load()) + }) + + t.Run("continues_walkback_on_empty_events", func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SetHeight(40)) + require.NoError(t, batch.Commit()) + + s := &Syncer{ + ctx: t.Context(), + store: st, + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.p2pStalled.Store(true) + s.walkbackActive.Store(true) + + got := s.walkbackCheck(99, nil) + assert.Equal(t, uint64(98), got) + assert.True(t, s.walkbackActive.Load()) + }) + + t.Run("stops_walkback_when_contiguous", func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SetHeight(40)) + require.NoError(t, batch.Commit()) + + s := &Syncer{ + ctx: t.Context(), + store: st, + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.p2pStalled.Store(true) + s.walkbackActive.Store(true) + + got := s.walkbackCheck(95, makeEvents(41)) + assert.Equal(t, uint64(0), got) + assert.False(t, s.walkbackActive.Load()) + }) + + t.Run("clears_walkback_when_p2p_recovers", func(t *testing.T) { + s := &Syncer{ + ctx: t.Context(), + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.walkbackActive.Store(true) + + got := s.walkbackCheck(100, nil) + assert.Equal(t, uint64(0), got) + assert.False(t, s.walkbackActive.Load()) + }) +}