Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions service/dealpusher/pdp_onchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import (

// defaultGasLimit is a fixed gas limit for FEVM transactions.
// FEVM gas estimation (NoSend=true) is unreliable, so we use a fixed value.
// EVM traces show ~200K gas but FEVM execution needs ~17M due to actor overhead.
const defaultGasLimit = 30_000_000
// EVM traces show ~200K gas but FEVM execution needs much more due to actor
// overhead. PDPVerifier.createDataSet observed at ~79M used on calibnet, so
// we leave a generous margin. Excess gas is refunded by the EVM, not
// charged at limit, so over-allocating is cheap.
const defaultGasLimit = 200_000_000

type confirmationClient interface {
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
Expand Down
47 changes: 36 additions & 11 deletions service/dealpusher/pdp_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,57 @@ func (d *DealPusher) validatePDPPreparationPieceSizes(ctx context.Context, sched
return errors.Wrap(err, "failed to validate preparation piece sizes for PDP")
}

// resolveProviderEVMAddress looks up the provider's Actor record and derives
// the EVM address from its delegated (f410) filecoin address.
// resolveProviderEVMAddress derives the SP's EVM address from its f410
// delegated filecoin address. It first checks the local actors table
// (populated for wallets we own); on miss it queries the chain via
// StateLookupRobustAddress and caches the result so subsequent runs hit
// the fast path.
func (d *DealPusher) resolveProviderEVMAddress(ctx context.Context, provider string) (common.Address, error) {
db := d.dbNoContext.WithContext(ctx)

var actor model.Actor
err := db.Where("address = ? OR id = ?", provider, provider).First(&actor).Error
if err == nil {
return delegatedToEVM(actor.Address)
}
if !errors.Is(err, gorm.ErrRecordNotFound) {
return common.Address{}, errors.Wrapf(err, "failed to query actor for provider %s", provider)
}

// cache miss -- resolve robust (f410) address on-chain
var robustAddr string
if err := d.lotusClient.CallFor(ctx, &robustAddr, "Filecoin.StateLookupRobustAddress", provider, nil); err != nil {
return common.Address{}, errors.Wrapf(err, "failed to resolve robust address for provider %s on-chain", provider)
}

evm, err := delegatedToEVM(robustAddr)
if err != nil {
return common.Address{}, errors.Wrapf(err, "failed to resolve actor for provider %s", provider)
return common.Address{}, err
}

addr, err := address.NewFromString(actor.Address)
// best-effort cache; another worker may have raced us, so a duplicate-key
// failure is fine to swallow
if cerr := db.Create(&model.Actor{ID: provider, Address: robustAddr}).Error; cerr != nil {
Logger.Debugw("provider actor cache write skipped", "provider", provider, "address", robustAddr, "err", cerr)
}
return evm, nil
}

// delegatedToEVM extracts the 20-byte EVM address from an f410 delegated
// filecoin address string.
func delegatedToEVM(addrStr string) (common.Address, error) {
addr, err := address.NewFromString(addrStr)
if err != nil {
return common.Address{}, errors.Wrapf(err, "failed to parse actor address %s", actor.Address)
return common.Address{}, errors.Wrapf(err, "failed to parse actor address %s", addrStr)
}
if addr.Protocol() != address.Delegated {
return common.Address{}, fmt.Errorf("provider actor address %s is not a delegated (f410) address", actor.Address)
return common.Address{}, fmt.Errorf("actor address %s is not a delegated (f410) address", addrStr)
}

payload := addr.Payload()
// delegated address payload: first varint byte(s) for namespace, then 20 bytes for EVM address
// for f410 (namespace 10), payload[0] is the namespace varint, rest is the subaddress
// delegated address payload: namespace varint (1 byte for namespace 10) + 20 bytes EVM
if len(payload) < 21 {
return common.Address{}, fmt.Errorf("provider delegated address payload too short: %d bytes", len(payload))
return common.Address{}, fmt.Errorf("delegated address payload too short: %d bytes", len(payload))
}
// skip namespace varint (1 byte for namespace 10)
return common.BytesToAddress(payload[1:21]), nil
}

Expand Down
99 changes: 99 additions & 0 deletions service/dealpusher/pdp_schedule_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,110 @@
package dealpusher

import (
"context"
"errors"
"testing"

"github.com/data-preservation-programs/singularity/model"
"github.com/data-preservation-programs/singularity/util/testutil"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ybbus/jsonrpc/v3"
"gorm.io/gorm"
)

// rpcClientMock satisfies jsonrpc.RPCClient for the methods we exercise.
type rpcClientMock struct {
mock.Mock
}

func (m *rpcClientMock) Call(ctx context.Context, method string, params ...any) (*jsonrpc.RPCResponse, error) {
panic("not implemented")
}
func (m *rpcClientMock) CallRaw(ctx context.Context, request *jsonrpc.RPCRequest) (*jsonrpc.RPCResponse, error) {
panic("not implemented")
}
func (m *rpcClientMock) CallBatch(ctx context.Context, requests jsonrpc.RPCRequests) (jsonrpc.RPCResponses, error) {
panic("not implemented")
}
func (m *rpcClientMock) CallBatchRaw(ctx context.Context, requests jsonrpc.RPCRequests) (jsonrpc.RPCResponses, error) {
panic("not implemented")
}
func (m *rpcClientMock) CallFor(ctx context.Context, out any, method string, params ...any) error {
return m.Called(ctx, out, method, params).Error(0)
}

// f410 form of 0xE3e842B9D89ed2Ee3976b9b8916827302618c29e on testnet.
const (
testProviderID = "t0186503"
testProviderF410 = "t410f4puefooyt3jo4olwxg4jc2bhgatbrqu6hqc73uy"
testProviderEVM = "0xE3e842B9D89ed2Ee3976b9b8916827302618c29e"
)

func TestResolveProviderEVMAddress_CacheHit(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
require.NoError(t, db.Create(&model.Actor{ID: testProviderID, Address: testProviderF410}).Error)

d := &DealPusher{dbNoContext: db}
evm, err := d.resolveProviderEVMAddress(ctx, testProviderID)
require.NoError(t, err)
require.Equal(t, common.HexToAddress(testProviderEVM), evm)
})
}

func TestResolveProviderEVMAddress_OnchainFallback(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
rpc := new(rpcClientMock)
rpc.On("CallFor", mock.Anything, mock.Anything, "Filecoin.StateLookupRobustAddress", mock.Anything).
Run(func(args mock.Arguments) {
out := args.Get(1).(*string)
*out = testProviderF410
}).
Return(nil)

d := &DealPusher{dbNoContext: db, lotusClient: rpc}
evm, err := d.resolveProviderEVMAddress(ctx, testProviderID)
require.NoError(t, err)
require.Equal(t, common.HexToAddress(testProviderEVM), evm)

// caches the lookup for subsequent calls
var cached model.Actor
require.NoError(t, db.Where("id = ?", testProviderID).First(&cached).Error)
require.Equal(t, testProviderF410, cached.Address)
})
}

func TestResolveProviderEVMAddress_OnchainFailure(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
rpc := new(rpcClientMock)
rpc.On("CallFor", mock.Anything, mock.Anything, "Filecoin.StateLookupRobustAddress", mock.Anything).
Return(errors.New("rpc unavailable"))

d := &DealPusher{dbNoContext: db, lotusClient: rpc}
_, err := d.resolveProviderEVMAddress(ctx, testProviderID)
require.Error(t, err)
require.ErrorContains(t, err, "failed to resolve robust address")
})
}

func TestResolveProviderEVMAddress_NonDelegatedFromChain(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
rpc := new(rpcClientMock)
rpc.On("CallFor", mock.Anything, mock.Anything, "Filecoin.StateLookupRobustAddress", mock.Anything).
Run(func(args mock.Arguments) {
out := args.Get(1).(*string)
*out = testutil.TestWalletAddr
}).
Return(nil)

d := &DealPusher{dbNoContext: db, lotusClient: rpc}
_, err := d.resolveProviderEVMAddress(ctx, testProviderID)
require.Error(t, err)
require.ErrorContains(t, err, "not a delegated")
})
}

func TestValidatePDPProofSetPieceSize(t *testing.T) {
t.Parallel()

Expand Down
Loading