Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 21 additions & 23 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage/batchstore"
"github.com/ethersphere/bee/v2/pkg/postage/listener"
"github.com/ethersphere/bee/v2/pkg/postage/postagecontract"
"github.com/ethersphere/bee/v2/pkg/postage/snapshot"
"github.com/ethersphere/bee/v2/pkg/postage/snapshot/archive"
"github.com/ethersphere/bee/v2/pkg/pricer"
"github.com/ethersphere/bee/v2/pkg/pricing"
"github.com/ethersphere/bee/v2/pkg/pss"
Expand Down Expand Up @@ -787,11 +789,6 @@ func NewBee(
eventListener = listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout)
b.listenerCloser = eventListener

batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
if err != nil {
return nil, fmt.Errorf("init batch service: %w", err)
}

// Construct protocols.
pingPong := pingpong.New(p2ps, logger, tracer)

Expand Down Expand Up @@ -892,28 +889,29 @@ func NewBee(
}
)

// When the postage snapshot applies, hand it to the batch service so it
// rebuilds the store from the snapshot during construction; otherwise the
// store is reset only when --resync is requested. Either way the store is
// reset in a single place (batchservice.New), which avoids a second reset
// wiping the snapshot that was just loaded (see #5495).
var batchSnapshot *batchservice.Snapshot
if !o.SkipPostageSnapshot && !batchStoreExists && (networkID == mainnetNetworkID) && beeNodeMode != api.UltraLightMode {
chainBackend := NewSnapshotLogFilterer(logger, archiveSnapshotGetter{})

snapshotEventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout)

snapshotBatchSvc, err := batchservice.New(stateStore, batchStore, logger, snapshotEventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
batchSnapshot, err = snapshot.New(ctx, logger, archive.Getter{}, b.syncingStopped, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout, postageSyncStart)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, but passing an empty archive.Getter{} looks strange; it makes me wonder why it is needed. I would suggest declaring var Default Getter in archive.go and then passing archive.Default here.

if err != nil {
logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...")
} else {
err = snapshotBatchSvc.Start(ctx, postageSyncStart)
syncStatus.Store(true)
if err != nil {
syncErr.Store(err)
logger.Error(err, "failed to start batch service from snapshot, continuing outside snapshot block...")
} else {
postageSyncStart = chainBackend.maxBlockHeight
}
}
if errClose := snapshotEventListener.Close(); errClose != nil {
logger.Error(errClose, "failed to close event listener (snapshot) failure")
// A corrupt snapshot is not fatal: rebuild from the chain instead.
logger.Error(err, "postage snapshot unavailable, syncing from chain instead")
}
}

var snapshotLoaded bool
batchSvc, snapshotLoaded, err = batchservice.New(ctx, stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, batchSnapshot, o.Resync)
if err != nil {
return nil, fmt.Errorf("init batch service: %w", err)
}
if snapshotLoaded {
// The snapshot rebuilt the store up to its block height, so the node can
// already serve postage requests while the remaining gap syncs live.
syncStatus.Store(true)
}

if batchSvc != nil && chainEnabled {
Expand Down
154 changes: 120 additions & 34 deletions pkg/postage/batchservice/batchservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,51 @@ type batchService struct {
batchListener postage.BatchEventListener

checksum hash.Hash // checksum hasher
resync bool

// snapshotResumeBlock is the chain block height the store was rebuilt to
// from a postage snapshot. When set, live sync resumes from here.
snapshotResumeBlock uint64
}

// Interface is the batch service type returned by New. It consists solely of
// the embedded postage.EventUpdater.
type Interface interface {
postage.EventUpdater
}

// Snapshot carries the optional inputs needed to rebuild the batch store from a
// postage snapshot. When a non-nil Snapshot is passed to New, its events are
// replayed into the store before the service is returned. The store is not
// reset just because a snapshot is supplied: New resets it only for a resync
// request or a dirty shutdown, and again if the replay fails. After a
// successful replay, Start resumes live sync from ResumeBlock when that is
// higher than the start block it would otherwise use. New takes ownership of
// Listener and closes it once the replay finishes, whether or not it succeeded.
type Snapshot struct {
// Listener replays the snapshot's events into the batch store. It must be
// non-nil, because loadSnapshot calls Listen and Close on it unconditionally.
Listener postage.Listener
// StartBlock is the block height from which the snapshot is replayed (the
// postage contract start block). If the store's chain state is already past
// this height, the chain state's block is used instead. Listen is invoked
// with the block that follows the chosen height.
StartBlock uint64
// ResumeBlock is the block height the snapshot reached, from which live sync
// resumes once the replay completes. It is recorded on the service only when
// the replay returns without error.
ResumeBlock uint64
}

// New will create a new BatchService.
//
// The batch store is reset here, in the constructor, when a resync was requested
// or a dirty shutdown was detected. A provided snapshot is then replayed onto the
// (possibly reset) store; if that replay fails the store is reset again so live
// sync can rebuild it from the chain. Start never resets the store. The returned
// bool reports whether the snapshot was replayed successfully.
func New(
ctx context.Context,
stateStore storage.StateStorer,
storer postage.Storer,
logger log.Logger,
listener postage.Listener,
owner []byte,
batchListener postage.BatchEventListener,
checksumFunc func() hash.Hash,
snapshot *Snapshot,
resync bool,
) (Interface, error) {
) (Interface, bool, error) {
if checksumFunc == nil {
checksumFunc = sha3.New256
}
Expand All @@ -65,37 +92,113 @@ func New(
sum = checksumFunc()
)

logger = logger.WithName(loggerName).Register()

dirty := false
err := stateStore.Get(dirtyDBKey, &dirty)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, err
if err := stateStore.Get(dirtyDBKey, &dirty); err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, false, err
}
if dirty {
logger.Warning("batch service: dirty shutdown detected, resetting batch store")
}

if resync {
logger.Warning("batch service: resync requested, resetting batch store")
if err := stateStore.Delete(checksumDBKey); err != nil {
return nil, err
return nil, false, err
}
} else if !dirty {
if err := stateStore.Get(checksumDBKey, &b); err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
return nil, false, err
}
} else {
s, err := hex.DecodeString(b)
if err != nil {
return nil, err
return nil, false, err
}
n, err := sum.Write(s)
if err != nil {
return nil, err
return nil, false, err
}
if n != len(s) {
return nil, errors.New("batchstore checksum init")
return nil, false, errors.New("batchstore checksum init")
}
}
}

bs := &batchService{
stateStore: stateStore,
storer: storer,
logger: logger,
listener: listener,
owner: owner,
batchListener: batchListener,
checksum: sum,
}

// Reset the store once, here, when a resync was requested or a dirty shutdown
// was detected (both already logged above). A snapshot is then replayed onto
// the store; Start does not reset, so this is the only unconditional reset.
if dirty || resync {
if err := bs.reset(); err != nil {
return nil, false, err
}
}

snapshotLoaded := false
if snapshot != nil {
if err := bs.loadSnapshot(ctx, snapshot); err != nil {
logger.Error(err, "failed to start batch service from snapshot, continuing outside snapshot block...")
// A partial replay may have written to (and dirtied) the store, so
// reset it again to rebuild cleanly from the chain during live sync.
if err := bs.reset(); err != nil {
return nil, false, err
}
} else {
snapshotLoaded = true
}
}

return bs, snapshotLoaded, nil
}

// reset clears the batch store and then removes the dirty-shutdown marker from
// the state store, so the batch data can be rebuilt from scratch. New invokes
// it; the reason for the reset is logged by the caller before the call, so
// only the completion notice is emitted here. The first failing step aborts
// the reset and its error is returned unchanged; in that case no completion
// notice is logged.
func (svc *batchService) reset() error {
	resetErr := svc.storer.Reset()
	if resetErr != nil {
		return resetErr
	}

	// Drop the marker only after the store itself has been wiped.
	deleteErr := svc.stateStore.Delete(dirtyDBKey)
	if deleteErr != nil {
		return deleteErr
	}

	svc.logger.Warning("batch service: batch store has been reset. your node will now resync chain data. this might take a while...")
	return nil
}

// loadSnapshot rebuilds the (already reset) store from a postage snapshot by
// replaying its events and records the block height to resume live sync from.
func (svc *batchService) loadSnapshot(ctx context.Context, snapshot *Snapshot) error {
defer func() {
if err := snapshot.Listener.Close(); err != nil {
svc.logger.Error(err, "failed to close event listener (snapshot) failure")
}
}()

startBlock := snapshot.StartBlock
if cs := svc.storer.GetChainState(); cs.Block > startBlock {
startBlock = cs.Block
}

return &batchService{stateStore, storer, logger.WithName(loggerName).Register(), listener, owner, batchListener, sum, resync}, nil
if err := <-snapshot.Listener.Listen(ctx, startBlock+1, svc); err != nil {
return err
}

svc.snapshotResumeBlock = snapshot.ResumeBlock

return nil
}

// Create will create a new batch with the given ID, owner value and depth and
Expand Down Expand Up @@ -242,33 +345,16 @@ func (svc *batchService) TransactionEnd() error {
var ErrInterruped = errors.New("postage sync interrupted")

func (svc *batchService) Start(ctx context.Context, startBlock uint64) (err error) {
dirty := false
err = svc.stateStore.Get(dirtyDBKey, &dirty)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}

if dirty || svc.resync {

if dirty {
svc.logger.Warning("batch service: dirty shutdown detected, resetting batch store")
} else {
svc.logger.Warning("batch service: resync requested, resetting batch store")
}

if err := svc.storer.Reset(); err != nil {
return err
}
if err := svc.stateStore.Delete(dirtyDBKey); err != nil {
return err
}
svc.logger.Warning("batch service: batch store has been reset. your node will now resync chain data. this might take a while...")
}

// The store reset already happened in New, so Start only drives live sync.
cs := svc.storer.GetChainState()
if cs.Block > startBlock {
startBlock = cs.Block
}
// When the store was rebuilt from a snapshot, resume live sync from the
// snapshot's block height rather than the requested start block.
if svc.snapshotResumeBlock > startBlock {
startBlock = svc.snapshotResumeBlock
}

syncedChan := svc.listener.Listen(ctx, startBlock+1, svc)

Expand Down
Loading
Loading