Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 39 additions & 49 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/bigint"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/p2p"
"github.com/ethersphere/bee/v2/pkg/pricing"
Expand Down Expand Up @@ -352,7 +351,7 @@ func (c *creditAction) Apply() error {

loggerV2.Debug("credit action apply", "crediting_peer_address", c.peer, "price", c.price, "new_balance", nextBalance)

err = c.accounting.store.Put(peerBalanceKey(c.peer), &bigint.BigInt{Int: nextBalance})
err = c.accounting.store.Put(peerBalanceKey(c.peer), nextBalance)
if err != nil {
return fmt.Errorf("failed to persist balance: %w", err)
}
Expand Down Expand Up @@ -405,7 +404,7 @@ func (c *creditAction) Apply() error {
loggerV2.Debug("credit action apply; decreasing originated balance", "crediting_peer_address", c.peer, "current_balance", nextOriginBalance)
}

err = c.accounting.store.Put(originatedBalanceKey(c.peer), &bigint.BigInt{Int: nextOriginBalance})
err = c.accounting.store.Put(originatedBalanceKey(c.peer), nextOriginBalance)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
Expand Down Expand Up @@ -518,48 +517,45 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {

// Balance returns the current balance for the given peer.
func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) {
var w bigint.BigInt
err = a.store.Get(peerBalanceKey(peer), &w)
err = a.store.Get(peerBalanceKey(peer), &balance)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return big.NewInt(0), ErrPeerNoBalance
}
return nil, err
}

return w.Int, nil
return balance, nil
}

// OriginatedBalance returns the current balance for the given peer.
func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, err error) {
var w bigint.BigInt
err = a.store.Get(originatedBalanceKey(peer), &w)
err = a.store.Get(originatedBalanceKey(peer), &balance)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return big.NewInt(0), ErrPeerNoBalance
}
return nil, err
}

return w.Int, nil
return balance, nil
}

// SurplusBalance returns the current balance for the given peer.
func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err error) {
var w bigint.BigInt
err = a.store.Get(peerSurplusBalanceKey(peer), &w)
err = a.store.Get(peerSurplusBalanceKey(peer), &balance)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return big.NewInt(0), nil
}
return nil, err
}

if w.Cmp(big.NewInt(0)) < 0 {
if balance.Cmp(big.NewInt(0)) < 0 {
return nil, ErrInvalidValue
}

return w.Int, nil
return balance, nil
}

// CompensatedBalance returns balance decreased by surplus balance
Expand Down Expand Up @@ -680,13 +676,13 @@ func (a *Accounting) Balances() (map[string]*big.Int, error) {
}

if _, ok := s[addr.String()]; !ok {
var w bigint.BigInt
err = a.store.Get(peerBalanceKey(addr), &w)
var storevalue *big.Int
err = a.store.Get(peerBalanceKey(addr), &storevalue)
if err != nil {
return false, fmt.Errorf("get peer %s balance: %w", addr.String(), err)
}

s[addr.String()] = w.Int
s[addr.String()] = storevalue
}

return false, nil
Expand Down Expand Up @@ -861,15 +857,14 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) {
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()

var w bigint.BigInt
balance := new(big.Int)
zero := big.NewInt(0)

err := a.store.Get(peerBalanceKey(peer), &w)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
balance := w.Int
if balance == nil {
err := a.store.Get(peerBalanceKey(peer), &balance)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
balance = big.NewInt(0)
}

Expand All @@ -886,15 +881,14 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) {
func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) {
accountingPeer := a.getAccountingPeer(peer)

var wl bigint.BigInt
balance := new(big.Int)
zero := big.NewInt(0)

err := a.store.Get(peerBalanceKey(peer), &wl)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
balance := wl.Int
if balance == nil {
err := a.store.Get(peerBalanceKey(peer), &balance)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
balance = big.NewInt(0)
}

Expand All @@ -915,20 +909,16 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) {
// shadowBalance returns the current debt reduced by any potentially debitable amount stored in shadowReservedBalance
// this represents how much less our debt could potentially be seen by the other party if it's ahead with processing credits corresponding to our shadow reserve
func (a *Accounting) shadowBalance(peer swarm.Address, accountingPeer *accountingPeer) (shadowBalance *big.Int, err error) {
var ws bigint.BigInt
balance := new(big.Int)
zero := big.NewInt(0)

err = a.store.Get(peerBalanceKey(peer), &ws)
err = a.store.Get(peerBalanceKey(peer), &balance)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return zero, nil
}
return nil, err
}
balance := ws.Int
if balance == nil {
balance = zero
}

if balance.Cmp(zero) >= 0 {
return zero, nil
Expand Down Expand Up @@ -986,7 +976,7 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece

loggerV2.Debug("registering payment sent", "peer_address", peer, "amount", amount, "new_balance", nextBalance)

err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance})
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
a.logger.Error(err, "notify payment sent; failed to persist balance")
return
Expand Down Expand Up @@ -1042,7 +1032,7 @@ func (a *Accounting) NotifyPaymentReceived(peer swarm.Address, amount *big.Int)

loggerV2.Debug("surplus crediting peer", "peer_address", peer, "amount", amount, "new_balance", increasedSurplus)

err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: increasedSurplus})
err = a.store.Put(peerSurplusBalanceKey(peer), increasedSurplus)
if err != nil {
return fmt.Errorf("failed to persist surplus balance: %w", err)
}
Expand All @@ -1063,7 +1053,7 @@ func (a *Accounting) NotifyPaymentReceived(peer swarm.Address, amount *big.Int)

loggerV2.Debug("crediting peer", "peer_address", peer, "amount", amount, "new_balance", nextBalance)

err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance})
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return fmt.Errorf("failed to persist balance: %w", err)
}
Expand All @@ -1082,7 +1072,7 @@ func (a *Accounting) NotifyPaymentReceived(peer swarm.Address, amount *big.Int)

loggerV2.Debug("surplus crediting peer due to refreshment", "peer_address", peer, "amount", surplusGrowth, "new_balance", increasedSurplus)

err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: increasedSurplus})
err = a.store.Put(peerSurplusBalanceKey(peer), increasedSurplus)
if err != nil {
return fmt.Errorf("failed to persist surplus balance: %w", err)
}
Expand Down Expand Up @@ -1164,7 +1154,7 @@ func (a *Accounting) NotifyRefreshmentSent(peer swarm.Address, attemptedAmount,

newBalance := new(big.Int).Add(currentBalance, amount)

err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: newBalance})
err = a.store.Put(peerBalanceKey(peer), newBalance)
if err != nil {
a.logger.Error(err, "notifyrefreshmentsent failed to persist balance")
return
Expand Down Expand Up @@ -1204,7 +1194,7 @@ func (a *Accounting) NotifyRefreshmentReceived(peer swarm.Address, amount *big.I

// We allow a refreshment to potentially put us into debt as it was previously negotiated and be limited to the peer's outstanding debt plus shadow reserve
loggerV2.Debug("crediting peer", "peer_address", peer, "amount", amount, "new_balance", nextBalance)
err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance})
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return fmt.Errorf("failed to persist balance: %w", err)
}
Expand Down Expand Up @@ -1267,7 +1257,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric
if newSurplusBalance.Cmp(big.NewInt(0)) >= 0 {
loggerV2.Debug("surplus debiting peer", "peer_address", peer, "price", price, "new_balance", newSurplusBalance)

err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: newSurplusBalance})
err = a.store.Put(peerSurplusBalanceKey(peer), newSurplusBalance)
if err != nil {
return nil, fmt.Errorf("failed to persist surplus balance: %w", err)
}
Expand All @@ -1288,7 +1278,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric
// let's store 0 as surplus balance
loggerV2.Debug("surplus debiting peer", "peer_address", peer, "amount", debitIncrease, "new_balance", 0)

err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: big.NewInt(0)})
err = a.store.Put(peerSurplusBalanceKey(peer), big.NewInt(0))
if err != nil {
return nil, fmt.Errorf("failed to persist surplus balance: %w", err)
}
Expand All @@ -1306,7 +1296,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric

loggerV2.Debug("debiting peer", "peer_address", peer, "price", price, "new_balance", nextBalance)

err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance})
err = a.store.Put(peerBalanceKey(peer), nextBalance)
if err != nil {
return nil, fmt.Errorf("failed to persist balance: %w", err)
}
Expand Down Expand Up @@ -1442,12 +1432,12 @@ func (a *Accounting) Connect(peer swarm.Address, fullNode bool) {
accountingPeer.thresholdGrowAt.Set(thresholdGrowStep)
accountingPeer.disconnectLimit.Set(disconnectLimit)

err := a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: zero})
err := a.store.Put(peerBalanceKey(peer), zero)
if err != nil {
a.logger.Error(err, "failed to persist balance")
}

err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: zero})
err = a.store.Put(peerSurplusBalanceKey(peer), zero)
if err != nil {
a.logger.Error(err, "failed to persist surplus balance")
}
Expand All @@ -1472,7 +1462,7 @@ func (a *Accounting) decreaseOriginatedBalanceTo(peer swarm.Address, limit *big.

// If originated balance is more into the negative domain, set it to limit
if originatedBalance.Cmp(toSet) < 0 {
err = a.store.Put(originatedBalanceKey(peer), &bigint.BigInt{Int: toSet})
err = a.store.Put(originatedBalanceKey(peer), toSet)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
Expand All @@ -1494,7 +1484,7 @@ func (a *Accounting) decreaseOriginatedBalanceBy(peer swarm.Address, amount *big
// Move originated balance into the positive domain by amount
newOriginatedBalance := new(big.Int).Add(originatedBalance, amount)

err = a.store.Put(originatedBalanceKey(peer), &bigint.BigInt{Int: newOriginatedBalance})
err = a.store.Put(originatedBalanceKey(peer), newOriginatedBalance)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
Expand Down
18 changes: 0 additions & 18 deletions pkg/bigint/bigint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,3 @@ func (i *BigInt) UnmarshalJSON(b []byte) error {
func Wrap(i *big.Int) *BigInt {
return &BigInt{Int: i}
}

// MarshalBinary implements encoding.BinaryMarshaler using Gob encoding.
// Panics if the underlying *big.Int is nil, as this indicates a programmer error.
func (i *BigInt) MarshalBinary() ([]byte, error) {
if i.Int == nil {
panic("bigint: MarshalBinary called on nil Int")
}
return i.GobEncode()
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler using Gob decoding.
func (i *BigInt) UnmarshalBinary(data []byte) error {
if len(data) == 0 {
return fmt.Errorf("bigint: UnmarshalBinary called with empty data")
}
i.Int = new(big.Int)
return i.GobDecode(data)
}
88 changes: 0 additions & 88 deletions pkg/bigint/bigint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,94 +14,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/bigint"
)

func TestBinaryMarshalingRoundTrip(t *testing.T) {
t.Parallel()

tests := []struct {
name string
val *big.Int
}{
{"positive", big.NewInt(123456789)},
{"negative", big.NewInt(-987654321)},
{"zero", big.NewInt(0)},
{"large", new(big.Int).Mul(big.NewInt(math.MaxInt64), big.NewInt(math.MaxInt64))},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

original := bigint.Wrap(tc.val)
data, err := original.MarshalBinary()
if err != nil {
t.Fatalf("MarshalBinary: %v", err)
}

var got bigint.BigInt
if err := got.UnmarshalBinary(data); err != nil {
t.Fatalf("UnmarshalBinary: %v", err)
}

if got.Cmp(tc.val) != 0 {
t.Fatalf("got %v, want %v", got.Int, tc.val)
}
})
}
}

// TestBinaryMarshalingGobCompatibility verifies that MarshalBinary produces
// byte-identical output to big.Int.GobEncode, confirming that nodes upgrading
// from the old code (which stored raw *big.Int via GobEncode) will write
// identical bytes after migration.
func TestBinaryMarshalingGobCompatibility(t *testing.T) {
t.Parallel()

val := big.NewInt(555000)
gobData, err := val.GobEncode()
if err != nil {
t.Fatal(err)
}

newData, err := bigint.Wrap(val).MarshalBinary()
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(gobData, newData) {
t.Fatalf("MarshalBinary output differs from GobEncode: got %v, want %v", newData, gobData)
}

var got bigint.BigInt
if err := got.UnmarshalBinary(gobData); err != nil {
t.Fatalf("UnmarshalBinary of gob data: %v", err)
}
if got.Cmp(val) != 0 {
t.Fatalf("got %v, want %v", got.Int, val)
}
}

func TestMarshalBinaryNilPanics(t *testing.T) {
t.Parallel()

defer func() {
if r := recover(); r == nil {
t.Fatal("expected panic on MarshalBinary with nil Int, got none")
}
}()

var w bigint.BigInt // Int is nil
_, _ = w.MarshalBinary()
}

func TestUnmarshalBinaryEmptyErrors(t *testing.T) {
t.Parallel()

var w bigint.BigInt
if err := w.UnmarshalBinary([]byte{}); err == nil {
t.Fatal("expected error on UnmarshalBinary with empty data, got nil")
}
}

func TestMarshaling(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading