diff --git a/pkg/transaction/wrapped/cache/cache.go b/pkg/transaction/wrapped/cache/cache.go index 66d161253ac..1a72e1e7461 100644 --- a/pkg/transaction/wrapped/cache/cache.go +++ b/pkg/transaction/wrapped/cache/cache.go @@ -13,7 +13,10 @@ import ( ) type ( - Loader[T any] func() (T, error) + // Loader loads a fresh value when the cache is stale. Prev is the last + // successfully cached value, read immediately before the load runs. + Loader[T any] func(prev T) (T, error) + // ReuseEvaluator reports whether a cached value may still be used. ReuseEvaluator[T any] func(value T) bool ) @@ -50,6 +53,8 @@ func (c *SingleFlightCache[T]) Set(value T) { c.value = value } +// PeekOrLoad returns the cached value when canReuse allows it. Otherwise it +// loads a new value via loader, passing the current cache entry as prev. func (c *SingleFlightCache[T]) PeekOrLoad(ctx context.Context, canReuse ReuseEvaluator[T], loader Loader[T]) (T, error) { c.mu.RLock() value := c.value @@ -64,7 +69,12 @@ func (c *SingleFlightCache[T]) PeekOrLoad(ctx context.Context, canReuse ReuseEva result, shared, err := c.group.Do(ctx, c.key, func(ctx context.Context) (any, error) { c.metrics.Loads.Inc() - value, err := loader() + + c.mu.RLock() + prev := c.value + c.mu.RUnlock() + + value, err := loader(prev) if err != nil { c.metrics.LoadErrors.Inc() return value, err diff --git a/pkg/transaction/wrapped/cache/cache_test.go b/pkg/transaction/wrapped/cache/cache_test.go index d954a7aee4b..9010d185116 100644 --- a/pkg/transaction/wrapped/cache/cache_test.go +++ b/pkg/transaction/wrapped/cache/cache_test.go @@ -37,7 +37,7 @@ func TestPeekOrLoadHit(t *testing.T) { func(value uint64) bool { return true }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) return 0, errors.New("loader must not run") }, @@ -59,7 +59,7 @@ func TestPeekOrLoadMiss(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) return 99, nil }, @@ -75,7 +75,7 @@ func TestPeekOrLoadMiss(t *testing.T) { func(value uint64) bool { return true }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { verifyLoads.Add(1) return 0, errors.New("unexpected load on verify") }, @@ -97,7 +97,7 @@ func TestPeekOrLoadError(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) return 99, errLoad }, @@ -112,7 +112,7 @@ func TestPeekOrLoadError(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) return 0, errLoad }, @@ -143,7 +143,7 @@ func TestPeekOrLoadSingleflight(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { loadCount.Add(1) <-gate return value, nil @@ -186,7 +186,7 @@ func TestPeekOrLoadContextCancellation(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { <-gate return expectedVal, nil }, @@ -200,7 +200,7 @@ func TestPeekOrLoadContextCancellation(t *testing.T) { func(value uint64) bool { return false }, - func() (uint64, error) { + func(_ uint64) (uint64, error) { <-gate return expectedVal, nil }, diff --git a/pkg/transaction/wrapped/metrics.go b/pkg/transaction/wrapped/metrics.go index e4a0f8ee3bd..60a5023ef17 100644 --- a/pkg/transaction/wrapped/metrics.go +++ b/pkg/transaction/wrapped/metrics.go @@ -27,6 +27,7 @@ type metrics struct { SendTransactionCalls prometheus.Counter FilterLogsCalls prometheus.Counter ChainIDCalls prometheus.Counter + AverageBlockTimeSeconds prometheus.Gauge } func newMetrics() metrics { @@ -129,6 +130,12 @@ func newMetrics() metrics { Name: "calls_chain_id", Help: "Count of eth_chainId rpc calls", }), + AverageBlockTimeSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "average_block_time_seconds", + Help: "Observed average block time in seconds, updated on each block number cache refresh.", + }), } } diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index c8afd8599ab..416456bcfc6 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -18,11 +18,14 @@ import ( "github.com/ethersphere/bee/v2/pkg/transaction/wrapped/cache" ) +const maxAverageBlockTime = 30 * time.Second + var _ transaction.Backend = (*wrappedBackend)(nil) type blockNumberAnchor struct { - number uint64 - timestamp time.Time + number uint64 + timestamp time.Time + averageBlockTime time.Duration } type wrappedBackend struct { @@ -86,7 +89,7 @@ func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) { return elapsedBlocks < b.blockSyncInterval } - loadFreshBlockFn := func() (blockNumberAnchor, error) { + loadFreshBlockFn := func(prev blockNumberAnchor) (blockNumberAnchor, error) { b.metrics.TotalRPCCalls.Inc() b.metrics.BlockHeaderAsBlockNumberCalls.Inc() @@ -99,9 +102,16 @@ func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) { b.metrics.TotalRPCErrors.Inc() return blockNumberAnchor{}, errors.New("latest block header unavailable") } + + newNumber := header.Number.Uint64() + newTime := time.Unix(int64(header.Time), 0).UTC() + averageBlockTime := b.computeAverageBlockTime(prev, newNumber, newTime) + b.metrics.AverageBlockTimeSeconds.Set(averageBlockTime.Seconds()) + return blockNumberAnchor{ - number: header.Number.Uint64(), - timestamp: time.Unix(int64(header.Time), 0).UTC(), + number: newNumber, + timestamp: newTime, + averageBlockTime: averageBlockTime, }, nil } @@ -126,10 +136,35 @@ func (b *wrappedBackend) estimatedBlockNumberWithElapsed(anchor blockNumberAncho return anchor.number, 0 } - elapsedBlocks := uint64(now.Sub(anchor.timestamp) / b.blockTime) + blockTime := anchor.averageBlockTime + if blockTime == 0 { + blockTime = b.blockTime + } + + elapsedBlocks := uint64(now.Sub(anchor.timestamp) / blockTime) return anchor.number + elapsedBlocks, elapsedBlocks } +// computeAverageBlockTime returns the observed block time between prev and the +// freshly loaded header. Falls back to the configured block time when prev is +// unset, the block number did not increase, or the header timestamp did not +// strictly increase. +func (b *wrappedBackend) computeAverageBlockTime(prev blockNumberAnchor, newNumber uint64, newTime time.Time) time.Duration { + if prev.number == 0 || newNumber <= prev.number || !newTime.After(prev.timestamp) { + return b.blockTime + } + + elapsed := newTime.Sub(prev.timestamp) + blocks := newNumber - prev.number + + averageBlockTime := elapsed / time.Duration(blocks) + if averageBlockTime > maxAverageBlockTime { + return maxAverageBlockTime + } + + return averageBlockTime +} + func (b *wrappedBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { b.metrics.TotalRPCCalls.Inc() b.metrics.BlockHeaderCalls.Inc() diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go index b0af463f6a6..0817f0172f5 100644 --- a/pkg/transaction/wrapped/wrapped_test.go +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -8,6 +8,7 @@ import ( "context" "errors" "math/big" + "sync" "sync/atomic" "testing" "testing/synctest" @@ -61,7 +62,7 @@ func Test_BlockNumberReturns_FreshCache(t *testing.T) { })) now := time.Now().UTC() - backend.blockNumberCache.Set(blockNumberAnchor{ + setBlockAnchor(backend, blockNumberAnchor{ number: cachedBlock, timestamp: now, }) @@ -91,7 +92,7 @@ func Test_BlockNumber_ReturnsCalculatedBlock(t *testing.T) { })) now := time.Now().UTC() - backend.blockNumberCache.Set(blockNumberAnchor{ + setBlockAnchor(backend, blockNumberAnchor{ number: anchorBlock, timestamp: now.Add(-time.Duration(elapsedBlocks) * testBlockTime), }) @@ -124,7 +125,7 @@ func Test_BlockNumber_ExpiredAnchor(t *testing.T) { })) now := time.Now().UTC() - backend.blockNumberCache.Set(blockNumberAnchor{ + setBlockAnchor(backend, blockNumberAnchor{ number: staleBlock, timestamp: now.Add(-time.Duration(elapsedBlocks) * testBlockTime), }) @@ -161,7 +162,7 @@ func Test_BlockNumber_ExpiredAnchor_RetriesAfterRPCError(t *testing.T) { })) now := time.Now().UTC() - backend.blockNumberCache.Set(blockNumberAnchor{ + setBlockAnchor(backend, blockNumberAnchor{ number: staleBlock, timestamp: now.Add(-time.Duration(elapsedBlocks) * testBlockTime), }) @@ -180,14 +181,265 @@ func Test_BlockNumber_ExpiredAnchor_RetriesAfterRPCError(t *testing.T) { }) } +func Test_BlockNumber_UsesAverageBlockTime(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + const genesisBlock = uint64(100) + + const ( + realBlockTime = 7 * time.Second + configBlockTime = 2 * time.Second + blockSyncInterval = uint64(3) + cacheHitAdvance = 4 * time.Second + untilSecondPoll = 4 * time.Second + ) + + // Chain produces blocks every 7s; goroutine must exit before the bubble ends. + chain := newChainSimulator(genesisBlock, realBlockTime) + go chain.advanceEvery(realBlockTime, 1) + synctest.Wait() + + var headerCalls atomic.Int32 + backend := newTestWrappedBackendWithConfig( + t, + configBlockTime, + blockSyncInterval, + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + headerCalls.Add(1) + assert.Nil(t, number) + return chain.header(), nil + }), + ) + + // Step 1: initial poll, anchor at block 100, uses pre-set blocktime for further calculations + first, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, genesisBlock, first) + assert.Equal(t, int32(1), headerCalls.Load()) + + time.Sleep(cacheHitAdvance) + synctest.Wait() + + // Step 2: cache hit with config block time (2s) — node estimates 102, chain is at 100. + ahead, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Greater(t, ahead, chain.blockNumber()) + + time.Sleep(untilSecondPoll) + synctest.Wait() + + // Step 3: cache expired, second poll — average block time becomes 7s, estimate realigns to 101. + corrected, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, chain.blockNumber(), corrected) + assert.Equal(t, int32(2), headerCalls.Load()) + + time.Sleep(cacheHitAdvance) + synctest.Wait() + + // Step 4: cache hit uses average block time: block from shouldn't be ahead + steady, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, chain.blockNumber(), steady) + assert.NotEqual(t, corrected+1, steady) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +} + +func Test_BlockNumber_AverageBlockTimeResetsWhenChainStuck(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + const ( + genesisBlock = uint64(100) + configBlockTime = 5 * time.Second + observedBlockTime = 7 * time.Second + blockSyncInterval = uint64(100) + ) + + anchorTime := time.Now().UTC() + var headerCalls atomic.Int32 + + backend := newTestWrappedBackendWithConfig( + t, + configBlockTime, + blockSyncInterval, + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + call := headerCalls.Add(1) + switch call { + case 1: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock)), + Time: uint64(anchorTime.Unix()), + }, nil + case 2: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock + 1)), + Time: uint64(anchorTime.Add(observedBlockTime).Unix()), + }, nil + case 3: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock + 1)), + Time: uint64(anchorTime.Add(observedBlockTime).Unix()), + }, nil + default: + return nil, errors.New("unexpected rpc call") + } + }), + ) + + _, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + + time.Sleep(time.Duration(blockSyncInterval) * configBlockTime) + synctest.Wait() + + _, err = backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, int32(2), headerCalls.Load()) + + time.Sleep(time.Duration(blockSyncInterval) * observedBlockTime) + synctest.Wait() + + afterStuckPoll, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, int32(3), headerCalls.Load()) + + elapsed := time.Duration(blockSyncInterval)*configBlockTime + time.Duration(blockSyncInterval)*observedBlockTime + chainElapsed := elapsed - observedBlockTime + expectedWithConfig := genesisBlock + 1 + uint64(chainElapsed/configBlockTime) + expectedWithObserved := genesisBlock + 1 + uint64(chainElapsed/observedBlockTime) + + assert.Equal(t, expectedWithConfig, afterStuckPoll) + assert.NotEqual(t, expectedWithObserved, afterStuckPoll) + }) +} + +func Test_BlockNumber_AverageBlockTimeCappedAtMax(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + const ( + genesisBlock = uint64(100) + configBlockTime = 3 * time.Second + blockSyncInterval = uint64(10) + ) + + anchorTime := time.Now().UTC() + var headerCalls atomic.Int32 + + backend := newTestWrappedBackendWithConfig( + t, + configBlockTime, + blockSyncInterval, + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + call := headerCalls.Add(1) + switch call { + case 1: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock)), + Time: uint64(anchorTime.Unix()), + }, nil + case 2: + return &types.Header{ + Number: big.NewInt(int64(genesisBlock + 1)), + Time: uint64(anchorTime.Add(time.Hour).Unix()), + }, nil + default: + return nil, errors.New("unexpected rpc call") + } + }), + ) + + _, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + + time.Sleep(time.Duration(blockSyncInterval) * configBlockTime) + synctest.Wait() + + got, err := backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, genesisBlock+1, got) + assert.Equal(t, int32(2), headerCalls.Load()) + + time.Sleep(time.Hour + maxAverageBlockTime/2) // chain stuck and then resumed + synctest.Wait() + + got, err = backend.BlockNumber(context.Background()) + assert.NoError(t, err) + assert.Equal(t, genesisBlock+2, got) + assert.Equal(t, int32(2), headerCalls.Load()) + }) +} + +type chainSimulator struct { + mu sync.Mutex + genesisTime time.Time + genesisBlock uint64 + block uint64 + blockTime time.Duration +} + +func newChainSimulator(genesisBlock uint64, blockTime time.Duration) *chainSimulator { + return &chainSimulator{ + genesisTime: time.Now().UTC(), + genesisBlock: genesisBlock, + block: genesisBlock, + blockTime: blockTime, + } +} + +func (c *chainSimulator) advanceEvery(blockTime time.Duration, count int) { + for range count { + time.Sleep(blockTime) + c.mu.Lock() + c.block++ + c.mu.Unlock() + } +} + +func (c *chainSimulator) header() *types.Header { + c.mu.Lock() + defer c.mu.Unlock() + + blockOffset := c.block - c.genesisBlock + return &types.Header{ + Number: big.NewInt(int64(c.block)), + Time: uint64(c.genesisTime.Add(c.blockTime * time.Duration(blockOffset)).Unix()), + } +} + +func (c *chainSimulator) blockNumber() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + return c.block +} + +func setBlockAnchor(backend *wrappedBackend, anchor blockNumberAnchor) { + backend.blockNumberCache.Set(anchor) +} + func newTestWrappedBackend(t *testing.T, opts ...backendmock.Option) *wrappedBackend { t.Helper() + return newTestWrappedBackendWithConfig(t, testBlockTime, testBlockSyncInterval, opts...) +} + +func newTestWrappedBackendWithConfig( + t *testing.T, + blockTime time.Duration, + blockSyncInterval uint64, + opts ...backendmock.Option, +) *wrappedBackend { + t.Helper() + backend, ok := NewBackend( backendmock.New(opts...), testMinimumGasTipCap, - testBlockTime, - testBlockSyncInterval, + blockTime, + blockSyncInterval, ).(*wrappedBackend) assert.True(t, ok)