From 2afb8971b7ce0528f01f37b32e986e48336c923b Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 17 Jun 2026 14:36:46 +0200 Subject: [PATCH 1/7] feat: block time average --- pkg/transaction/wrapped/cache/cache.go | 9 +- pkg/transaction/wrapped/cache/cache_test.go | 16 +-- pkg/transaction/wrapped/wrapped.go | 37 +++++- pkg/transaction/wrapped/wrapped_test.go | 136 +++++++++++++++++++- 4 files changed, 176 insertions(+), 22 deletions(-) diff --git a/pkg/transaction/wrapped/cache/cache.go b/pkg/transaction/wrapped/cache/cache.go index 66d161253ac..0e717cb96db 100644 --- a/pkg/transaction/wrapped/cache/cache.go +++ b/pkg/transaction/wrapped/cache/cache.go @@ -13,7 +13,7 @@ import ( ) type ( - Loader[T any] func() (T, error) + Loader[T any] func(prev T) (T, error) ReuseEvaluator[T any] func(value T) bool ) @@ -64,7 +64,12 @@ func (c *SingleFlightCache[T]) PeekOrLoad(ctx context.Context, canReuse ReuseEva result, shared, err := c.group.Do(ctx, c.key, func(ctx context.Context) (any, error) { c.metrics.Loads.Inc() - value, err := loader() + + c.mu.RLock() + prev := c.value + c.mu.RUnlock() + + value, err := loader(prev) if err != nil { c.metrics.LoadErrors.Inc() return value, err diff --git a/pkg/transaction/wrapped/cache/cache_test.go b/pkg/transaction/wrapped/cache/cache_test.go index d954a7aee4b..9010d185116 100644 --- a/pkg/transaction/wrapped/cache/cache_test.go +++ b/pkg/transaction/wrapped/cache/cache_test.go @@ -37,7 +37,7 @@ func TestPeekOrLoadHit(t *testing.T) { func(value uint64) bool { return true }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) return 0, errors.New("loader must not run") }, @@ -59,7 +59,7 @@ func TestPeekOrLoadMiss(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) return 99, nil }, @@ -75,7 +75,7 @@ func TestPeekOrLoadMiss(t *testing.T) { func(value uint64) bool { return true }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { verifyLoads.Add(1) return 0, errors.New("unexpected load on verify") }, @@ -97,7 +97,7 @@ func TestPeekOrLoadError(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) return 99, errLoad }, @@ -112,7 +112,7 @@ func TestPeekOrLoadError(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) return 0, errLoad }, @@ -143,7 +143,7 @@ func TestPeekOrLoadSingleflight(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) <-gate return value, nil @@ -186,7 +186,7 @@ func TestPeekOrLoadContextCancellation(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { <-gate return expectedVal, nil }, @@ -200,7 +200,7 @@ func TestPeekOrLoadContextCancellation(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { <-gate return expectedVal, nil }, diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index c8afd8599ab..900825f5035 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -21,8 +21,9 @@ import ( var _ transaction.Backend = (*wrappedBackend)(nil) type blockNumberAnchor struct { - number uint64 - timestamp time.Time + number uint64 + timestamp time.Time + averageBlockTime time.Duration } type wrappedBackend struct { @@ -86,7 +87,7 @@ func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) { return elapsedBlocks < b.blockSyncInterval } - loadFreshBlockFn := func() (blockNumberAnchor, error) { + loadFreshBlockFn := func(prev blockNumberAnchor) (blockNumberAnchor, error) { b.metrics.TotalRPCCalls.Inc() b.metrics.BlockHeaderAsBlockNumberCalls.Inc() @@ -99,9 +100,14 @@ func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) { b.metrics.TotalRPCErrors.Inc() return blockNumberAnchor{}, errors.New("latest block header unavailable") } + + newNumber := header.Number.Uint64() + newTime := time.Unix(int64(header.Time), 0).UTC() + return blockNumberAnchor{ - number: header.Number.Uint64(), - timestamp: time.Unix(int64(header.Time), 0).UTC(), + number: newNumber, + timestamp: newTime, + averageBlockTime: b.computeAverageBlockTime(prev, newNumber, newTime), }, nil } @@ -126,10 +132,29 @@ func (b *wrappedBackend) estimatedBlockNumberWithElapsed(anchor blockNumberAncho return anchor.number, 0 } - elapsedBlocks := uint64(now.Sub(anchor.timestamp) / b.blockTime) + blockTime := anchor.averageBlockTime + if blockTime == 0 { + blockTime = b.blockTime + } + + elapsedBlocks := uint64(now.Sub(anchor.timestamp) / blockTime) return anchor.number + elapsedBlocks, elapsedBlocks } +func (b *wrappedBackend) computeAverageBlockTime(prev blockNumberAnchor, newNumber uint64, newTime time.Time) time.Duration { + if prev.number == 0 || newNumber <= prev.number { + return b.blockTime + } + if !newTime.After(prev.timestamp) { + return b.blockTime + } + + elapsed := newTime.Sub(prev.timestamp) + blocks := newNumber - prev.number + + return elapsed / time.Duration(blocks) +} + func (b *wrappedBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { b.metrics.TotalRPCCalls.Inc() b.metrics.BlockHeaderCalls.Inc() diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go index b0af463f6a6..4813218ca33 100644 --- a/pkg/transaction/wrapped/wrapped_test.go +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -8,6 +8,7 @@ import ( "context" "errors" "math/big" + "sync" "sync/atomic" "testing" "testing/synctest" @@ -61,7 +62,7 @@ func Test_BlockNumberReturns_FreshCache(t *testing.T) { })) now := time.Now().UTC() - backend.blockNumberCache.Set(blockNumberAnchor{ + setBlockAnchor(backend, blockNumberAnchor{ number: cachedBlock, timestamp: now, }) @@ -91,7 +92,7 @@ func Test_BlockNumber_ReturnsCalculatedBlock(t *testing.T) { })) now := time.Now().UTC() - backend.blockNumberCache.Set(blockNumberAnchor{ + setBlockAnchor(backend, blockNumberAnchor{ number: anchorBlock, timestamp: now.Add(-time.Duration(elapsedBlocks) * testBlockTime), }) @@ -124,7 +125,7 @@ func Test_BlockNumber_ExpiredAnchor(t *testing.T) { })) now := time.Now().UTC() - backend.blockNumberCache.Set(blockNumberAnchor{ + setBlockAnchor(backend, blockNumberAnchor{ number: staleBlock, timestamp: now.Add(-time.Duration(elapsedBlocks) * testBlockTime), }) @@ -161,7 +162,7 @@ func Test_BlockNumber_ExpiredAnchor_RetriesAfterRPCError(t *testing.T) { })) now := time.Now().UTC() - backend.blockNumberCache.Set(blockNumberAnchor{ + setBlockAnchor(backend, blockNumberAnchor{ number: staleBlock, timestamp: now.Add(-time.Duration(elapsedBlocks) * testBlockTime), }) @@ -180,14 +181,137 @@ func Test_BlockNumber_ExpiredAnchor_RetriesAfterRPCError(t *testing.T) { }) } +func Test_BlockNumber_UsesAverageBlockTime(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + const genesisBlock = uint64(100) + + const ( + realBlockTime = 7 * time.Second + configBlockTime = 2 * time.Second + blockSyncInterval = uint64(3) + cacheHitAdvance = 4 * time.Second + untilSecondPoll = 4 * time.Second + ) + + // Chain produces blocks every 7s; goroutine must exit before the bubble ends. + chain := newChainSimulator(genesisBlock, realBlockTime) + go chain.advanceEvery(realBlockTime, 1) + synctest.Wait() + + var headerCalls atomic.Int32 + backend := newTestWrappedBackendWithConfig( + t, + configBlockTime, + blockSyncInterval, + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + assert.Nil(t, number) + return chain.header(), nil + }), + ) + + // Step 1: initial poll, anchor at block 100, uses pre-set blocktime for further calculations + first, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, genesisBlock, first) + assert.Equal(t, int32(1), headerCalls.Load()) + + time.Sleep(cacheHitAdvance) + synctest.Wait() + + // Step 2: cache hit with config block time (2s) — node estimates 102, chain is at 100. + ahead, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Greater(t, ahead, chain.blockNumber()) + + time.Sleep(untilSecondPoll) + synctest.Wait() + + // Step 3: cache expired, second poll — average block time becomes 7s, estimate realigns to 101. + corrected, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, chain.blockNumber(), corrected) + assert.Equal(t, int32(2), headerCalls.Load()) + + time.Sleep(cacheHitAdvance) + synctest.Wait() + + // Step 4: cache hit uses average block time: block from shouldn't be ahead + steady, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, chain.blockNumber(), steady) + assert.NotEqual(t, corrected+1, steady) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +} + +type chainSimulator struct { + mu sync.Mutex + genesisTime time.Time + genesisBlock uint64 + block uint64 + blockTime time.Duration +} + +func newChainSimulator(genesisBlock uint64, blockTime time.Duration) *chainSimulator { + return &chainSimulator{ + genesisTime: time.Now().UTC(), + genesisBlock: genesisBlock, + block: genesisBlock, + blockTime: blockTime, + } +} + +func (c *chainSimulator) advanceEvery(blockTime time.Duration, count int) { + for range count { + time.Sleep(blockTime) + c.mu.Lock() + c.block++ + c.mu.Unlock() + } +} + +func (c *chainSimulator) header() *types.Header { + c.mu.Lock() + defer c.mu.Unlock() + + blockOffset := c.block - c.genesisBlock + return &types.Header{ + Number: big.NewInt(int64(c.block)), + Time: uint64(c.genesisTime.Add(c.blockTime * time.Duration(blockOffset)).Unix()), + } +} + +func (c *chainSimulator) blockNumber() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + return c.block +} + +func setBlockAnchor(backend *wrappedBackend, anchor blockNumberAnchor) { + backend.blockNumberCache.Set(anchor) +} + func newTestWrappedBackend(t *testing.T, opts ...backendmock.Option) *wrappedBackend { + return newTestWrappedBackendWithConfig(t, testBlockTime, testBlockSyncInterval, opts...) +} + +func newTestWrappedBackendWithConfig( + t *testing.T, + blockTime time.Duration, + blockSyncInterval uint64, + opts ...backendmock.Option, +) *wrappedBackend { t.Helper() backend, ok := NewBackend( backendmock.New(opts...), testMinimumGasTipCap, - testBlockTime, - testBlockSyncInterval, + blockTime, + blockSyncInterval, ).(*wrappedBackend) assert.True(t, ok) From deaf6d8a4a3814ccb1313aae949fe5f23eeefff5 Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 17 Jun 2026 15:02:02 +0200 Subject: [PATCH 2/7] fix: metric + lint fix + comments --- pkg/transaction/wrapped/cache/cache.go | 7 +++- pkg/transaction/wrapped/metrics.go | 7 ++++ pkg/transaction/wrapped/wrapped.go | 10 ++++- pkg/transaction/wrapped/wrapped_test.go | 49 +++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 3 deletions(-) diff --git a/pkg/transaction/wrapped/cache/cache.go b/pkg/transaction/wrapped/cache/cache.go index 0e717cb96db..1a72e1e7461 100644 --- a/pkg/transaction/wrapped/cache/cache.go +++ b/pkg/transaction/wrapped/cache/cache.go @@ -13,7 +13,10 @@ import ( ) type ( - Loader[T any] func(prev T) (T, error) + // Loader loads a fresh value when the cache is stale. Prev is the last + // successfully cached value, read immediately before the load runs. + Loader[T any] func(prev T) (T, error) + // ReuseEvaluator reports whether a cached value may still be used. ReuseEvaluator[T any] func(value T) bool ) @@ -50,6 +53,8 @@ func (c *SingleFlightCache[T]) Set(value T) { c.value = value } +// PeekOrLoad returns the cached value when canReuse allows it. Otherwise it +// loads a new value via loader, passing the current cache entry as prev. func (c *SingleFlightCache[T]) PeekOrLoad(ctx context.Context, canReuse ReuseEvaluator[T], loader Loader[T]) (T, error) { c.mu.RLock() value := c.value diff --git a/pkg/transaction/wrapped/metrics.go b/pkg/transaction/wrapped/metrics.go index e4a0f8ee3bd..60a5023ef17 100644 --- a/pkg/transaction/wrapped/metrics.go +++ b/pkg/transaction/wrapped/metrics.go @@ -27,6 +27,7 @@ type metrics struct { SendTransactionCalls prometheus.Counter FilterLogsCalls prometheus.Counter ChainIDCalls prometheus.Counter + AverageBlockTimeSeconds prometheus.Gauge } func newMetrics() metrics { @@ -129,6 +130,12 @@ func newMetrics() metrics { Name: "calls_chain_id", Help: "Count of eth_chainId rpc calls", }), + AverageBlockTimeSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "average_block_time_seconds", + Help: "Observed average block time in seconds, updated on each block number cache refresh.", + }), } } diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index 900825f5035..b9923c83fab 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -103,11 +103,13 @@ func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) { newNumber := header.Number.Uint64() newTime := time.Unix(int64(header.Time), 0).UTC() + averageBlockTime := b.computeAverageBlockTime(prev, newNumber, newTime, time.Now().UTC()) + b.metrics.AverageBlockTimeSeconds.Set(averageBlockTime.Seconds()) return blockNumberAnchor{ number: newNumber, timestamp: newTime, - averageBlockTime: b.computeAverageBlockTime(prev, newNumber, newTime), + averageBlockTime: averageBlockTime, }, nil } @@ -141,7 +143,11 @@ func (b *wrappedBackend) estimatedBlockNumberWithElapsed(anchor blockNumberAncho return anchor.number + elapsedBlocks, elapsedBlocks } -func (b *wrappedBackend) computeAverageBlockTime(prev blockNumberAnchor, newNumber uint64, newTime time.Time) time.Duration { +// computeAverageBlockTime returns the observed block time between prev and the +// freshly loaded header. Falls back to the configured block time only when prev +// is unset. When the chain block number did not advance, wall-clock elapsed +// since prev is treated as the time for one block. +func (b *wrappedBackend) computeAverageBlockTime(prev blockNumberAnchor, newNumber uint64, newTime, now time.Time) time.Duration { if prev.number == 0 || newNumber <= prev.number { return b.blockTime } diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go index 4813218ca33..3ea21208eff 100644 --- a/pkg/transaction/wrapped/wrapped_test.go +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -247,6 +247,53 @@ func Test_BlockNumber_UsesAverageBlockTime(t *testing.T) { }) } +func Test_BlockNumber_AverageBlockTimeWhenChainHasNotAdvanced(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + const ( + genesisBlock = uint64(100) + configBlockTime = 2 * time.Second + blockSyncInterval = uint64(3) + untilSecondPoll = 6 * time.Second + ) + + chain := newChainSimulator(genesisBlock, 7*time.Second) + + var headerCalls atomic.Int32 + backend := newTestWrappedBackendWithConfig( + t, + configBlockTime, + blockSyncInterval, + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + return chain.header(), nil + }), + ) + + first, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, genesisBlock, first) + + time.Sleep(untilSecondPoll) + synctest.Wait() + + second, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, genesisBlock, second) + assert.Equal(t, int32(2), headerCalls.Load()) + + time.Sleep(4 * time.Second) + synctest.Wait() + + // With observed ~6s average, extrapolation stays at 100. With config 2s it would reach 102. + third, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, genesisBlock, third) + assert.NotEqual(t, genesisBlock+2, third) + }) +} + type chainSimulator struct { mu sync.Mutex genesisTime time.Time @@ -296,6 +343,8 @@ func setBlockAnchor(backend *wrappedBackend, anchor blockNumberAnchor) { } func newTestWrappedBackend(t *testing.T, opts ...backendmock.Option) *wrappedBackend { + t.Helper() + return newTestWrappedBackendWithConfig(t, testBlockTime, testBlockSyncInterval, opts...) } From 340915f48ec24ff531cb0169d3c978dd93a2f6da Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 17 Jun 2026 15:12:18 +0200 Subject: [PATCH 3/7] fix: cleanup --- pkg/transaction/wrapped/wrapped.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index b9923c83fab..5f28254567c 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -103,7 +103,7 @@ func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) { newNumber := header.Number.Uint64() newTime := time.Unix(int64(header.Time), 0).UTC() - averageBlockTime := b.computeAverageBlockTime(prev, newNumber, newTime, time.Now().UTC()) + averageBlockTime := b.computeAverageBlockTime(prev, newNumber, newTime) b.metrics.AverageBlockTimeSeconds.Set(averageBlockTime.Seconds()) return blockNumberAnchor{ @@ -147,7 +147,7 @@ func (b *wrappedBackend) estimatedBlockNumberWithElapsed(anchor blockNumberAncho // freshly loaded header. Falls back to the configured block time only when prev // is unset. When the chain block number did not advance, wall-clock elapsed // since prev is treated as the time for one block. -func (b *wrappedBackend) computeAverageBlockTime(prev blockNumberAnchor, newNumber uint64, newTime, now time.Time) time.Duration { +func (b *wrappedBackend) computeAverageBlockTime(prev blockNumberAnchor, newNumber uint64, newTime time.Time) time.Duration { if prev.number == 0 || newNumber <= prev.number { return b.blockTime } From f344a6f06d2eaf12c698f94bb8d52a611c6278fb Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 17 Jun 2026 15:22:10 +0200 Subject: [PATCH 4/7] fix: remove useless test --- pkg/transaction/wrapped/wrapped_test.go | 47 ------------------------- 1 file changed, 47 deletions(-) diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go index 3ea21208eff..9fd0f45722b 100644 --- a/pkg/transaction/wrapped/wrapped_test.go +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -247,53 +247,6 @@ func Test_BlockNumber_UsesAverageBlockTime(t *testing.T) { }) } -func Test_BlockNumber_AverageBlockTimeWhenChainHasNotAdvanced(t *testing.T) { - t.Parallel() - - synctest.Test(t, func(t *testing.T) { - const ( - genesisBlock = uint64(100) - configBlockTime = 2 * time.Second - blockSyncInterval = uint64(3) - untilSecondPoll = 6 * time.Second - ) - - chain := newChainSimulator(genesisBlock, 7*time.Second) - - var headerCalls atomic.Int32 - backend := newTestWrappedBackendWithConfig( - t, - configBlockTime, - blockSyncInterval, - backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { - headerCalls.Add(1) - return chain.header(), nil - }), - ) - - first, err := backend.BlockNumber(context.Background()) - assert.NoError(t, err) - assert.Equal(t, genesisBlock, first) - - time.Sleep(untilSecondPoll) - synctest.Wait() - - second, err := backend.BlockNumber(context.Background()) - assert.NoError(t, err) - assert.Equal(t, genesisBlock, second) - assert.Equal(t, int32(2), headerCalls.Load()) - - time.Sleep(4 * time.Second) - synctest.Wait() - - // With observed ~6s average, extrapolation stays at 100. With config 2s it would reach 102. - third, err := backend.BlockNumber(context.Background()) - assert.NoError(t, err) - assert.Equal(t, genesisBlock, third) - assert.NotEqual(t, genesisBlock+2, third) - }) -} - type chainSimulator struct { mu sync.Mutex genesisTime time.Time From 12b686364ba4ad4ceebd5571d460757a6e287fce Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 17 Jun 2026 16:01:46 +0200 Subject: [PATCH 5/7] fix: cap block time average to 5 min --- pkg/transaction/wrapped/wrapped.go | 9 +- pkg/transaction/wrapped/wrapped_test.go | 126 ++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 1 deletion(-) diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index 5f28254567c..06525e355c2 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -18,6 +18,8 @@ import ( "github.com/ethersphere/bee/v2/pkg/transaction/wrapped/cache" ) +const maxAverageBlockTime = 5 * time.Minute + var _ transaction.Backend = (*wrappedBackend)(nil) type blockNumberAnchor struct { @@ -158,7 +160,12 @@ func (b *wrappedBackend) computeAverageBlockTime(prev blockNumberAnchor, newNumb elapsed := newTime.Sub(prev.timestamp) blocks := newNumber - prev.number - return elapsed / time.Duration(blocks) + averageBlockTime := elapsed / time.Duration(blocks) + if averageBlockTime > maxAverageBlockTime { + return maxAverageBlockTime + } + + return averageBlockTime } func (b *wrappedBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go index 9fd0f45722b..4ee29e4b3e3 100644 --- a/pkg/transaction/wrapped/wrapped_test.go +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -247,6 +247,132 @@ func Test_BlockNumber_UsesAverageBlockTime(t *testing.T) { }) } +func Test_BlockNumber_AverageBlockTimeResetsWhenChainStuck(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + const ( + genesisBlock = uint64(100) + configBlockTime = 5 * time.Second + observedBlockTime = 7 * time.Second + blockSyncInterval = uint64(100) + ) + + anchorTime := time.Now().UTC() + var headerCalls atomic.Int32 + + backend := newTestWrappedBackendWithConfig( + t, + configBlockTime, + blockSyncInterval, + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + call := headerCalls.Add(1) + switch call { + case 1: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock)), + Time: uint64(anchorTime.Unix()), + }, nil + case 2: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock + 1)), + Time: uint64(anchorTime.Add(observedBlockTime).Unix()), + }, nil + case 3: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock + 1)), + Time: uint64(anchorTime.Add(observedBlockTime).Unix()), + }, nil + default: + return nil, errors.New("unexpected rpc call") + } + }), + ) + + _, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + + time.Sleep(time.Duration(blockSyncInterval) * configBlockTime) + synctest.Wait() + + _, err = backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, int32(2), headerCalls.Load()) + + time.Sleep(time.Duration(blockSyncInterval) * observedBlockTime) + synctest.Wait() + + afterStuckPoll, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, int32(3), headerCalls.Load()) + + elapsed := time.Duration(blockSyncInterval)*configBlockTime + time.Duration(blockSyncInterval)*observedBlockTime + chainElapsed := elapsed - observedBlockTime + expectedWithConfig := genesisBlock + 1 + uint64(chainElapsed/configBlockTime) + expectedWithObserved := genesisBlock + 1 + uint64(chainElapsed/observedBlockTime) + + assert.Equal(t, expectedWithConfig, afterStuckPoll) + assert.NotEqual(t, expectedWithObserved, afterStuckPoll) + }) +} + +func Test_BlockNumber_AverageBlockTimeCappedAtMax(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + const ( + genesisBlock = uint64(100) + configBlockTime = 5 * time.Second + blockSyncInterval = uint64(10) + ) + + anchorTime := time.Now().UTC() + var headerCalls atomic.Int32 + + backend := newTestWrappedBackendWithConfig( + t, + configBlockTime, + blockSyncInterval, + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + call := headerCalls.Add(1) + switch call { + case 1: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock)), + Time: uint64(anchorTime.Unix()), + }, nil + case 2: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock + 1)), + Time: uint64(anchorTime.Add(time.Hour).Unix()), + }, nil + default: + return nil, errors.New("unexpected rpc call") + } + }), + ) + + _, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + + time.Sleep(time.Duration(blockSyncInterval) * configBlockTime) + synctest.Wait() + + got, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, genesisBlock+1, got) + assert.Equal(t, int32(2), headerCalls.Load()) + + time.Sleep(time.Hour + maxAverageBlockTime) + synctest.Wait() + + got, err = backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, genesisBlock+2, got) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +} + type chainSimulator struct { mu sync.Mutex genesisTime time.Time From 51d5dc4927547c5ea6f9c726326e7d72f4cfcad5 Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 17 Jun 2026 16:35:52 +0200 Subject: [PATCH 6/7] fix: cap block time average to 30 sec --- pkg/transaction/wrapped/wrapped.go | 2 +- pkg/transaction/wrapped/wrapped_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index 06525e355c2..c7634eca6b0 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -18,7 +18,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/transaction/wrapped/cache" ) -const maxAverageBlockTime = 5 * time.Minute +const maxAverageBlockTime = 30 * time.Second var _ transaction.Backend = (*wrappedBackend)(nil) diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go index 4ee29e4b3e3..0817f0172f5 100644 --- a/pkg/transaction/wrapped/wrapped_test.go +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -322,7 +322,7 @@ func Test_BlockNumber_AverageBlockTimeCappedAtMax(t *testing.T) { synctest.Test(t, func(t *testing.T) { const ( genesisBlock = uint64(100) - configBlockTime = 5 * time.Second + configBlockTime = 3 * time.Second blockSyncInterval = uint64(10) ) @@ -363,7 +363,7 @@ func Test_BlockNumber_AverageBlockTimeCappedAtMax(t *testing.T) { assert.Equal(t, genesisBlock+1, got) assert.Equal(t, int32(2), headerCalls.Load()) - time.Sleep(time.Hour + maxAverageBlockTime) + time.Sleep(time.Hour + maxAverageBlockTime/2) // chain stuck and then resumed synctest.Wait() got, err = backend.BlockNumber(context.Background()) From e74fbec6d8e8382767b3f3c8c7028bb6bf102533 Mon Sep 17 00:00:00 2001 From: sbackend Date: Thu, 18 Jun 2026 12:04:56 +0200 Subject: [PATCH 7/7] fix: cosmetic fixes --- pkg/transaction/wrapped/wrapped.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index c7634eca6b0..416456bcfc6 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -146,14 +146,11 @@ func (b *wrappedBackend) estimatedBlockNumberWithElapsed(anchor blockNumberAncho } // computeAverageBlockTime returns the observed block time between prev and the -// freshly loaded header. Falls back to the configured block time only when prev -// is unset. When the chain block number did not advance, wall-clock elapsed -// since prev is treated as the time for one block. +// freshly loaded header. Falls back to the configured block time when prev is +// unset, the block number did not increase, or the header timestamp did not +// strictly increase. func (b *wrappedBackend) computeAverageBlockTime(prev blockNumberAnchor, newNumber uint64, newTime time.Time) time.Duration { - if prev.number == 0 || newNumber <= prev.number { - return b.blockTime - } - if !newTime.After(prev.timestamp) { + if prev.number == 0 || newNumber <= prev.number || !newTime.After(prev.timestamp) { return b.blockTime }