Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions pkg/transaction/wrapped/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
)

type (
Loader[T any] func() (T, error)
// Loader loads a fresh value when the cache is stale. Prev is the last
// successfully cached value, read immediately before the load runs.
Loader[T any] func(prev T) (T, error)
// ReuseEvaluator reports whether a cached value may still be used.
ReuseEvaluator[T any] func(value T) bool
)

Expand Down Expand Up @@ -50,6 +53,8 @@ func (c *SingleFlightCache[T]) Set(value T) {
c.value = value
}

// PeekOrLoad returns the cached value when canReuse allows it. Otherwise it
// loads a new value via loader, passing the current cache entry as prev.
func (c *SingleFlightCache[T]) PeekOrLoad(ctx context.Context, canReuse ReuseEvaluator[T], loader Loader[T]) (T, error) {
c.mu.RLock()
value := c.value
Expand All @@ -64,7 +69,12 @@ func (c *SingleFlightCache[T]) PeekOrLoad(ctx context.Context, canReuse ReuseEva

result, shared, err := c.group.Do(ctx, c.key, func(ctx context.Context) (any, error) {
c.metrics.Loads.Inc()
value, err := loader()

c.mu.RLock()
prev := c.value
c.mu.RUnlock()

value, err := loader(prev)
if err != nil {
c.metrics.LoadErrors.Inc()
return value, err
Expand Down
16 changes: 8 additions & 8 deletions pkg/transaction/wrapped/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestPeekOrLoadHit(t *testing.T) {
func(value uint64) bool {
return true
},
func() (uint64, error) {
func(_ uint64) (uint64, error) {
loadCount.Add(1)
return 0, errors.New("loader must not run")
},
Expand All @@ -59,7 +59,7 @@ func TestPeekOrLoadMiss(t *testing.T) {
func(value uint64) bool {
return false
},
func() (uint64, error) {
func(_ uint64) (uint64, error) {
loadCount.Add(1)
return 99, nil
},
Expand All @@ -75,7 +75,7 @@ func TestPeekOrLoadMiss(t *testing.T) {
func(value uint64) bool {
return true
},
func() (uint64, error) {
func(_ uint64) (uint64, error) {
verifyLoads.Add(1)
return 0, errors.New("unexpected load on verify")
},
Expand All @@ -97,7 +97,7 @@ func TestPeekOrLoadError(t *testing.T) {
func(value uint64) bool {
return false
},
func() (uint64, error) {
func(_ uint64) (uint64, error) {
loadCount.Add(1)
return 99, errLoad
},
Expand All @@ -112,7 +112,7 @@ func TestPeekOrLoadError(t *testing.T) {
func(value uint64) bool {
return false
},
func() (uint64, error) {
func(_ uint64) (uint64, error) {
loadCount.Add(1)
return 0, errLoad
},
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestPeekOrLoadSingleflight(t *testing.T) {
func(value uint64) bool {
return false
},
func() (uint64, error) {
func(_ uint64) (uint64, error) {
loadCount.Add(1)
<-gate
return value, nil
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestPeekOrLoadContextCancellation(t *testing.T) {
func(value uint64) bool {
return false
},
func() (uint64, error) {
func(_ uint64) (uint64, error) {
<-gate
return expectedVal, nil
},
Expand All @@ -200,7 +200,7 @@ func TestPeekOrLoadContextCancellation(t *testing.T) {
func(value uint64) bool {
return false
},
func() (uint64, error) {
func(_ uint64) (uint64, error) {
<-gate
return expectedVal, nil
},
Expand Down
7 changes: 7 additions & 0 deletions pkg/transaction/wrapped/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type metrics struct {
SendTransactionCalls prometheus.Counter
FilterLogsCalls prometheus.Counter
ChainIDCalls prometheus.Counter
AverageBlockTimeSeconds prometheus.Gauge
}

func newMetrics() metrics {
Expand Down Expand Up @@ -129,6 +130,12 @@ func newMetrics() metrics {
Name: "calls_chain_id",
Help: "Count of eth_chainId rpc calls",
}),
AverageBlockTimeSeconds: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "average_block_time_seconds",
Help: "Observed average block time in seconds, updated on each block number cache refresh.",
}),
}
}

Expand Down
47 changes: 41 additions & 6 deletions pkg/transaction/wrapped/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (
"github.com/ethersphere/bee/v2/pkg/transaction/wrapped/cache"
)

const maxAverageBlockTime = 30 * time.Second

var _ transaction.Backend = (*wrappedBackend)(nil)

type blockNumberAnchor struct {
number uint64
timestamp time.Time
number uint64
timestamp time.Time
averageBlockTime time.Duration
}

type wrappedBackend struct {
Expand Down Expand Up @@ -86,7 +89,7 @@ func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) {
return elapsedBlocks < b.blockSyncInterval
}

loadFreshBlockFn := func() (blockNumberAnchor, error) {
loadFreshBlockFn := func(prev blockNumberAnchor) (blockNumberAnchor, error) {
b.metrics.TotalRPCCalls.Inc()
b.metrics.BlockHeaderAsBlockNumberCalls.Inc()

Expand All @@ -99,9 +102,16 @@ func (b *wrappedBackend) BlockNumber(ctx context.Context) (uint64, error) {
b.metrics.TotalRPCErrors.Inc()
return blockNumberAnchor{}, errors.New("latest block header unavailable")
}

newNumber := header.Number.Uint64()
newTime := time.Unix(int64(header.Time), 0).UTC()
averageBlockTime := b.computeAverageBlockTime(prev, newNumber, newTime)
b.metrics.AverageBlockTimeSeconds.Set(averageBlockTime.Seconds())

return blockNumberAnchor{
number: header.Number.Uint64(),
timestamp: time.Unix(int64(header.Time), 0).UTC(),
number: newNumber,
timestamp: newTime,
averageBlockTime: averageBlockTime,
}, nil
}

Expand All @@ -126,10 +136,35 @@ func (b *wrappedBackend) estimatedBlockNumberWithElapsed(anchor blockNumberAncho
return anchor.number, 0
}

elapsedBlocks := uint64(now.Sub(anchor.timestamp) / b.blockTime)
blockTime := anchor.averageBlockTime
if blockTime == 0 {
blockTime = b.blockTime
}

elapsedBlocks := uint64(now.Sub(anchor.timestamp) / blockTime)
return anchor.number + elapsedBlocks, elapsedBlocks
}

// computeAverageBlockTime returns the observed block time between prev and the
// freshly loaded header. Falls back to the configured block time when prev is
// unset, the block number did not increase, or the header timestamp did not
// strictly increase.
func (b *wrappedBackend) computeAverageBlockTime(prev blockNumberAnchor, newNumber uint64, newTime time.Time) time.Duration {
if prev.number == 0 || newNumber <= prev.number || !newTime.After(prev.timestamp) {
return b.blockTime
}

elapsed := newTime.Sub(prev.timestamp)
blocks := newNumber - prev.number
Comment thread
acud marked this conversation as resolved.

averageBlockTime := elapsed / time.Duration(blocks)
if averageBlockTime > maxAverageBlockTime {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also have a lower bound as minAverageBlockTime, for maybe some misbehaving RPCs returning a large block jump with a near-equal timestamp ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think it is actually possible...

return maxAverageBlockTime
}

return averageBlockTime
}

func (b *wrappedBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
b.metrics.TotalRPCCalls.Inc()
b.metrics.BlockHeaderCalls.Inc()
Expand Down
Loading
Loading