Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions node/cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,10 @@ func initL1SequencerComponents(
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create L1Sequencer caller: %w", err)
}
verifier = l1sequencer.NewSequencerVerifier(caller, logger)
verifier, err = l1sequencer.NewSequencerVerifier(caller, logger)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize sequencer verifier: %w", err)
}
logger.Info("Sequencer verifier initialized", "contract", contractAddr.Hex())
} else {
return nil, nil, nil, fmt.Errorf("L1 Sequencer contract address is required, check l1.sequencerContract configuration")
Expand Down Expand Up @@ -377,7 +380,7 @@ func startMetricsServer(addr string, logger tmlog.Logger) {
}
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
srv := &http.Server{Addr: addr, Handler: mux}
srv := &http.Server{Addr: addr, Handler: mux, ReadHeaderTimeout: 10 * time.Second}
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("metrics server", "addr", addr, "err", err)
Expand Down
2 changes: 1 addition & 1 deletion node/derivation/derivation.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func (d *Derivation) withReactorsQuiesced(ctx context.Context, batchIndex uint64
return err
}
defer func() {
// Use background context so a cancelled parent ctx doesn't
// Use background context so a canceled parent ctx doesn't
// prevent reactor restart.
height := preWrite
if cur, readErr := d.l2Client.BlockNumber(context.Background()); readErr == nil {
Expand Down
4 changes: 2 additions & 2 deletions node/hakeeper/block_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (f *BlockFSM) Snapshot() (raft.FSMSnapshot, error) {
// Reads the 8-byte appliedHeight from the snapshot. Does NOT call onApplied --
// geth state must be recovered independently (Fullnode P2P sync).
func (f *BlockFSM) Restore(rc io.ReadCloser) error {
defer rc.Close()
defer func() { _ = rc.Close() }()

data, err := io.ReadAll(rc)
if err != nil {
Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *blockSnapshot) Persist(sink raft.SnapshotSink) error {
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], s.height)
if _, err := sink.Write(buf[:]); err != nil {
sink.Cancel()
_ = sink.Cancel()
return fmt.Errorf("blockSnapshot.Persist: write failed: %w", err)
}
return sink.Close()
Expand Down
22 changes: 11 additions & 11 deletions node/hakeeper/ha_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ const (
// HAService implements the SequencerHA interface from tendermint/sequencer.
// It also satisfies rpc.ConsensusAdapter so it can be passed directly to the RPC server.
type HAService struct {
logger tmlog.Logger
cfg *Config
logger tmlog.Logger
cfg *Config
advertisedAddr string // resolved once in New(), used throughout
fsm *BlockFSM
rpcServer *hakeeperrpc.Server
fsm *BlockFSM
rpcServer *hakeeperrpc.Server

// Raft internals (initialised in Start)
// Raft internals (initialized in Start)
r *raft.Raft
transport *raft.NetworkTransport

Expand Down Expand Up @@ -69,7 +69,7 @@ func (h *HAService) SetOnBlockApplied(fn func(*types.BlockV2) error) {

// ── SequencerHA interface ────────────────────────────────────────────────────

// Start initialises Raft and the management RPC server.
// Start initializes Raft and the management RPC server.
// Called by StateV2.OnStart() at upgrade height.
func (h *HAService) Start() error {
if err := h.initRaft(); err != nil {
Expand Down Expand Up @@ -271,7 +271,7 @@ func (h *HAService) Addr() string { return h.advertisedAddr }
// initRaft creates the Raft instance. Called once from Start().
// On failure, all opened resources are cleaned up via a single deferred closure.
func (h *HAService) initRaft() (retErr error) {
if err := os.MkdirAll(h.cfg.StorageDir, 0o755); err != nil {
if err := os.MkdirAll(h.cfg.StorageDir, 0o750); err != nil {
return fmt.Errorf("mkdir %q: %w", h.cfg.StorageDir, err)
}

Expand All @@ -287,13 +287,13 @@ func (h *HAService) initRaft() (retErr error) {
r.Shutdown()
}
if transport != nil {
transport.Close()
_ = transport.Close()
}
if stableStore != nil {
stableStore.Close()
_ = stableStore.Close()
}
if logStore != nil {
logStore.Close()
_ = logStore.Close()
}
}
}()
Expand Down Expand Up @@ -364,7 +364,7 @@ func (h *HAService) initRaft() (retErr error) {
h.r = r
h.transport = transport

h.logger.Info("hakeeper: raft initialised", "bind", bindAddr)
h.logger.Info("hakeeper: raft initialized", "bind", bindAddr)
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions node/hakeeper/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"sync"
"time"

ethrpc "github.com/morph-l2/go-ethereum/rpc"
"github.com/pkg/errors"
Expand Down Expand Up @@ -40,8 +41,9 @@ func New(log log.Logger, listenAddr string, listenPort int, cons ConsensusAdapte

addr := fmt.Sprintf("%s:%d", listenAddr, listenPort)
httpSrv := &http.Server{
Addr: addr,
Handler: mux,
Addr: addr,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
}

return &Server{
Expand Down
4 changes: 2 additions & 2 deletions node/l1sequencer/enclave_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *EnclaveSigner) probe() error {
if err := conn.SetDeadline(time.Now().Add(requestTimeout)); err != nil {
return fmt.Errorf("set deadline: %w", err)
}
defer conn.SetDeadline(time.Time{}) // clear on exit so signOnce can manage its own deadline
defer func() { _ = conn.SetDeadline(time.Time{}) }() // clear on exit so signOnce can manage its own deadline

if _, err := conn.Write([]byte{opGetPubkey}); err != nil {
return fmt.Errorf("write GetPubkey: %w", err)
Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *EnclaveSigner) signOnce(conn net.Conn, data []byte) ([]byte, error) {
if err := conn.SetDeadline(time.Now().Add(requestTimeout)); err != nil {
return nil, err
}
defer conn.SetDeadline(time.Time{})
defer func() { _ = conn.SetDeadline(time.Time{}) }()

req := make([]byte, 1+hashLen)
req[0] = opSign
Expand Down
1 change: 0 additions & 1 deletion node/l1sequencer/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,3 @@ func (s *LocalSigner) Sign(data []byte) ([]byte, error) {
func (s *LocalSigner) Address() common.Address {
return s.address
}

25 changes: 20 additions & 5 deletions node/l1sequencer/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type sequencerCursor struct {
// History is loaded from L1 at construction and refreshed every 5 minutes.
// All L1 reads use the finalized block tag to avoid ingesting reorged data.
type SequencerVerifier struct {
mu sync.Mutex
mu sync.Mutex
history []bindings.L1SequencerHistoryRecord
cursor sequencerCursor

Expand All @@ -46,19 +46,34 @@ type SequencerVerifier struct {
// NewSequencerVerifier creates a new SequencerVerifier, loads the full sequencer
// history from L1 (finalized), and starts a background refresh goroutine.
// Call Stop to terminate the background loop.
func NewSequencerVerifier(caller *bindings.L1SequencerCaller, logger tmlog.Logger) *SequencerVerifier {
//
// Startup is fail-fast: if the initial syncHistory() fails or returns empty
// history, the global upgrade.UpgradeBlockHeight is never set and stays at its
// -1 sentinel. Running with UpgradeBlockHeight=-1 is unsafe: IsUpgraded()
// returns false for every height, so the PBFT state machine can run past the
// true upgrade height, and on a block-synced fullnode restart the handshake's
// sequencer-mode replay exemption fails (ErrAppBlockHeightTooHigh) while
// reconstructLastCommit runs against a nil commit and panics. We therefore
// refuse to start rather than boot into that state; the operator's supervisor
// restarts us once L1 is reachable.
func NewSequencerVerifier(caller *bindings.L1SequencerCaller, logger tmlog.Logger) (*SequencerVerifier, error) {
ctx, cancel := context.WithCancel(context.Background())
v := &SequencerVerifier{
caller: caller,
logger: logger.With("module", "l1sequencer_verifier"),
cancel: cancel,
}
if err := v.syncHistory(); err != nil {
v.logger.Error("Failed to load sequencer history from L1", "err", err)
cancel()
return nil, fmt.Errorf("refusing to start with UpgradeBlockHeight=-1: initial sequencer history sync from L1 failed: %w", err)
}
if upgrade.UpgradeBlockHeight < 0 {
cancel()
return nil, fmt.Errorf("refusing to start with UpgradeBlockHeight=-1: L1 returned empty sequencer history; upgrade height unknown")
}
v.logCurrentState()
go v.refreshLoop(ctx)
return v
return v, nil
}

// logCurrentState prints a one-line snapshot of the loaded contract state at
Expand Down Expand Up @@ -127,7 +142,7 @@ func (c *SequencerVerifier) syncHistory() error {
return nil
}

// refreshLoop polls L1 until ctx is cancelled.
// refreshLoop polls L1 until ctx is canceled.
// Uses exponential backoff (10s -> 20s -> ... -> 5min) while history is empty,
// then switches to the normal 5-minute interval once loaded.
func (c *SequencerVerifier) refreshLoop(ctx context.Context) {
Expand Down
Loading