diff --git a/nitronode/event_handlers/service.go b/nitronode/event_handlers/service.go index e246c1ce7..522f10405 100644 --- a/nitronode/event_handlers/service.go +++ b/nitronode/event_handlers/service.go @@ -57,7 +57,11 @@ func (s *EventHandlerService) HandleNodeBalanceUpdated(ctx context.Context, tx c func (s *EventHandlerService) HandleHomeChannelCreated(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelCreatedEvent) error { logger := log.FromContext(ctx) chanID := event.ChannelID - channel, err := tx.GetChannelByID(chanID) + // Acquire the user's balance-row lock and read the channel under it before mutating status: + // the lock guards against a concurrent submit_state flipping channel status (e.g. Void→Open + // receiver issuance) between the read and our write. See HandleHomeChannelCheckpointed and + // HandleHomeChannelClosed for the same pattern. + channel, err := tx.LockUserStateForHomeChannel(chanID) if err != nil { return err } @@ -70,18 +74,6 @@ func (s *EventHandlerService) HandleHomeChannelCreated(ctx context.Context, tx c return nil } - // Acquire the user's balance-row lock before mutating channel status or - // backfilling the off-chain head. Receiver/release issuance paths lock - // the same row up front and then re-check Status via CheckActiveChannel; - // without this lock an in-flight RPC can read Status=Void, decide to store - // an unsigned receiver row, and commit before we flip to Open — leaving - // the latest head unsigned on a technically opened channel. See - // HandleHomeChannelCheckpointed and HandleHomeChannelClosed for the same - // pattern. - if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil { - return err - } - if channel.Status >= core.ChannelStatusOpen { logger.Warn("ignoring replayed HomeChannelCreated event on already-initialized channel", "channelId", chanID, "currentStatus", channel.Status, "currentStateVersion", channel.StateVersion, "eventStateVersion", event.StateVersion) @@ -137,7 +129,17 @@ func (s *EventHandlerService) HandleHomeChannelMigrated(ctx context.Context, tx func (s *EventHandlerService) HandleHomeChannelCheckpointed(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelCheckpointedEvent) error { logger := log.FromContext(ctx) chanID := event.ChannelID - channel, err := tx.GetChannelByID(chanID) + // Acquire the user's balance-row lock and read the channel under it before mutating + // channel status or backfilling the off-chain head. Reading the channel before the + // lock would race: a concurrent submit_state can co-sign a Finalize and flip the + // channel Open→Closing between the read and the lock, and the non-challenged path + // below persists the channel snapshot verbatim — a pre-lock Open snapshot would + // silently reopen the finalized channel. Receiver issuance paths lock the same row + // and then re-check Status; without this lock an RPC can read Status=Challenged, + // decide to store an unsigned receiver row, but commit after we flip to Open and + // backfill the prior head — leaving the latest head unsigned on an Open channel. See + // HandleHomeChannelClosed for the same pattern. + channel, err := tx.LockUserStateForHomeChannel(chanID) if err != nil { return err } @@ -150,16 +152,6 @@ func (s *EventHandlerService) HandleHomeChannelCheckpointed(ctx context.Context, return nil } - // Acquire the user's balance-row lock before mutating channel status or - // backfilling the off-chain head. Receiver issuance paths lock the same row - // and then re-check Status; without this lock an RPC can read Status=Challenged, - // decide to store an unsigned receiver row, but commit after we flip to Open - // and backfill the prior head — leaving the latest head unsigned on an Open - // channel. See HandleHomeChannelClosed for the same pattern. - if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil { - return err - } - channel.StateVersion = event.StateVersion wasChallenged := channel.Status == core.ChannelStatusChallenged @@ -269,7 +261,15 @@ func (s *EventHandlerService) backfillOffChainHeadNodeSig(ctx context.Context, t func (s *EventHandlerService) HandleHomeChannelChallenged(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelChallengedEvent) error { logger := log.FromContext(ctx) chanID := event.ChannelID - channel, err := tx.GetChannelByID(chanID) + // Acquire the user's balance-row lock and read the channel under it before mutating + // channel status. Receiver issuance paths (issueTransferReceiverState / + // issueReleaseReceiverState) lock the same row up front and then re-check Status via + // CheckActiveChannel; without this lock an in-flight RPC can read Status=Open, + // node-sign a receiver state, and commit after we flip to Challenged — leaving a + // node-signed higher-version receiver state on a disputed channel. The lock+read is a + // single store call so the channel snapshot is consistent with the lock. See + // HandleHomeChannelClosed for the same pattern. + channel, err := tx.LockUserStateForHomeChannel(chanID) if err != nil { return err } @@ -282,17 +282,6 @@ func (s *EventHandlerService) HandleHomeChannelChallenged(ctx context.Context, t return nil } - // Acquire the user's balance-row lock before mutating channel status. Receiver - // issuance paths (issueTransferReceiverState / issueReleaseReceiverState) lock - // the same row up front and then re-check Status via CheckActiveChannel; without - // this lock an in-flight RPC can read Status=Open, node-sign a receiver state, - // and commit after we flip to Challenged — leaving a node-signed higher-version - // receiver state on a disputed channel. See HandleHomeChannelClosed for the same - // pattern. - if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil { - return err - } - if event.StateVersion < channel.StateVersion { // Per protocol the challenged version cannot be lower than the last known on-chain version. // Treat as an anomaly (replay, indexer mis-order, contract bug): warn and skip persistence. @@ -360,7 +349,14 @@ func (s *EventHandlerService) HandleHomeChannelChallenged(ctx context.Context, t func (s *EventHandlerService) HandleHomeChannelClosed(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelClosedEvent) error { logger := log.FromContext(ctx) chanID := event.ChannelID - channel, err := tx.GetChannelByID(chanID) + // Acquire the user's balance-row lock and read the channel under it before mutating + // channel status or summing receiver credits. issueTransferReceiverState / + // issueReleaseReceiverState lock the same row up front, so this serializes the close + // against any in-flight RPC receiver-issuance for the same user: either the RPC commits + // its unsigned row before we sum (it lands in the rescue), or it blocks until we set the + // channel to Closed and then sees that via its own re-check. The lock+read is a single + // store call so the channel snapshot is consistent with the lock. + channel, err := tx.LockUserStateForHomeChannel(chanID) if err != nil { return err } @@ -373,16 +369,6 @@ func (s *EventHandlerService) HandleHomeChannelClosed(ctx context.Context, tx co return nil } - // Acquire the user's balance-row lock before mutating channel status or summing - // receiver credits. issueTransferReceiverState / issueReleaseReceiverState lock - // the same row up front, so this serializes the close against any in-flight RPC - // receiver-issuance for the same user: either the RPC commits its unsigned row - // before we sum (it lands in the rescue), or it blocks until we set the channel - // to Closed and then sees that via its own re-check. - if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil { - return err - } - wasChallenged := channel.Status == core.ChannelStatusChallenged channel.StateVersion = event.StateVersion diff --git a/nitronode/event_handlers/service_test.go b/nitronode/event_handlers/service_test.go index 98730f1f0..210c1595b 100644 --- a/nitronode/event_handlers/service_test.go +++ b/nitronode/event_handlers/service_test.go @@ -43,8 +43,7 @@ func TestHandleHomeChannelCreated_Success(t *testing.T) { } // Mock expectations - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusOpen && @@ -101,9 +100,8 @@ func TestHandleHomeChannelCreated_IgnoresReplayOnInitializedChannel(t *testing.T StateVersion: 1, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - // LockUserState is called before the replay guard, so it fires even on replays. - mockStore.On("LockUserState", channel.UserWallet, channel.Asset).Return(decimal.Zero, nil) + // LockUserStateForHomeChannel is called before the replay guard, so it fires even on replays. + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) err := service.HandleHomeChannelCreated(ctx, mockStore, event) @@ -117,7 +115,7 @@ func TestHandleHomeChannelCreated_IgnoresReplayOnInitializedChannel(t *testing.T } // TestHandleHomeChannelCreated_AcquiresUserLockBeforeMutation pins the race fix: -// the handler must call LockUserState(userWallet, asset) before UpdateChannel so an +// the handler must call LockUserStateForHomeChannel before UpdateChannel so an // in-flight receiver-issuance RPC cannot read Status=Void, store an unsigned receiver // row, and commit before we flip to Open. See HandleHomeChannelCheckpointed and // HandleHomeChannelClosed for the same pattern. @@ -146,12 +144,11 @@ func TestHandleHomeChannelCreated_AcquiresUserLockBeforeMutation(t *testing.T) { } var locked bool - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset). + mockStore.On("LockUserStateForHomeChannel", channelID). Run(func(mock.Arguments) { locked = true }). - Return(decimal.Zero, nil) + Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(core.Channel) bool { - require.True(t, locked, "LockUserState must be called before UpdateChannel") + require.True(t, locked, "LockUserStateForHomeChannel must be called before UpdateChannel") return true })).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) @@ -226,8 +223,7 @@ func TestHandleHomeChannelCreated_BackfillsUnsignedReceiverStateOnOpen(t *testin StateVersion: createVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusOpen && @@ -315,8 +311,7 @@ func TestHandleHomeChannelCreated_HeadAlreadySigned_NoBackfill(t *testing.T) { StateVersion: createVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.Anything).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) mockStore.On("UpdateStateSigsIfMissing", channelID, createVersion, "", "").Return(nil) @@ -358,8 +353,7 @@ func TestHandleHomeChannelCheckpointed_Success(t *testing.T) { } // Mock expectations - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusOpen && @@ -381,6 +375,55 @@ func TestHandleHomeChannelCheckpointed_Success(t *testing.T) { mockStore.AssertExpectations(t) } +// TestHandleHomeChannelCheckpointed_DoesNotReopenFinalizedChannel pins the race fix: a +// concurrent submit_state can co-sign a Finalize and flip the channel Open→Closing in the +// window between reading the channel and acquiring the lock. Because the handler now reads the +// channel UNDER the lock (LockUserStateForHomeChannel), it observes Closing and the +// non-challenged path leaves the status untouched — it must NOT persist Open and silently +// reopen the finalized channel. Returning the post-lock Closing snapshot here simulates the +// concurrent finalize having committed first. +func TestHandleHomeChannelCheckpointed_DoesNotReopenFinalizedChannel(t *testing.T) { + mockStore := new(MockStore) + ctx := log.SetContextLogger(context.Background(), log.NewNoopLogger()) + + service, _ := newTestEventHandlerService(t) + + channelID := "0xHomeChannel123" + userWallet := "0x1234567890123456789012345678901234567890" + + // Post-lock snapshot: a concurrent submit_state Finalize already flipped Open→Closing. + channel := &core.Channel{ + ChannelID: channelID, + UserWallet: userWallet, + Asset: "usdc", + Type: core.ChannelTypeHome, + Status: core.ChannelStatusClosing, + StateVersion: 3, + } + + event := &core.HomeChannelCheckpointedEvent{ + ChannelID: channelID, + StateVersion: 5, + } + + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) + // Status must be preserved as Closing — never reopened to Open. + mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { + return ch.ChannelID == channelID && + ch.Status == core.ChannelStatusClosing && + ch.StateVersion == 5 + })).Return(nil) + mockStore.On("RefreshUserEnforcedBalance", userWallet, "usdc").Return(nil) + mockStore.On("UpdateStateSigsIfMissing", channelID, uint64(5), "", "").Return(nil) + + err := service.HandleHomeChannelCheckpointed(ctx, mockStore, event) + require.NoError(t, err) + mockStore.AssertExpectations(t) + // Not challenged → no Finalize lookup and no head-sig backfill on this path. + mockStore.AssertNotCalled(t, "HasSignedFinalize", channelID) + mockStore.AssertNotCalled(t, "GetLastStateByChannelID", channelID, false) +} + func TestHandleHomeChannelChallenged_PersistsChallenge(t *testing.T) { // Channel must be marked Challenged with the challenge expiry so CheckActiveChannel and // RefreshUserEnforcedBalance stop treating it as open. Auto-checkpoint stays disabled: @@ -410,8 +453,7 @@ func TestHandleHomeChannelChallenged_PersistsChallenge(t *testing.T) { ChallengeExpiry: challengeExpiry, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusChallenged && @@ -456,8 +498,7 @@ func TestHandleHomeChannelChallenged_StaleVersionIgnored(t *testing.T) { ChallengeExpiry: uint64(time.Now().Add(time.Hour).Unix()), } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) err := service.HandleHomeChannelChallenged(ctx, mockStore, event) @@ -481,7 +522,7 @@ func TestHandleHomeChannelChallenged_ChannelNotFound(t *testing.T) { ChallengeExpiry: uint64(time.Now().Add(time.Hour).Unix()), } - mockStore.On("GetChannelByID", channelID).Return(nil, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(nil, nil) err := service.HandleHomeChannelChallenged(ctx, mockStore, event) @@ -509,7 +550,7 @@ func TestHandleHomeChannelChallenged_TypeMismatch(t *testing.T) { ChallengeExpiry: uint64(time.Now().Add(time.Hour).Unix()), } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) err := service.HandleHomeChannelChallenged(ctx, mockStore, event) @@ -548,8 +589,7 @@ func TestHandleHomeChannelChallenged_FromClosingState(t *testing.T) { ChallengeExpiry: challengeExpiry, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusChallenged && @@ -567,7 +607,7 @@ func TestHandleHomeChannelChallenged_FromClosingState(t *testing.T) { } // TestHandleHomeChannelChallenged_AcquiresUserLockBeforeMutation pins the race fix: -// the handler must call LockUserState(userWallet, asset) before UpdateChannel so an +// the handler must call LockUserStateForHomeChannel before UpdateChannel so an // in-flight receiver-issuance RPC cannot read Status=Open, node-sign a receiver state // and commit after the status flip to Challenged. See HandleHomeChannelClosed for the // same pattern. @@ -597,12 +637,11 @@ func TestHandleHomeChannelChallenged_AcquiresUserLockBeforeMutation(t *testing.T } var locked bool - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset). + mockStore.On("LockUserStateForHomeChannel", channelID). Run(func(mock.Arguments) { locked = true }). - Return(decimal.Zero, nil) + Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(core.Channel) bool { - require.True(t, locked, "LockUserState must be called before UpdateChannel") + require.True(t, locked, "LockUserStateForHomeChannel must be called before UpdateChannel") return true })).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) @@ -639,8 +678,7 @@ func TestHandleHomeChannelClosed_Success(t *testing.T) { } // Mock expectations - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusClosed && @@ -1403,8 +1441,7 @@ func TestHandleHomeChannelCheckpointed_BackfillsUserSig(t *testing.T) { UserSig: userSig, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.StateVersion == 5 })).Return(nil) @@ -1444,8 +1481,7 @@ func TestHandleHomeChannelCheckpointed_BackfillError(t *testing.T) { UserSig: "0xdeadbeef", } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.Anything).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, "usdc").Return(nil) mockStore.On("UpdateStateSigsIfMissing", channelID, uint64(5), "0xdeadbeef", "").Return(errors.New("db error")) @@ -1533,8 +1569,7 @@ func TestHandleHomeChannelCheckpointed_BackfillsHeadNodeSig(t *testing.T) { UserSig: checkpointUserSig, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusOpen && ch.StateVersion == checkpointVersion && @@ -1623,8 +1658,7 @@ func TestHandleHomeChannelCheckpointed_HeadAlreadySigned_NoBackfill(t *testing.T UserSig: "0xusersig", } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusOpen && ch.StateVersion == checkpointVersion && @@ -1699,8 +1733,7 @@ func TestHandleHomeChannelCheckpointed_FromChallengedWithSignedFinalize(t *testi UserSig: userSig, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusClosing && @@ -1723,7 +1756,7 @@ func TestHandleHomeChannelCheckpointed_FromChallengedWithSignedFinalize(t *testi } // TestHandleHomeChannelCheckpointed_AcquiresUserLockBeforeMutation pins the race fix: -// the handler must call LockUserState before flipping Status from Challenged to Open +// the handler must call LockUserStateForHomeChannel before flipping Status from Challenged to Open // and backfilling the off-chain head. Otherwise an in-flight receiver-issuance RPC // can read Status=Challenged, choose to store an unsigned receiver row, and commit // after the backfill — leaving the latest head unsigned on a now-Open channel. @@ -1752,12 +1785,11 @@ func TestHandleHomeChannelCheckpointed_AcquiresUserLockBeforeMutation(t *testing } var locked bool - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset). + mockStore.On("LockUserStateForHomeChannel", channelID). Run(func(mock.Arguments) { locked = true }). - Return(decimal.Zero, nil) + Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(core.Channel) bool { - require.True(t, locked, "LockUserState must be called before UpdateChannel") + require.True(t, locked, "LockUserStateForHomeChannel must be called before UpdateChannel") return true })).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) @@ -1823,8 +1855,7 @@ func TestHandleHomeChannelClosed_ChallengeRescue_Squash(t *testing.T) { StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusClosed && ch.StateVersion == closureVersion })).Return(nil) @@ -1923,8 +1954,7 @@ func TestHandleHomeChannelClosed_ChallengeRescue_NoCredits(t *testing.T) { StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) // Terminal Close clears any lingering challenge expiry alongside the status flip. mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusClosed && @@ -2007,8 +2037,7 @@ func TestHandleHomeChannelClosed_ChallengeRescue_NegativeNet_ClampsToZero(t *tes StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.Anything).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) mockStore.On("UpdateStateSigsIfMissing", channelID, closureVersion, "", "").Return(nil) @@ -2092,8 +2121,7 @@ func TestHandleHomeChannelClosed_TimeoutAfterFinalize_AppendsRescue(t *testing.T StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusClosed && ch.StateVersion == closureVersion && @@ -2201,8 +2229,7 @@ func TestHandleHomeChannelClosed_CooperativeCloseAfterChallenge_ZeroRescue(t *te StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusClosed && ch.StateVersion == closureVersion && @@ -2279,8 +2306,7 @@ func TestHandleHomeChannelClosed_OpenChannel_NoRescue(t *testing.T) { StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "USDC").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.Anything).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, "USDC").Return(nil) mockStore.On("UpdateStateSigsIfMissing", channelID, closureVersion, "", "").Return(nil) diff --git a/nitronode/event_handlers/testing.go b/nitronode/event_handlers/testing.go index dfe5be52f..8baefe5b0 100644 --- a/nitronode/event_handlers/testing.go +++ b/nitronode/event_handlers/testing.go @@ -123,6 +123,16 @@ func (m *MockStore) LockUserState(wallet, asset string) (decimal.Decimal, error) return args.Get(0).(decimal.Decimal), args.Error(1) } +// LockUserStateForHomeChannel mocks locking the balance row of the channel owner and +// returning the channel read under that lock. +func (m *MockStore) LockUserStateForHomeChannel(channelID string) (*core.Channel, error) { + args := m.Called(channelID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*core.Channel), args.Error(1) +} + // HasSignedFinalize mocks the existence check for a node-signed Finalize state on the given home channel. func (m *MockStore) HasSignedFinalize(channelID string) (bool, error) { args := m.Called(channelID) diff --git a/nitronode/store/database/channel_test.go b/nitronode/store/database/channel_test.go index 8bcfa40af..1c765c463 100644 --- a/nitronode/store/database/channel_test.go +++ b/nitronode/store/database/channel_test.go @@ -2,6 +2,7 @@ package database import ( "fmt" + "os" "testing" "time" @@ -9,6 +10,8 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gorm.io/gorm" + "gorm.io/gorm/clause" ) func TestChannel_TableName(t *testing.T) { @@ -194,6 +197,141 @@ func TestDBStore_GetChannelByID(t *testing.T) { }) } +func TestDBStore_LockUserStateForHomeChannel(t *testing.T) { + t.Run("Success - returns channel and ensures balance row", func(t *testing.T) { + db, cleanup := SetupTestDB(t) + defer cleanup() + + store := NewDBStore(db) + + channel := core.Channel{ + ChannelID: "0xhomechannel123", + UserWallet: "0xuser123", + Asset: "usdc", + Type: core.ChannelTypeHome, + BlockchainID: 1, + TokenAddress: "0xtoken123", + ChallengeDuration: 86400, + Nonce: 1, + Status: core.ChannelStatusOpen, + StateVersion: 7, + } + require.NoError(t, store.CreateChannel(channel)) + + result, err := store.LockUserStateForHomeChannel("0xhomechannel123") + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, "0xhomechannel123", result.ChannelID) + assert.Equal(t, "0xuser123", result.UserWallet) + assert.Equal(t, core.ChannelStatusOpen, result.Status) + assert.Equal(t, uint64(7), result.StateVersion) + + // The balance row keyed by the channel's (wallet, asset) is ensured by the lock. + balances, err := store.GetUserBalances("0xuser123") + require.NoError(t, err) + require.Len(t, balances, 1) + assert.Equal(t, "usdc", balances[0].Asset) + }) + + t.Run("Channel not found returns nil", func(t *testing.T) { + db, cleanup := SetupTestDB(t) + defer cleanup() + + store := NewDBStore(db) + + result, err := store.LockUserStateForHomeChannel("0xnonexistent") + require.NoError(t, err) + assert.Nil(t, result) + }) + + // Regression for MF3-H02: a status flip that commits while LockUserStateForHomeChannel is + // blocked on the balance lock must be reflected in the returned channel. Requires real + // FOR UPDATE concurrency, so it only runs against Postgres. + t.Run("Postgres - returns status committed while waiting on the lock", func(t *testing.T) { + if os.Getenv("TEST_DB_DRIVER") != "postgres" { + t.Skip("requires postgres for FOR UPDATE concurrency semantics") + } + + db, cleanup := SetupTestDB(t) + defer cleanup() + + store := NewDBStore(db) + + channel := core.Channel{ + ChannelID: "0xhomechannel123", + UserWallet: "0xuser123", + Asset: "usdc", + Type: core.ChannelTypeHome, + BlockchainID: 1, + Status: core.ChannelStatusOpen, + StateVersion: 7, + } + require.NoError(t, store.CreateChannel(channel)) + // Ensure the balance row exists so the competing transaction has a row to lock. + _, err := store.LockUserStateForHomeChannel(channel.ChannelID) + require.NoError(t, err) + + locked := make(chan struct{}) + proceed := make(chan struct{}) + txDone := make(chan error, 1) + + // Competing transaction: hold the balance lock, flip status to Closing, commit only + // once the main call is blocking on the lock. + go func() { + txDone <- db.Transaction(func(tx *gorm.DB) error { + var b UserBalance + if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). + Where("user_wallet = ? AND asset = ?", channel.UserWallet, channel.Asset). + First(&b).Error; err != nil { + return err + } + if err := tx.Model(&Channel{}). + Where("channel_id = ?", channel.ChannelID). + Update("status", core.ChannelStatusClosing).Error; err != nil { + return err + } + close(locked) + <-proceed + return nil + }) + }() + + <-locked + + type lockResult struct { + channel *core.Channel + err error + } + got := make(chan lockResult, 1) + go func() { + ch, err := store.LockUserStateForHomeChannel(channel.ChannelID) + got <- lockResult{channel: ch, err: err} + }() + + // Wait until the call is actually blocked on the balance-row lock before letting the + // competing transaction commit Closing. A fixed sleep is non-deterministic: under load + // the goroutine might not yet be in SELECT ... FOR UPDATE, so the competitor would commit + // before the lock is contended and the race would never be exercised. Poll pg_locks for an + // ungranted lock instead — that only appears once a backend is waiting on the lock. + require.Eventually(t, func() bool { + var waiting int64 + if err := db.Raw(`SELECT count(*) FROM pg_locks WHERE NOT granted`).Scan(&waiting).Error; err != nil { + return false + } + return waiting > 0 + }, 5*time.Second, 5*time.Millisecond, "LockUserStateForHomeChannel never blocked on the balance lock") + close(proceed) + + require.NoError(t, <-txDone) + + res := <-got + require.NoError(t, res.err) + require.NotNil(t, res.channel) + assert.Equal(t, core.ChannelStatusClosing, res.channel.Status, + "channel status must reflect the transition committed while waiting on the lock") + }) +} + func TestDBStore_GetActiveHomeChannel(t *testing.T) { t.Run("Success - Get active home channel", func(t *testing.T) { db, cleanup := SetupTestDB(t) diff --git a/nitronode/store/database/db_store.go b/nitronode/store/database/db_store.go index f4b77611e..78b684819 100644 --- a/nitronode/store/database/db_store.go +++ b/nitronode/store/database/db_store.go @@ -145,6 +145,66 @@ func (s *DBStore) LockUserState(wallet, asset string) (decimal.Decimal, error) { return balance.Balance, nil } +// LockUserStateForHomeChannel acquires the balance-row lock of the user owning channelID and +// returns the channel read *after* the lock is held. The order matters: the lock is taken first, +// then the channel is read in a separate statement, so a concurrent transaction (e.g. submit_state +// co-signing a Finalize and flipping the channel to Closing) that commits while we wait on the lock +// is reflected in the returned status. Callers that previously did GetChannelByID followed by +// LockUserState must use this instead — the separate read is read-before-lock and races. +// +// Returns (nil, nil) if the channel does not exist. Channel-type checks remain the caller's +// responsibility. +func (s *DBStore) LockUserStateForHomeChannel(channelID string) (*core.Channel, error) { + channelID = strings.ToLower(channelID) + if !strings.HasPrefix(channelID, "0x") { + channelID = "0x" + channelID + } + + // Non-postgres (sqlite in tests) cannot SELECT ... FOR UPDATE and has no real concurrency + // in those paths; resolve, ensure the balance row via LockUserState, and read directly. + if s.db.Dialector.Name() != "postgres" { + channel, err := s.GetChannelByID(channelID) + if err != nil || channel == nil { + return channel, err + } + if _, err := s.LockUserState(channel.UserWallet, channel.Asset); err != nil { + return nil, err + } + return channel, nil + } + + // Resolve the channel's (wallet, asset) lock key. These columns are immutable for a given + // channel, so reading them at this statement's snapshot is safe even though status is not. + var key struct { + UserWallet string + Asset string + } + result := s.db.Raw(`SELECT user_wallet, asset FROM channels WHERE channel_id = ?`, channelID).Scan(&key) + if result.Error != nil { + return nil, fmt.Errorf("failed to resolve lock key for channel %s: %w", channelID, result.Error) + } + if result.RowsAffected == 0 { + return nil, nil + } + + // Acquire the (wallet, asset) balance-row lock first (ensures the row, then SELECT ... FOR + // UPDATE). This blocks until any concurrent transaction holding the row commits. + if _, err := s.LockUserState(key.UserWallet, key.Asset); err != nil { + return nil, err + } + + // Read the channel only after the lock is held. A single SELECT ... FOR UPDATE OF b that + // joins channels would return c.* from the statement-start snapshot — a Finalize that flips + // status to Closing while we wait on the balance lock would not be reflected. This separate + // statement takes a fresh snapshot once the lock is acquired, so the returned status reflects + // any such committed transition. + // + // NOTE: requires READ COMMITTED isolation (Postgres default, and nitronode never overrides it). + // Under REPEATABLE READ or SERIALIZABLE this statement still sees the transaction-start + // snapshot, returning the stale pre-lock status and negating the fix. + return s.GetChannelByID(channelID) +} + // EnsureNoOngoingStateTransitions validates that no conflicting blockchain operations are pending. // This method prevents race conditions by ensuring blockchain state versions // match the user's last signed state version before accepting new transitions. diff --git a/nitronode/store/database/interface.go b/nitronode/store/database/interface.go index 98d5fc408..a3981ffb6 100644 --- a/nitronode/store/database/interface.go +++ b/nitronode/store/database/interface.go @@ -28,6 +28,13 @@ type DatabaseStore interface { // Returns the current balance or zero if the row was just inserted. LockUserState(wallet, asset string) (decimal.Decimal, error) + // LockUserStateForHomeChannel locks the balance row of the user owning channelID (must be used + // within a transaction). On postgres the lock key is derived from the channel in SQL and the + // channel is read under the lock, avoiding the stale pre-lock snapshot of a GetChannelByID + + // LockUserState pair; on non-postgres (sqlite in tests) the snapshot is taken before the lock + // for test compatibility. Returns nil if the channel does not exist. + LockUserStateForHomeChannel(channelID string) (*core.Channel, error) + // GetUserTransactions retrieves transaction history for a user with optional filters. GetUserTransactions(wallet string, asset *string, txType *core.TransactionType, fromTime *uint64, toTime *uint64, paginate *core.PaginationParams) ([]core.Transaction, core.PaginationMetadata, error) diff --git a/pkg/blockchain/evm/channel_hub_reactor.go b/pkg/blockchain/evm/channel_hub_reactor.go index e25296e25..6d71ed43c 100644 --- a/pkg/blockchain/evm/channel_hub_reactor.go +++ b/pkg/blockchain/evm/channel_hub_reactor.go @@ -81,6 +81,13 @@ type ChannelHubReactorStore interface { // in tests. LockUserState(wallet, asset string) (decimal.Decimal, error) + // LockUserStateForHomeChannel locks the balance row of the user owning channelID and + // returns the channel read under that lock, deriving the lock key from the channel in + // SQL so no stale pre-lock snapshot is used. Event handlers must use this instead of a + // GetChannelByID + LockUserState pair, which reads channel status before the lock and + // races a concurrent submit_state finalization. Returns nil if the channel is absent. + LockUserStateForHomeChannel(channelID string) (*core.Channel, error) + // UpdateStateSigsIfMissing backfills the user and/or node signatures for a stored // state when the corresponding column is currently NULL. Either signature may be // empty to skip that side. diff --git a/pkg/blockchain/evm/channel_hub_reactor_test.go b/pkg/blockchain/evm/channel_hub_reactor_test.go index 818ade62e..395aee80d 100644 --- a/pkg/blockchain/evm/channel_hub_reactor_test.go +++ b/pkg/blockchain/evm/channel_hub_reactor_test.go @@ -113,6 +113,14 @@ func (m *mockChannelHubStore) LockUserState(wallet, asset string) (decimal.Decim return args.Get(0).(decimal.Decimal), args.Error(1) } +func (m *mockChannelHubStore) LockUserStateForHomeChannel(channelID string) (*core.Channel, error) { + args := m.Called(channelID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*core.Channel), args.Error(1) +} + func (m *mockChannelHubStore) HasSignedFinalize(channelID string) (bool, error) { args := m.Called(channelID) return args.Bool(0), args.Error(1) diff --git a/pkg/core/interface.go b/pkg/core/interface.go index 0830b94a0..f4a45e985 100644 --- a/pkg/core/interface.go +++ b/pkg/core/interface.go @@ -156,6 +156,14 @@ type ChannelHubEventHandlerStore interface { // in tests. LockUserState(wallet, asset string) (decimal.Decimal, error) + // LockUserStateForHomeChannel locks the balance row of the user owning channelID. On + // postgres it derives the lock key from the channel in SQL and returns the channel read + // under that lock; on non-postgres (sqlite in tests) the snapshot is taken before the lock + // for test compatibility. Event handlers must use this instead of a GetChannelByID + + // LockUserState pair, which reads channel status before the lock and races a concurrent + // submit_state finalization. Returns nil if the channel is absent. + LockUserStateForHomeChannel(channelID string) (*Channel, error) + // UpdateStateSigsIfMissing backfills the user and/or node signatures for a stored state // when the corresponding column is currently NULL. Used to repair the local record after // an on-chain event proves the state was enforced. Either signature may be empty to skip