Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 77 additions & 28 deletions pkg/clientapi/duties.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,108 @@
package clientapi

import (
"errors"
"fmt"
"time"

"github.com/attestantio/go-eth2-client/api"
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/migalabs/goteth/pkg/spec"
"github.com/migalabs/goteth/pkg/utils"
)

func (s *APIClient) NewEpochData(slot phase0.Slot) spec.EpochDuties {

epochCommittees, err := s.Api.BeaconCommittees(s.ctx, &api.BeaconCommitteesOpts{
State: fmt.Sprintf("%d", slot),
})
func (s *APIClient) NewEpochData(slot phase0.Slot) (spec.EpochDuties, error) {

// Beacon committees are required to resolve the attesting indices of every
// attestation. If the fetch fails we must surface the error so the caller can
// retry the whole state download instead of building a state with empty
// committees, which later nil-panics in (Electra)Metrics.GetAttestingIndices
// (see https://github.com/migalabs/goteth/issues/271).
epochCommittees, err := s.requestBeaconCommittees(slot)
if err != nil {
log.Errorf("could not get beacon committees at slot %d: %s", slot, err)
return spec.EpochDuties{}, fmt.Errorf("could not get beacon committees at slot %d: %w", slot, err)
}

validatorsAttSlot := make(map[phase0.ValidatorIndex]phase0.Slot) // each validator, when it had to attest
validatorsPerSlot := make(map[phase0.Slot][]phase0.ValidatorIndex)

if epochCommittees != nil {
for _, committee := range epochCommittees.Data {
for _, valID := range committee.Validators {
validatorsAttSlot[valID] = committee.Slot

if val, ok := validatorsPerSlot[committee.Slot]; ok {
// the slot exists in the map
validatorsPerSlot[committee.Slot] = append(val, valID)
} else {
// the slot does not exist, create
validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID}
}
for _, committee := range epochCommittees {
for _, valID := range committee.Validators {
validatorsAttSlot[valID] = committee.Slot

if val, ok := validatorsPerSlot[committee.Slot]; ok {
// the slot exists in the map
validatorsPerSlot[committee.Slot] = append(val, valID)
} else {
// the slot does not exist, create
validatorsPerSlot[committee.Slot] = []phase0.ValidatorIndex{valID}
}
}
}

proposerDuties, err := s.Api.ProposerDuties(s.ctx, &api.ProposerDutiesOpts{
Epoch: phase0.Epoch(slot / spec.SlotsPerEpoch),
})
result := spec.EpochDuties{
ValidatorAttSlot: validatorsAttSlot,
BeaconCommittees: epochCommittees,
}

// Proposer duties are not needed to resolve attesting indices, so a failure
// here is logged but not fatal (preserves the previous lenient behavior).
proposerDuties, err := s.requestProposerDuties(phase0.Epoch(slot / spec.SlotsPerEpoch))
if err != nil {
log.Errorf("could not get proposer duties at slot %d: %s", slot, err)
} else {
result.ProposerDuties = proposerDuties
}

result := spec.EpochDuties{
ValidatorAttSlot: validatorsAttSlot,
return result, nil
}

// requestBeaconCommittees fetches the beacon committees for the epoch containing
// slot, retrying transient failures (e.g. "unexpected EOF" from a flaky beacon
// node or proxy) up to maxRetries with incremental backoff.
func (s *APIClient) requestBeaconCommittees(slot phase0.Slot) ([]*apiv1.BeaconCommittee, error) {
err := errors.New("first attempt")
var resp *api.Response[[]*apiv1.BeaconCommittee]

attempts := 0
for err != nil && attempts < s.maxRetries {
resp, err = s.Api.BeaconCommittees(s.ctx, &api.BeaconCommitteesOpts{
State: fmt.Sprintf("%d", slot),
})
if err != nil {
log.Warnf("retrying beacon committees request at slot %d (attempt %d): %s", slot, attempts+1, err)
time.Sleep(utils.RoutineFlushTimeout * time.Duration(attempts+1))
}
attempts += 1
}
if proposerDuties != nil {
result.ProposerDuties = proposerDuties.Data
if err != nil {
return nil, err
}
if epochCommittees != nil {
result.BeaconCommittees = epochCommittees.Data

return resp.Data, nil
}

// requestProposerDuties fetches the proposer duties for epoch, retrying transient
// failures up to maxRetries with incremental backoff.
func (s *APIClient) requestProposerDuties(epoch phase0.Epoch) ([]*apiv1.ProposerDuty, error) {
err := errors.New("first attempt")
var resp *api.Response[[]*apiv1.ProposerDuty]

attempts := 0
for err != nil && attempts < s.maxRetries {
resp, err = s.Api.ProposerDuties(s.ctx, &api.ProposerDutiesOpts{
Epoch: epoch,
})
if err != nil {
log.Warnf("retrying proposer duties request at epoch %d (attempt %d): %s", epoch, attempts+1, err)
time.Sleep(utils.RoutineFlushTimeout * time.Duration(attempts+1))
}
attempts += 1
}
if err != nil {
return nil, err
}

return result
return resp.Data, nil
}
6 changes: 5 additions & 1 deletion pkg/clientapi/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func (s *APIClient) requestBeaconStateWithID(slot phase0.Slot, stateID string, k
}

log.Infof("state at slot %d downloaded in %f seconds (id=%s)", slot, time.Since(startTime).Seconds(), stateID)
resultState, err := local_spec.GetCustomState(*newState.Data, s.NewEpochData(slot))
epochData, err := s.NewEpochData(slot)
if err != nil {
return nil, fmt.Errorf("unable to retrieve epoch duties for state at slot %d: %w", slot, err)
}
resultState, err := local_spec.GetCustomState(*newState.Data, epochData)
if err != nil {
return nil, fmt.Errorf("unable to open beacon state, closing requester routine. %s", err.Error())
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/spec/metrics/committee_guard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package metrics

import (
"testing"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/migalabs/goteth/pkg/spec"
)

// Regression guard for https://github.com/migalabs/goteth/issues/271:
// when beacon committee data is missing (e.g. it failed to download), resolving
// a validator from a committee index must return an error instead of panicking
// with a nil/out-of-range slice access.
func TestGetValidatorFromCommitteeIndexMissingCommittee(t *testing.T) {
base := StateMetricsBase{
PrevState: &spec.AgnosticState{Epoch: 0},
CurrentState: &spec.AgnosticState{Epoch: 1}, // empty EpochStructs => no committees
NextState: &spec.AgnosticState{Epoch: 2},
}
m := AltairMetrics{}
m.baseMetrics = base

// A slot inside CurrentState's epoch, whose committee data is absent.
slot := phase0.Slot(1*spec.SlotsPerEpoch + 1)

if _, err := m.GetValidatorFromCommitteeIndex(slot, 0, 0); err == nil {
t.Fatal("expected an error when the beacon committee is missing, got nil (would have panicked before the fix)")
}
}

// TestGetValListMissingCommitteeReturnsNil checks that GetValList on an
// EpochDuties with no BeaconCommittees populated yields a nil validator list
// instead of panicking. Callers that bounds-check the returned slice depend on
// this behavior to detect a missing committee safely.
func TestGetValListMissingCommitteeReturnsNil(t *testing.T) {
	// Zero-value duties: no beacon committees have been populated.
	var emptyDuties spec.EpochDuties

	got := emptyDuties.GetValList(phase0.Slot(33), 0)
	if got == nil {
		return
	}
	t.Fatalf("expected nil validator list for a missing committee, got %v", got)
}
10 changes: 6 additions & 4 deletions pkg/spec/metrics/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ func StateMetricsByForkVersion(

case spec.DataVersionDeneb:
return NewDenebMetrics(nextState, currentState, prevState), nil
case spec.DataVersionElectra:
return NewElectraMetrics(nextState, currentState, prevState), nil
case spec.DataVersionFulu:
return NewElectraMetrics(nextState, currentState, prevState), nil
case spec.DataVersionElectra, spec.DataVersionFulu:
electraMetrics, err := NewElectraMetrics(nextState, currentState, prevState)
if err != nil {
return nil, err
}
return electraMetrics, nil
default:
return nil, fmt.Errorf("could not figure out the State Metrics Fork Version: %s", currentState.Version)
}
Expand Down
38 changes: 27 additions & 11 deletions pkg/spec/metrics/state_electra.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"bytes"
"fmt"
"math"

"slices"
Expand All @@ -18,14 +19,16 @@ type ElectraMetrics struct {
func NewElectraMetrics(
nextState *spec.AgnosticState,
currentState *spec.AgnosticState,
prevState *spec.AgnosticState) ElectraMetrics {
prevState *spec.AgnosticState) (ElectraMetrics, error) {

electraObj := ElectraMetrics{}

electraObj.InitBundle(nextState, currentState, prevState)
electraObj.PreProcessBundle()
if err := electraObj.PreProcessBundle(); err != nil {
return electraObj, err
}

return electraObj
return electraObj, nil
}

func (p *ElectraMetrics) InitBundle(nextState *spec.AgnosticState,
Expand All @@ -42,10 +45,12 @@ func (p *ElectraMetrics) InitBundle(nextState *spec.AgnosticState,
p.SyncCommitteeParticipation = make(map[phase0.ValidatorIndex]uint8)
}

func (p *ElectraMetrics) PreProcessBundle() {
func (p *ElectraMetrics) PreProcessBundle() error {

if !p.baseMetrics.CurrentState.EmptyStateRoot() {
p.ProcessAttestations()
if err := p.ProcessAttestations(); err != nil {
return err
}
p.processPendingDeposits()
// FIX: Clear state maps before processing (state objects are reused between iterations)
p.baseMetrics.CurrentState.ConsolidatedAmounts = make(map[phase0.ValidatorIndex]phase0.Gwei)
Expand All @@ -64,9 +69,12 @@ func (p *ElectraMetrics) PreProcessBundle() {
p.processWithdrawalRequests()
p.processDepositRequests()
p.GetMaxFlagIndexDeltas()
p.ProcessInclusionDelays()
if err := p.ProcessInclusionDelays(); err != nil {
return err
}
}
}
return nil
}

// https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/beacon-chain.md#new-get_pending_balance_to_withdraw
Expand Down Expand Up @@ -420,6 +428,12 @@ func (p ElectraMetrics) GetAttestingIndices(attestation electra.Attestation) ([]

for _, committeeIndex := range committeeIndices {
committee := stateAtSlot.EpochStructs.GetBeaconCommittee(attestation.Data.Slot, phase0.CommitteeIndex(committeeIndex))
if committee == nil {
// Committee data is missing (e.g. it failed to download). Returning an
// error lets the caller stop gracefully instead of dereferencing nil
// (see https://github.com/migalabs/goteth/issues/271).
return nil, fmt.Errorf("missing beacon committee at slot %d committee %d", attestation.Data.Slot, committeeIndex)
}
for i, attesterIndex := range committee.Validators {
// Check if the corresponding aggregation bit is set
if attestation.AggregationBits.BitAt(uint64(committeeOffset + i)) {
Expand All @@ -432,10 +446,10 @@ func (p ElectraMetrics) GetAttestingIndices(attestation electra.Attestation) ([]
}

// https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/beacon-chain.md#modified-process_attestation
func (p ElectraMetrics) ProcessAttestations() {
func (p ElectraMetrics) ProcessAttestations() error {

if p.baseMetrics.CurrentState.Blocks == nil { // only process attestations when CurrentState available
return
return nil
}

currentEpochParticipation := make([][]bool, len(p.baseMetrics.CurrentState.Validators))
Expand Down Expand Up @@ -464,7 +478,7 @@ func (p ElectraMetrics) ProcessAttestations() {
participationFlags := p.getParticipationFlags(*attestation, *block)
attestingIndices, err := p.GetAttestingIndices(*attestation)
if err != nil {
log.Fatalf("error processing attestations at block %d: %s", block.Slot, err)
return fmt.Errorf("error processing attestations at block %d: %w", block.Slot, err)
}
for _, valIdx := range attestingIndices {
block.VotesIncluded += 1
Expand Down Expand Up @@ -512,9 +526,10 @@ func (p ElectraMetrics) ProcessAttestations() {
}

}
return nil
}

func (p *ElectraMetrics) ProcessInclusionDelays() {
func (p *ElectraMetrics) ProcessInclusionDelays() error {
for _, block := range append(p.baseMetrics.PrevState.Blocks, p.baseMetrics.CurrentState.Blocks...) {
// we assume the blocks are in order asc
for _, attestation := range block.ElectraAttestations {
Expand All @@ -528,7 +543,7 @@ func (p *ElectraMetrics) ProcessInclusionDelays() {

attestingIndices, err := p.GetAttestingIndices(*attestation)
if err != nil {
log.Fatalf("error processing attestations at block %d: %s", block.Slot, err)
return fmt.Errorf("error processing inclusion delays at block %d: %w", block.Slot, err)
}
for _, valIdx := range attestingIndices {

Expand All @@ -544,6 +559,7 @@ func (p *ElectraMetrics) ProcessInclusionDelays() {
p.baseMetrics.InclusionDelays[valIdx] = p.maxInclusionDelay(phase0.ValidatorIndex(valIdx)) + 1
}
}
return nil
}

// Changed the attestation struct to electra.
Expand Down
6 changes: 6 additions & 0 deletions pkg/spec/metrics/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func (p AltairMetrics) GetValidatorFromCommitteeIndex(slot phase0.Slot, committe
return 0, fmt.Errorf("could not get validator from any epoch (slot %d, committee %d, index %d): %w", slot, committeeIndex, idx, err)
}
valList := state.EpochStructs.GetValList(slot, committeeIndex)
if idx < 0 || idx >= len(valList) {
// GetValList returns nil when the committee is missing (e.g. committee
// data failed to download). Guard against the nil/out-of-range slice
// instead of panicking (see https://github.com/migalabs/goteth/issues/271).
return 0, fmt.Errorf("validator at index %d not found in committee %d at slot %d (committee size %d)", idx, committeeIndex, slot, len(valList))
}
return valList[idx], nil
}

Expand Down
Loading