From e26af620db93f4faa2ac79c3bc5af93048eccc0b Mon Sep 17 00:00:00 2001 From: Zyra-V21 Date: Tue, 26 May 2026 07:50:31 +0200 Subject: [PATCH] fix: prevent panic when beacon committee data is missing (#271) A transient beacon-node/proxy failure (e.g. 'unexpected EOF') while fetching committees made NewEpochData swallow the error and build a state with empty committees. ElectraMetrics.GetAttestingIndices then dereferenced a nil committee, crashing the whole indexer with a SIGSEGV; the legacy Altair/Deneb path had the same latent hazard via an out-of-range slice access. Root cause fix: NewEpochData now retries the committees/proposer-duties fetch up to --max-request-retries and returns an error on failure, so the state download is retried instead of caching a state with empty committees. Defense in depth: GetAttestingIndices (Electra) and GetValidatorFromCommitteeIndex (Altair/Deneb) now return a clear error on a missing committee instead of panicking, and the Electra attestation path propagates it for a graceful shutdown rather than log.Fatalf. Adds unit tests covering the missing-committee guards. --- pkg/clientapi/duties.go | 105 +++++++++++++++++------ pkg/clientapi/state.go | 6 +- pkg/spec/metrics/committee_guard_test.go | 38 ++++++++ pkg/spec/metrics/standard.go | 10 ++- pkg/spec/metrics/state_electra.go | 38 +++++--- pkg/spec/metrics/utils.go | 6 ++ 6 files changed, 159 insertions(+), 44 deletions(-) create mode 100644 pkg/spec/metrics/committee_guard_test.go diff --git a/pkg/clientapi/duties.go b/pkg/clientapi/duties.go index a757d2a3..639e251f 100644 --- a/pkg/clientapi/duties.go +++ b/pkg/clientapi/duties.go @@ -1,59 +1,108 @@ package clientapi import ( + "errors" "fmt" + "time" "github.com/attestantio/go-eth2-client/api" + apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/migalabs/goteth/pkg/spec" + "github.com/migalabs/goteth/pkg/utils" ) -func (s *APIClient) NewEpochData(slot phase0.Slot) spec.EpochDuties { - - epochCommittees, err := s.Api.BeaconCommittees(s.ctx, &api.BeaconCommitteesOpts{ - State: fmt.Sprintf("%d", slot), - }) +func (s *APIClient) NewEpochData(slot phase0.Slot) (spec.EpochDuties, error) { + // Beacon committees are required to resolve the attesting indices of every + // attestation. If the fetch fails we must surface the error so the caller can + // retry the whole state download instead of building a state with empty + // committees, which later nil-panics in (Electra)Metrics.GetAttestingIndices + // (see https://github.com/migalabs/goteth/issues/271). + epochCommittees, err := s.requestBeaconCommittees(slot) if err != nil { - log.Errorf("could not get beacon committees at slot %d: %s", slot, err) + return spec.EpochDuties{}, fmt.Errorf("could not get beacon committees at slot %d: %w", slot, err) } validatorsAttSlot := make(map[phase0.ValidatorIndex]phase0.Slot) // each validator, when it had to attest validatorsPerSlot := make(map[phase0.Slot][]phase0.ValidatorIndex) - if epochCommittees != nil { - for _, committee := range epochCommittees.Data { - for _, valID := range committee.Validators { - validatorsAttSlot[valID] = committee.Slot - - if val, ok := validatorsPerSlot[committee.Slot]; ok { - // the slot exists in the map - validatorsPerSlot[committee.Slot] = append(val, valID) - } else { - // the slot does not exist, create - validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID} - } + for _, committee := range epochCommittees { + for _, valID := range committee.Validators { + validatorsAttSlot[valID] = committee.Slot + + if val, ok := validatorsPerSlot[committee.Slot]; ok { + // the slot exists in the map + validatorsPerSlot[committee.Slot] = append(val, valID) + } else { + // the slot does not exist, create + validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID} } } } - proposerDuties, err := s.Api.ProposerDuties(s.ctx, &api.ProposerDutiesOpts{ - Epoch: phase0.Epoch(slot / spec.SlotsPerEpoch), - }) + result := spec.EpochDuties{ + ValidatorAttSlot: validatorsAttSlot, + BeaconCommittees: epochCommittees, + } + // Proposer duties are not needed to resolve attesting indices, so a failure + // here is logged but not fatal (preserves the previous lenient behavior). + proposerDuties, err := s.requestProposerDuties(phase0.Epoch(slot / spec.SlotsPerEpoch)) if err != nil { log.Errorf("could not get proposer duties at slot %d: %s", slot, err) + } else { + result.ProposerDuties = proposerDuties } - result := spec.EpochDuties{ - ValidatorAttSlot: validatorsAttSlot, + return result, nil +} + +// requestBeaconCommittees fetches the beacon committees for the epoch containing +// slot, retrying transient failures (e.g. "unexpected EOF" from a flaky beacon +// node or proxy) up to maxRetries with incremental backoff. +func (s *APIClient) requestBeaconCommittees(slot phase0.Slot) ([]*apiv1.BeaconCommittee, error) { + err := errors.New("first attempt") + var resp *api.Response[[]*apiv1.BeaconCommittee] + + attempts := 0 + for err != nil && attempts < s.maxRetries { + resp, err = s.Api.BeaconCommittees(s.ctx, &api.BeaconCommitteesOpts{ + State: fmt.Sprintf("%d", slot), + }) + if err != nil { + log.Warnf("retrying beacon committees request at slot %d (attempt %d): %s", slot, attempts+1, err) + time.Sleep(utils.RoutineFlushTimeout * time.Duration(attempts+1)) + } + attempts += 1 } - if proposerDuties != nil { - result.ProposerDuties = proposerDuties.Data + if err != nil { + return nil, err } - if epochCommittees != nil { - result.BeaconCommittees = epochCommittees.Data + + return resp.Data, nil +} + +// requestProposerDuties fetches the proposer duties for epoch, retrying transient +// failures up to maxRetries with incremental backoff. +func (s *APIClient) requestProposerDuties(epoch phase0.Epoch) ([]*apiv1.ProposerDuty, error) { + err := errors.New("first attempt") + var resp *api.Response[[]*apiv1.ProposerDuty] + + attempts := 0 + for err != nil && attempts < s.maxRetries { + resp, err = s.Api.ProposerDuties(s.ctx, &api.ProposerDutiesOpts{ + Epoch: epoch, + }) + if err != nil { + log.Warnf("retrying proposer duties request at epoch %d (attempt %d): %s", epoch, attempts+1, err) + time.Sleep(utils.RoutineFlushTimeout * time.Duration(attempts+1)) + } + attempts += 1 + } + if err != nil { + return nil, err } - return result + return resp.Data, nil } diff --git a/pkg/clientapi/state.go b/pkg/clientapi/state.go index e2e51761..b706161b 100644 --- a/pkg/clientapi/state.go +++ b/pkg/clientapi/state.go @@ -61,7 +61,11 @@ func (s *APIClient) requestBeaconStateWithID(slot phase0.Slot, stateID string, k } log.Infof("state at slot %d downloaded in %f seconds (id=%s)", slot, time.Since(startTime).Seconds(), stateID) - resultState, err := local_spec.GetCustomState(*newState.Data, s.NewEpochData(slot)) + epochData, err := s.NewEpochData(slot) + if err != nil { + return nil, fmt.Errorf("unable to retrieve epoch duties for state at slot %d: %w", slot, err) + } + resultState, err := local_spec.GetCustomState(*newState.Data, epochData) if err != nil { return nil, fmt.Errorf("unable to open beacon state, closing requester routine. %s", err.Error()) } diff --git a/pkg/spec/metrics/committee_guard_test.go b/pkg/spec/metrics/committee_guard_test.go new file mode 100644 index 00000000..6ddad40e --- /dev/null +++ b/pkg/spec/metrics/committee_guard_test.go @@ -0,0 +1,38 @@ +package metrics + +import ( + "testing" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/migalabs/goteth/pkg/spec" +) + +// Regression guard for https://github.com/migalabs/goteth/issues/271: +// when beacon committee data is missing (e.g. it failed to download), resolving +// a validator from a committee index must return an error instead of panicking +// with a nil/out-of-range slice access. +func TestGetValidatorFromCommitteeIndexMissingCommittee(t *testing.T) { + base := StateMetricsBase{ + PrevState: &spec.AgnosticState{Epoch: 0}, + CurrentState: &spec.AgnosticState{Epoch: 1}, // empty EpochStructs => no committees + NextState: &spec.AgnosticState{Epoch: 2}, + } + m := AltairMetrics{} + m.baseMetrics = base + + // A slot inside CurrentState's epoch, whose committee data is absent. + slot := phase0.Slot(1*spec.SlotsPerEpoch + 1) + + if _, err := m.GetValidatorFromCommitteeIndex(slot, 0, 0); err == nil { + t.Fatal("expected an error when the beacon committee is missing, got nil (would have panicked before the fix)") + } +} + +// GetValList must return nil (not panic) when the committee does not exist, which +// is the safe primitive the guards above rely on. +func TestGetValListMissingCommitteeReturnsNil(t *testing.T) { + duties := spec.EpochDuties{} // no BeaconCommittees populated + if got := duties.GetValList(phase0.Slot(33), 0); got != nil { + t.Fatalf("expected nil validator list for a missing committee, got %v", got) + } +} diff --git a/pkg/spec/metrics/standard.go b/pkg/spec/metrics/standard.go index 1063c54d..943bbee6 100644 --- a/pkg/spec/metrics/standard.go +++ b/pkg/spec/metrics/standard.go @@ -91,10 +91,12 @@ func StateMetricsByForkVersion( case spec.DataVersionDeneb: return NewDenebMetrics(nextState, currentState, prevState), nil - case spec.DataVersionElectra: - return NewElectraMetrics(nextState, currentState, prevState), nil - case spec.DataVersionFulu: - return NewElectraMetrics(nextState, currentState, prevState), nil + case spec.DataVersionElectra, spec.DataVersionFulu: + electraMetrics, err := NewElectraMetrics(nextState, currentState, prevState) + if err != nil { + return nil, err + } + return electraMetrics, nil default: return nil, fmt.Errorf("could not figure out the State Metrics Fork Version: %s", currentState.Version) } diff --git a/pkg/spec/metrics/state_electra.go b/pkg/spec/metrics/state_electra.go index bc768205..e6df3ab6 100644 --- a/pkg/spec/metrics/state_electra.go +++ b/pkg/spec/metrics/state_electra.go @@ -2,6 +2,7 @@ package metrics import ( "bytes" + "fmt" "math" "slices" @@ -18,14 +19,16 @@ type ElectraMetrics struct { func NewElectraMetrics( nextState *spec.AgnosticState, currentState *spec.AgnosticState, - prevState *spec.AgnosticState) ElectraMetrics { + prevState *spec.AgnosticState) (ElectraMetrics, error) { electraObj := ElectraMetrics{} electraObj.InitBundle(nextState, currentState, prevState) - electraObj.PreProcessBundle() + if err := electraObj.PreProcessBundle(); err != nil { + return electraObj, err + } - return electraObj + return electraObj, nil } func (p *ElectraMetrics) InitBundle(nextState *spec.AgnosticState, @@ -42,10 +45,12 @@ func (p *ElectraMetrics) InitBundle(nextState *spec.AgnosticState, p.SyncCommitteeParticipation = make(map[phase0.ValidatorIndex]uint8) } -func (p *ElectraMetrics) PreProcessBundle() { +func (p *ElectraMetrics) PreProcessBundle() error { if !p.baseMetrics.CurrentState.EmptyStateRoot() { - p.ProcessAttestations() + if err := p.ProcessAttestations(); err != nil { + return err + } p.processPendingDeposits() // FIX: Clear state maps before processing (state objects are reused between iterations) p.baseMetrics.CurrentState.ConsolidatedAmounts = make(map[phase0.ValidatorIndex]phase0.Gwei) @@ -64,9 +69,12 @@ func (p *ElectraMetrics) PreProcessBundle() { p.processWithdrawalRequests() p.processDepositRequests() p.GetMaxFlagIndexDeltas() - p.ProcessInclusionDelays() + if err := p.ProcessInclusionDelays(); err != nil { + return err + } } } + return nil } // https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/beacon-chain.md#new-get_pending_balance_to_withdraw @@ -420,6 +428,12 @@ func (p ElectraMetrics) GetAttestingIndices(attestation electra.Attestation) ([] for _, committeeIndex := range committeeIndices { committee := stateAtSlot.EpochStructs.GetBeaconCommittee(attestation.Data.Slot, phase0.CommitteeIndex(committeeIndex)) + if committee == nil { + // Committee data is missing (e.g. it failed to download). Returning an + // error lets the caller stop gracefully instead of dereferencing nil + // (see https://github.com/migalabs/goteth/issues/271). + return nil, fmt.Errorf("missing beacon committee at slot %d committee %d", attestation.Data.Slot, committeeIndex) + } for i, attesterIndex := range committee.Validators { // Check if the corresponding aggregation bit is set if attestation.AggregationBits.BitAt(uint64(committeeOffset + i)) { @@ -432,10 +446,10 @@ func (p ElectraMetrics) GetAttestingIndices(attestation electra.Attestation) ([] } // https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/beacon-chain.md#modified-process_attestation -func (p ElectraMetrics) ProcessAttestations() { +func (p ElectraMetrics) ProcessAttestations() error { if p.baseMetrics.CurrentState.Blocks == nil { // only process attestations when CurrentState available - return + return nil } currentEpochParticipation := make([][]bool, len(p.baseMetrics.CurrentState.Validators)) @@ -464,7 +478,7 @@ func (p ElectraMetrics) ProcessAttestations() { participationFlags := p.getParticipationFlags(*attestation, *block) attestingIndices, err := p.GetAttestingIndices(*attestation) if err != nil { - log.Fatalf("error processing attestations at block %d: %s", block.Slot, err) + return fmt.Errorf("error processing attestations at block %d: %w", block.Slot, err) } for _, valIdx := range attestingIndices { block.VotesIncluded += 1 @@ -512,9 +526,10 @@ func (p ElectraMetrics) ProcessAttestations() { } } + return nil } -func (p *ElectraMetrics) ProcessInclusionDelays() { +func (p *ElectraMetrics) ProcessInclusionDelays() error { for _, block := range append(p.baseMetrics.PrevState.Blocks, p.baseMetrics.CurrentState.Blocks...) { // we assume the blocks are in order asc for _, attestation := range block.ElectraAttestations { @@ -528,7 +543,7 @@ func (p *ElectraMetrics) ProcessInclusionDelays() { attestingIndices, err := p.GetAttestingIndices(*attestation) if err != nil { - log.Fatalf("error processing attestations at block %d: %s", block.Slot, err) + return fmt.Errorf("error processing inclusion delays at block %d: %w", block.Slot, err) } for _, valIdx := range attestingIndices { @@ -544,6 +559,7 @@ func (p *ElectraMetrics) ProcessInclusionDelays() { p.baseMetrics.InclusionDelays[valIdx] = p.maxInclusionDelay(phase0.ValidatorIndex(valIdx)) + 1 } } + return nil } // Changed the attestation struct to electra. diff --git a/pkg/spec/metrics/utils.go b/pkg/spec/metrics/utils.go index b354cfe8..c7f9980c 100644 --- a/pkg/spec/metrics/utils.go +++ b/pkg/spec/metrics/utils.go @@ -37,6 +37,12 @@ func (p AltairMetrics) GetValidatorFromCommitteeIndex(slot phase0.Slot, committe return 0, fmt.Errorf("could not get validator from any epoch (slot %d, committee %d, index %d): %w", slot, committeeIndex, idx, err) } valList := state.EpochStructs.GetValList(slot, committeeIndex) + if idx < 0 || idx >= len(valList) { + // GetValList returns nil when the committee is missing (e.g. committee + // data failed to download). Guard against the nil/out-of-range slice + // instead of panicking (see https://github.com/migalabs/goteth/issues/271). + return 0, fmt.Errorf("validator at index %d not found in committee %d at slot %d (committee size %d)", idx, committeeIndex, slot, len(valList)) + } return valList[idx], nil }