From 97be8ff5f4aef501898a531651ffac9bd47c8e34 Mon Sep 17 00:00:00 2001
From: Anton Filonenko
Date: Fri, 5 Jun 2026 15:40:59 +0300
Subject: [PATCH 1/3] fix(nitronode): lock home channel before reading status
 in event handlers

MF3-H02: home-channel event handlers read the channel via GetChannelByID
before acquiring LockUserState, then acted on that pre-lock snapshot.
LockUserState locks the user balance row, not the channel row, so a
concurrent submit_state Finalize could flip the channel Open->Closing in
the read->lock window. HandleHomeChannelCheckpointed's non-challenged path
persists the snapshot verbatim, so the stale Open clobbered the committed
Closing and reopened a finalized channel.

Add DBStore.LockUserStateForHomeChannel: it derives the (wallet, asset)
lock key from the channel inside SQL and, on Postgres, locks the balance
row and reads the channel in one statement (SELECT ... JOIN ... FOR UPDATE
OF b), so no stale pre-lock snapshot can exist. All four home-channel
handlers (Created, Checkpointed, Challenged, Closed) now lock-then-read
via this method. Escrow handlers are unchanged.
Co-Authored-By: Claude Opus 4.8 (1M context) --- nitronode/event_handlers/service.go | 84 +++++------ nitronode/event_handlers/service_test.go | 142 +++++++++++------- nitronode/event_handlers/testing.go | 10 ++ nitronode/store/database/channel_test.go | 48 ++++++ nitronode/store/database/db_store.go | 57 +++++++ nitronode/store/database/interface.go | 6 + pkg/blockchain/evm/channel_hub_reactor.go | 7 + .../evm/channel_hub_reactor_test.go | 8 + pkg/core/interface.go | 7 + 9 files changed, 264 insertions(+), 105 deletions(-) diff --git a/nitronode/event_handlers/service.go b/nitronode/event_handlers/service.go index e246c1ce7..3478d63c6 100644 --- a/nitronode/event_handlers/service.go +++ b/nitronode/event_handlers/service.go @@ -57,7 +57,15 @@ func (s *EventHandlerService) HandleNodeBalanceUpdated(ctx context.Context, tx c func (s *EventHandlerService) HandleHomeChannelCreated(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelCreatedEvent) error { logger := log.FromContext(ctx) chanID := event.ChannelID - channel, err := tx.GetChannelByID(chanID) + // Acquire the user's balance-row lock and read the channel under it before mutating + // channel status or backfilling the off-chain head. Receiver/release issuance paths + // lock the same row up front and then re-check Status via CheckActiveChannel; without + // this lock an in-flight RPC can read Status=Void, decide to store an unsigned receiver + // row, and commit before we flip to Open — leaving the latest head unsigned on a + // technically opened channel. The lock+read is a single store call so the channel + // snapshot is consistent with the lock. See HandleHomeChannelCheckpointed and + // HandleHomeChannelClosed for the same pattern. 
+ channel, err := tx.LockUserStateForHomeChannel(chanID) if err != nil { return err } @@ -70,18 +78,6 @@ func (s *EventHandlerService) HandleHomeChannelCreated(ctx context.Context, tx c return nil } - // Acquire the user's balance-row lock before mutating channel status or - // backfilling the off-chain head. Receiver/release issuance paths lock - // the same row up front and then re-check Status via CheckActiveChannel; - // without this lock an in-flight RPC can read Status=Void, decide to store - // an unsigned receiver row, and commit before we flip to Open — leaving - // the latest head unsigned on a technically opened channel. See - // HandleHomeChannelCheckpointed and HandleHomeChannelClosed for the same - // pattern. - if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil { - return err - } - if channel.Status >= core.ChannelStatusOpen { logger.Warn("ignoring replayed HomeChannelCreated event on already-initialized channel", "channelId", chanID, "currentStatus", channel.Status, "currentStateVersion", channel.StateVersion, "eventStateVersion", event.StateVersion) @@ -137,7 +133,17 @@ func (s *EventHandlerService) HandleHomeChannelMigrated(ctx context.Context, tx func (s *EventHandlerService) HandleHomeChannelCheckpointed(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelCheckpointedEvent) error { logger := log.FromContext(ctx) chanID := event.ChannelID - channel, err := tx.GetChannelByID(chanID) + // Acquire the user's balance-row lock and read the channel under it before mutating + // channel status or backfilling the off-chain head. Reading the channel before the + // lock would race: a concurrent submit_state can co-sign a Finalize and flip the + // channel Open→Closing between the read and the lock, and the non-challenged path + // below persists the channel snapshot verbatim — a pre-lock Open snapshot would + // silently reopen the finalized channel. 
Receiver issuance paths lock the same row + // and then re-check Status; without this lock an RPC can read Status=Challenged, + // decide to store an unsigned receiver row, but commit after we flip to Open and + // backfill the prior head — leaving the latest head unsigned on an Open channel. See + // HandleHomeChannelClosed for the same pattern. + channel, err := tx.LockUserStateForHomeChannel(chanID) if err != nil { return err } @@ -150,16 +156,6 @@ func (s *EventHandlerService) HandleHomeChannelCheckpointed(ctx context.Context, return nil } - // Acquire the user's balance-row lock before mutating channel status or - // backfilling the off-chain head. Receiver issuance paths lock the same row - // and then re-check Status; without this lock an RPC can read Status=Challenged, - // decide to store an unsigned receiver row, but commit after we flip to Open - // and backfill the prior head — leaving the latest head unsigned on an Open - // channel. See HandleHomeChannelClosed for the same pattern. - if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil { - return err - } - channel.StateVersion = event.StateVersion wasChallenged := channel.Status == core.ChannelStatusChallenged @@ -269,7 +265,15 @@ func (s *EventHandlerService) backfillOffChainHeadNodeSig(ctx context.Context, t func (s *EventHandlerService) HandleHomeChannelChallenged(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelChallengedEvent) error { logger := log.FromContext(ctx) chanID := event.ChannelID - channel, err := tx.GetChannelByID(chanID) + // Acquire the user's balance-row lock and read the channel under it before mutating + // channel status. 
Receiver issuance paths (issueTransferReceiverState / + // issueReleaseReceiverState) lock the same row up front and then re-check Status via + // CheckActiveChannel; without this lock an in-flight RPC can read Status=Open, + // node-sign a receiver state, and commit after we flip to Challenged — leaving a + // node-signed higher-version receiver state on a disputed channel. The lock+read is a + // single store call so the channel snapshot is consistent with the lock. See + // HandleHomeChannelClosed for the same pattern. + channel, err := tx.LockUserStateForHomeChannel(chanID) if err != nil { return err } @@ -282,17 +286,6 @@ func (s *EventHandlerService) HandleHomeChannelChallenged(ctx context.Context, t return nil } - // Acquire the user's balance-row lock before mutating channel status. Receiver - // issuance paths (issueTransferReceiverState / issueReleaseReceiverState) lock - // the same row up front and then re-check Status via CheckActiveChannel; without - // this lock an in-flight RPC can read Status=Open, node-sign a receiver state, - // and commit after we flip to Challenged — leaving a node-signed higher-version - // receiver state on a disputed channel. See HandleHomeChannelClosed for the same - // pattern. - if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil { - return err - } - if event.StateVersion < channel.StateVersion { // Per protocol the challenged version cannot be lower than the last known on-chain version. // Treat as an anomaly (replay, indexer mis-order, contract bug): warn and skip persistence. 
@@ -360,7 +353,14 @@ func (s *EventHandlerService) HandleHomeChannelChallenged(ctx context.Context, t func (s *EventHandlerService) HandleHomeChannelClosed(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelClosedEvent) error { logger := log.FromContext(ctx) chanID := event.ChannelID - channel, err := tx.GetChannelByID(chanID) + // Acquire the user's balance-row lock and read the channel under it before mutating + // channel status or summing receiver credits. issueTransferReceiverState / + // issueReleaseReceiverState lock the same row up front, so this serializes the close + // against any in-flight RPC receiver-issuance for the same user: either the RPC commits + // its unsigned row before we sum (it lands in the rescue), or it blocks until we set the + // channel to Closed and then sees that via its own re-check. The lock+read is a single + // store call so the channel snapshot is consistent with the lock. + channel, err := tx.LockUserStateForHomeChannel(chanID) if err != nil { return err } @@ -373,16 +373,6 @@ func (s *EventHandlerService) HandleHomeChannelClosed(ctx context.Context, tx co return nil } - // Acquire the user's balance-row lock before mutating channel status or summing - // receiver credits. issueTransferReceiverState / issueReleaseReceiverState lock - // the same row up front, so this serializes the close against any in-flight RPC - // receiver-issuance for the same user: either the RPC commits its unsigned row - // before we sum (it lands in the rescue), or it blocks until we set the channel - // to Closed and then sees that via its own re-check. 
- if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil { - return err - } - wasChallenged := channel.Status == core.ChannelStatusChallenged channel.StateVersion = event.StateVersion diff --git a/nitronode/event_handlers/service_test.go b/nitronode/event_handlers/service_test.go index 98730f1f0..210c1595b 100644 --- a/nitronode/event_handlers/service_test.go +++ b/nitronode/event_handlers/service_test.go @@ -43,8 +43,7 @@ func TestHandleHomeChannelCreated_Success(t *testing.T) { } // Mock expectations - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusOpen && @@ -101,9 +100,8 @@ func TestHandleHomeChannelCreated_IgnoresReplayOnInitializedChannel(t *testing.T StateVersion: 1, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - // LockUserState is called before the replay guard, so it fires even on replays. - mockStore.On("LockUserState", channel.UserWallet, channel.Asset).Return(decimal.Zero, nil) + // LockUserStateForHomeChannel is called before the replay guard, so it fires even on replays. + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) err := service.HandleHomeChannelCreated(ctx, mockStore, event) @@ -117,7 +115,7 @@ func TestHandleHomeChannelCreated_IgnoresReplayOnInitializedChannel(t *testing.T } // TestHandleHomeChannelCreated_AcquiresUserLockBeforeMutation pins the race fix: -// the handler must call LockUserState(userWallet, asset) before UpdateChannel so an +// the handler must call LockUserStateForHomeChannel before UpdateChannel so an // in-flight receiver-issuance RPC cannot read Status=Void, store an unsigned receiver // row, and commit before we flip to Open. 
See HandleHomeChannelCheckpointed and // HandleHomeChannelClosed for the same pattern. @@ -146,12 +144,11 @@ func TestHandleHomeChannelCreated_AcquiresUserLockBeforeMutation(t *testing.T) { } var locked bool - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset). + mockStore.On("LockUserStateForHomeChannel", channelID). Run(func(mock.Arguments) { locked = true }). - Return(decimal.Zero, nil) + Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(core.Channel) bool { - require.True(t, locked, "LockUserState must be called before UpdateChannel") + require.True(t, locked, "LockUserStateForHomeChannel must be called before UpdateChannel") return true })).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) @@ -226,8 +223,7 @@ func TestHandleHomeChannelCreated_BackfillsUnsignedReceiverStateOnOpen(t *testin StateVersion: createVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusOpen && @@ -315,8 +311,7 @@ func TestHandleHomeChannelCreated_HeadAlreadySigned_NoBackfill(t *testing.T) { StateVersion: createVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.Anything).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) mockStore.On("UpdateStateSigsIfMissing", channelID, createVersion, "", "").Return(nil) @@ -358,8 +353,7 @@ func TestHandleHomeChannelCheckpointed_Success(t *testing.T) { } // Mock expectations 
- mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusOpen && @@ -381,6 +375,55 @@ func TestHandleHomeChannelCheckpointed_Success(t *testing.T) { mockStore.AssertExpectations(t) } +// TestHandleHomeChannelCheckpointed_DoesNotReopenFinalizedChannel pins the race fix: a +// concurrent submit_state can co-sign a Finalize and flip the channel Open→Closing in the +// window between reading the channel and acquiring the lock. Because the handler now reads the +// channel UNDER the lock (LockUserStateForHomeChannel), it observes Closing and the +// non-challenged path leaves the status untouched — it must NOT persist Open and silently +// reopen the finalized channel. Returning the post-lock Closing snapshot here simulates the +// concurrent finalize having committed first. +func TestHandleHomeChannelCheckpointed_DoesNotReopenFinalizedChannel(t *testing.T) { + mockStore := new(MockStore) + ctx := log.SetContextLogger(context.Background(), log.NewNoopLogger()) + + service, _ := newTestEventHandlerService(t) + + channelID := "0xHomeChannel123" + userWallet := "0x1234567890123456789012345678901234567890" + + // Post-lock snapshot: a concurrent submit_state Finalize already flipped Open→Closing. + channel := &core.Channel{ + ChannelID: channelID, + UserWallet: userWallet, + Asset: "usdc", + Type: core.ChannelTypeHome, + Status: core.ChannelStatusClosing, + StateVersion: 3, + } + + event := &core.HomeChannelCheckpointedEvent{ + ChannelID: channelID, + StateVersion: 5, + } + + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) + // Status must be preserved as Closing — never reopened to Open. 
+ mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { + return ch.ChannelID == channelID && + ch.Status == core.ChannelStatusClosing && + ch.StateVersion == 5 + })).Return(nil) + mockStore.On("RefreshUserEnforcedBalance", userWallet, "usdc").Return(nil) + mockStore.On("UpdateStateSigsIfMissing", channelID, uint64(5), "", "").Return(nil) + + err := service.HandleHomeChannelCheckpointed(ctx, mockStore, event) + require.NoError(t, err) + mockStore.AssertExpectations(t) + // Not challenged → no Finalize lookup and no head-sig backfill on this path. + mockStore.AssertNotCalled(t, "HasSignedFinalize", channelID) + mockStore.AssertNotCalled(t, "GetLastStateByChannelID", channelID, false) +} + func TestHandleHomeChannelChallenged_PersistsChallenge(t *testing.T) { // Channel must be marked Challenged with the challenge expiry so CheckActiveChannel and // RefreshUserEnforcedBalance stop treating it as open. Auto-checkpoint stays disabled: @@ -410,8 +453,7 @@ func TestHandleHomeChannelChallenged_PersistsChallenge(t *testing.T) { ChallengeExpiry: challengeExpiry, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusChallenged && @@ -456,8 +498,7 @@ func TestHandleHomeChannelChallenged_StaleVersionIgnored(t *testing.T) { ChallengeExpiry: uint64(time.Now().Add(time.Hour).Unix()), } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) err := service.HandleHomeChannelChallenged(ctx, mockStore, event) @@ -481,7 +522,7 @@ func TestHandleHomeChannelChallenged_ChannelNotFound(t *testing.T) { 
ChallengeExpiry: uint64(time.Now().Add(time.Hour).Unix()), } - mockStore.On("GetChannelByID", channelID).Return(nil, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(nil, nil) err := service.HandleHomeChannelChallenged(ctx, mockStore, event) @@ -509,7 +550,7 @@ func TestHandleHomeChannelChallenged_TypeMismatch(t *testing.T) { ChallengeExpiry: uint64(time.Now().Add(time.Hour).Unix()), } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) err := service.HandleHomeChannelChallenged(ctx, mockStore, event) @@ -548,8 +589,7 @@ func TestHandleHomeChannelChallenged_FromClosingState(t *testing.T) { ChallengeExpiry: challengeExpiry, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusChallenged && @@ -567,7 +607,7 @@ func TestHandleHomeChannelChallenged_FromClosingState(t *testing.T) { } // TestHandleHomeChannelChallenged_AcquiresUserLockBeforeMutation pins the race fix: -// the handler must call LockUserState(userWallet, asset) before UpdateChannel so an +// the handler must call LockUserStateForHomeChannel before UpdateChannel so an // in-flight receiver-issuance RPC cannot read Status=Open, node-sign a receiver state // and commit after the status flip to Challenged. See HandleHomeChannelClosed for the // same pattern. @@ -597,12 +637,11 @@ func TestHandleHomeChannelChallenged_AcquiresUserLockBeforeMutation(t *testing.T } var locked bool - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset). + mockStore.On("LockUserStateForHomeChannel", channelID). Run(func(mock.Arguments) { locked = true }). 
- Return(decimal.Zero, nil) + Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(core.Channel) bool { - require.True(t, locked, "LockUserState must be called before UpdateChannel") + require.True(t, locked, "LockUserStateForHomeChannel must be called before UpdateChannel") return true })).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) @@ -639,8 +678,7 @@ func TestHandleHomeChannelClosed_Success(t *testing.T) { } // Mock expectations - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusClosed && @@ -1403,8 +1441,7 @@ func TestHandleHomeChannelCheckpointed_BackfillsUserSig(t *testing.T) { UserSig: userSig, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.StateVersion == 5 })).Return(nil) @@ -1444,8 +1481,7 @@ func TestHandleHomeChannelCheckpointed_BackfillError(t *testing.T) { UserSig: "0xdeadbeef", } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "usdc").Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.Anything).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, "usdc").Return(nil) mockStore.On("UpdateStateSigsIfMissing", channelID, uint64(5), "0xdeadbeef", "").Return(errors.New("db error")) @@ -1533,8 +1569,7 @@ func TestHandleHomeChannelCheckpointed_BackfillsHeadNodeSig(t *testing.T) { 
UserSig: checkpointUserSig, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusOpen && ch.StateVersion == checkpointVersion && @@ -1623,8 +1658,7 @@ func TestHandleHomeChannelCheckpointed_HeadAlreadySigned_NoBackfill(t *testing.T UserSig: "0xusersig", } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusOpen && ch.StateVersion == checkpointVersion && @@ -1699,8 +1733,7 @@ func TestHandleHomeChannelCheckpointed_FromChallengedWithSignedFinalize(t *testi UserSig: userSig, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.ChannelID == channelID && ch.Status == core.ChannelStatusClosing && @@ -1723,7 +1756,7 @@ func TestHandleHomeChannelCheckpointed_FromChallengedWithSignedFinalize(t *testi } // TestHandleHomeChannelCheckpointed_AcquiresUserLockBeforeMutation pins the race fix: -// the handler must call LockUserState before flipping Status from Challenged to Open +// the handler must call LockUserStateForHomeChannel before flipping Status from Challenged to Open // and backfilling the off-chain head. 
Otherwise an in-flight receiver-issuance RPC // can read Status=Challenged, choose to store an unsigned receiver row, and commit // after the backfill — leaving the latest head unsigned on a now-Open channel. @@ -1752,12 +1785,11 @@ func TestHandleHomeChannelCheckpointed_AcquiresUserLockBeforeMutation(t *testing } var locked bool - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset). + mockStore.On("LockUserStateForHomeChannel", channelID). Run(func(mock.Arguments) { locked = true }). - Return(decimal.Zero, nil) + Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(core.Channel) bool { - require.True(t, locked, "LockUserState must be called before UpdateChannel") + require.True(t, locked, "LockUserStateForHomeChannel must be called before UpdateChannel") return true })).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) @@ -1823,8 +1855,7 @@ func TestHandleHomeChannelClosed_ChallengeRescue_Squash(t *testing.T) { StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusClosed && ch.StateVersion == closureVersion })).Return(nil) @@ -1923,8 +1954,7 @@ func TestHandleHomeChannelClosed_ChallengeRescue_NoCredits(t *testing.T) { StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) // Terminal Close clears any lingering challenge expiry alongside the status flip. 
mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusClosed && @@ -2007,8 +2037,7 @@ func TestHandleHomeChannelClosed_ChallengeRescue_NegativeNet_ClampsToZero(t *tes StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.Anything).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, asset).Return(nil) mockStore.On("UpdateStateSigsIfMissing", channelID, closureVersion, "", "").Return(nil) @@ -2092,8 +2121,7 @@ func TestHandleHomeChannelClosed_TimeoutAfterFinalize_AppendsRescue(t *testing.T StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusClosed && ch.StateVersion == closureVersion && @@ -2201,8 +2229,7 @@ func TestHandleHomeChannelClosed_CooperativeCloseAfterChallenge_ZeroRescue(t *te StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, asset).Return(decimal.Zero, nil) + mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.MatchedBy(func(ch core.Channel) bool { return ch.Status == core.ChannelStatusClosed && ch.StateVersion == closureVersion && @@ -2279,8 +2306,7 @@ func TestHandleHomeChannelClosed_OpenChannel_NoRescue(t *testing.T) { StateVersion: closureVersion, } - mockStore.On("GetChannelByID", channelID).Return(channel, nil) - mockStore.On("LockUserState", userWallet, "USDC").Return(decimal.Zero, nil) + 
mockStore.On("LockUserStateForHomeChannel", channelID).Return(channel, nil) mockStore.On("UpdateChannel", mock.Anything).Return(nil) mockStore.On("RefreshUserEnforcedBalance", userWallet, "USDC").Return(nil) mockStore.On("UpdateStateSigsIfMissing", channelID, closureVersion, "", "").Return(nil) diff --git a/nitronode/event_handlers/testing.go b/nitronode/event_handlers/testing.go index dfe5be52f..8baefe5b0 100644 --- a/nitronode/event_handlers/testing.go +++ b/nitronode/event_handlers/testing.go @@ -123,6 +123,16 @@ func (m *MockStore) LockUserState(wallet, asset string) (decimal.Decimal, error) return args.Get(0).(decimal.Decimal), args.Error(1) } +// LockUserStateForHomeChannel mocks locking the balance row of the channel owner and +// returning the channel read under that lock. +func (m *MockStore) LockUserStateForHomeChannel(channelID string) (*core.Channel, error) { + args := m.Called(channelID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*core.Channel), args.Error(1) +} + // HasSignedFinalize mocks the existence check for a node-signed Finalize state on the given home channel. 
func (m *MockStore) HasSignedFinalize(channelID string) (bool, error) { args := m.Called(channelID) diff --git a/nitronode/store/database/channel_test.go b/nitronode/store/database/channel_test.go index 8bcfa40af..cd23909fe 100644 --- a/nitronode/store/database/channel_test.go +++ b/nitronode/store/database/channel_test.go @@ -194,6 +194,54 @@ func TestDBStore_GetChannelByID(t *testing.T) { }) } +func TestDBStore_LockUserStateForHomeChannel(t *testing.T) { + t.Run("Success - returns channel and ensures balance row", func(t *testing.T) { + db, cleanup := SetupTestDB(t) + defer cleanup() + + store := NewDBStore(db) + + channel := core.Channel{ + ChannelID: "0xhomechannel123", + UserWallet: "0xuser123", + Asset: "usdc", + Type: core.ChannelTypeHome, + BlockchainID: 1, + TokenAddress: "0xtoken123", + ChallengeDuration: 86400, + Nonce: 1, + Status: core.ChannelStatusOpen, + StateVersion: 7, + } + require.NoError(t, store.CreateChannel(channel)) + + result, err := store.LockUserStateForHomeChannel("0xhomechannel123") + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, "0xhomechannel123", result.ChannelID) + assert.Equal(t, "0xuser123", result.UserWallet) + assert.Equal(t, core.ChannelStatusOpen, result.Status) + assert.Equal(t, uint64(7), result.StateVersion) + + // The balance row keyed by the channel's (wallet, asset) is ensured by the lock. 
+ balances, err := store.GetUserBalances("0xuser123") + require.NoError(t, err) + require.Len(t, balances, 1) + assert.Equal(t, "usdc", balances[0].Asset) + }) + + t.Run("Channel not found returns nil", func(t *testing.T) { + db, cleanup := SetupTestDB(t) + defer cleanup() + + store := NewDBStore(db) + + result, err := store.LockUserStateForHomeChannel("0xnonexistent") + require.NoError(t, err) + assert.Nil(t, result) + }) +} + func TestDBStore_GetActiveHomeChannel(t *testing.T) { t.Run("Success - Get active home channel", func(t *testing.T) { db, cleanup := SetupTestDB(t) diff --git a/nitronode/store/database/db_store.go b/nitronode/store/database/db_store.go index f4b77611e..cc02b243c 100644 --- a/nitronode/store/database/db_store.go +++ b/nitronode/store/database/db_store.go @@ -145,6 +145,63 @@ func (s *DBStore) LockUserState(wallet, asset string) (decimal.Decimal, error) { return balance.Balance, nil } +// LockUserStateForHomeChannel locks the balance row of the user owning channelID and returns +// the channel read under that lock. The (wallet, asset) lock key is derived from the channel +// inside SQL, so there is no pre-lock Go-side snapshot: a concurrent transaction (e.g. +// submit_state co-signing a Finalize and flipping the channel to Closing) cannot slip a stale +// status into the returned channel. Callers that previously did GetChannelByID followed by +// LockUserState must use this instead — the separate read is read-before-lock and races. +// +// Returns (nil, nil) if the channel does not exist. Channel-type checks remain the caller's +// responsibility. +func (s *DBStore) LockUserStateForHomeChannel(channelID string) (*core.Channel, error) { + channelID = strings.ToLower(channelID) + if !strings.HasPrefix(channelID, "0x") { + channelID = "0x" + channelID + } + + // Non-postgres (sqlite in tests) cannot SELECT ... FOR UPDATE and has no real concurrency + // in those paths; resolve, ensure the balance row via LockUserState, and read directly. 
+ if s.db.Dialector.Name() != "postgres" { + channel, err := s.GetChannelByID(channelID) + if err != nil || channel == nil { + return channel, err + } + if _, err := s.LockUserState(channel.UserWallet, channel.Asset); err != nil { + return nil, err + } + return channel, nil + } + + now := time.Now() + // Ensure the balance row exists, deriving (wallet, asset) from the channel so FOR UPDATE + // has a row to lock. No-op when the channel is absent or the row already exists. + if err := s.db.Exec(` + INSERT INTO user_balances (user_wallet, asset, balance, enforced, home_blockchain_id, created_at, updated_at) + SELECT user_wallet, asset, 0, 0, 0, ?, ? FROM channels WHERE channel_id = ? + ON CONFLICT (user_wallet, asset) DO NOTHING + `, now, now, channelID).Error; err != nil { + return nil, fmt.Errorf("failed to ensure user balance row for channel %s: %w", channelID, err) + } + + // Lock only the balance row (FOR UPDATE OF b) and read the channel in the same statement. + var dbChannel Channel + result := s.db.Raw(` + SELECT c.* FROM channels c + JOIN user_balances b ON b.user_wallet = c.user_wallet AND b.asset = c.asset + WHERE c.channel_id = ? + FOR UPDATE OF b + `, channelID).Scan(&dbChannel) + if result.Error != nil { + return nil, fmt.Errorf("failed to lock user state for channel %s: %w", channelID, result.Error) + } + if result.RowsAffected == 0 { + return nil, nil + } + + return databaseChannelToCore(&dbChannel), nil +} + // EnsureNoOngoingStateTransitions validates that no conflicting blockchain operations are pending. // This method prevents race conditions by ensuring blockchain state versions // match the user's last signed state version before accepting new transitions. 
diff --git a/nitronode/store/database/interface.go b/nitronode/store/database/interface.go index 98d5fc408..227a2cd4c 100644 --- a/nitronode/store/database/interface.go +++ b/nitronode/store/database/interface.go @@ -28,6 +28,12 @@ type DatabaseStore interface { // Returns the current balance or zero if the row was just inserted. LockUserState(wallet, asset string) (decimal.Decimal, error) + // LockUserStateForHomeChannel locks the balance row of the user owning channelID (postgres only, + // must be used within a transaction) and returns the channel read under that lock. The lock key is + // derived from the channel in SQL, avoiding the stale pre-lock snapshot of a GetChannelByID + + // LockUserState pair. Returns nil if the channel does not exist. + LockUserStateForHomeChannel(channelID string) (*core.Channel, error) + // GetUserTransactions retrieves transaction history for a user with optional filters. GetUserTransactions(wallet string, asset *string, txType *core.TransactionType, fromTime *uint64, toTime *uint64, paginate *core.PaginationParams) ([]core.Transaction, core.PaginationMetadata, error) diff --git a/pkg/blockchain/evm/channel_hub_reactor.go b/pkg/blockchain/evm/channel_hub_reactor.go index e25296e25..6d71ed43c 100644 --- a/pkg/blockchain/evm/channel_hub_reactor.go +++ b/pkg/blockchain/evm/channel_hub_reactor.go @@ -81,6 +81,13 @@ type ChannelHubReactorStore interface { // in tests. LockUserState(wallet, asset string) (decimal.Decimal, error) + // LockUserStateForHomeChannel locks the balance row of the user owning channelID and + // returns the channel read under that lock, deriving the lock key from the channel in + // SQL so no stale pre-lock snapshot is used. Event handlers must use this instead of a + // GetChannelByID + LockUserState pair, which reads channel status before the lock and + // races a concurrent submit_state finalization. Returns nil if the channel is absent. 
+ LockUserStateForHomeChannel(channelID string) (*core.Channel, error) + // UpdateStateSigsIfMissing backfills the user and/or node signatures for a stored // state when the corresponding column is currently NULL. Either signature may be // empty to skip that side. diff --git a/pkg/blockchain/evm/channel_hub_reactor_test.go b/pkg/blockchain/evm/channel_hub_reactor_test.go index 818ade62e..395aee80d 100644 --- a/pkg/blockchain/evm/channel_hub_reactor_test.go +++ b/pkg/blockchain/evm/channel_hub_reactor_test.go @@ -113,6 +113,14 @@ func (m *mockChannelHubStore) LockUserState(wallet, asset string) (decimal.Decim return args.Get(0).(decimal.Decimal), args.Error(1) } +func (m *mockChannelHubStore) LockUserStateForHomeChannel(channelID string) (*core.Channel, error) { + args := m.Called(channelID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*core.Channel), args.Error(1) +} + func (m *mockChannelHubStore) HasSignedFinalize(channelID string) (bool, error) { args := m.Called(channelID) return args.Bool(0), args.Error(1) diff --git a/pkg/core/interface.go b/pkg/core/interface.go index 0830b94a0..44ac359fd 100644 --- a/pkg/core/interface.go +++ b/pkg/core/interface.go @@ -156,6 +156,13 @@ type ChannelHubEventHandlerStore interface { // in tests. LockUserState(wallet, asset string) (decimal.Decimal, error) + // LockUserStateForHomeChannel locks the balance row of the user owning channelID and + // returns the channel read under that lock, deriving the lock key from the channel in + // SQL so no stale pre-lock snapshot is used. Event handlers must use this instead of a + // GetChannelByID + LockUserState pair, which reads channel status before the lock and + // races a concurrent submit_state finalization. Returns nil if the channel is absent. 
+ LockUserStateForHomeChannel(channelID string) (*Channel, error) + // UpdateStateSigsIfMissing backfills the user and/or node signatures for a stored state // when the corresponding column is currently NULL. Used to repair the local record after // an on-chain event proves the state was enforced. Either signature may be empty to skip From 84969032fc6cdbf38b87471de77edb1a2883bc7b Mon Sep 17 00:00:00 2001 From: Anton Filonenko Date: Fri, 5 Jun 2026 16:58:29 +0300 Subject: [PATCH 2/3] fix(nitronode): refresh home channel under lock to fully close MF3-H02 LockUserStateForHomeChannel previously locked the balance row and read the channel in a single `SELECT c.* ... FOR UPDATE OF b`. In READ COMMITTED, `FOR UPDATE OF b` only re-evaluates the locked user_balances row after the lock wait; the joined channels row is served from the statement-start snapshot. A checkpoint handler could begin with status=Open, block on the balance lock while a concurrent Finalize committed status=Closing, then return the stale Open channel and persist it. Acquire the (wallet, asset) balance lock first, then read the channel in a separate statement so it takes a fresh snapshot reflecting any transition committed during the wait. Add a Postgres concurrency regression that holds the balance lock, flips status to Closing, and commits while the call is blocked; it asserts the returned status is Closing (skips on non-postgres). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- nitronode/store/database/channel_test.go | 81 ++++++++++++++++++++++++ nitronode/store/database/db_store.go | 49 +++++++------- 2 files changed, 105 insertions(+), 25 deletions(-) diff --git a/nitronode/store/database/channel_test.go b/nitronode/store/database/channel_test.go index cd23909fe..09a2a496b 100644 --- a/nitronode/store/database/channel_test.go +++ b/nitronode/store/database/channel_test.go @@ -2,6 +2,7 @@ package database import ( "fmt" + "os" "testing" "time" @@ -9,6 +10,8 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gorm.io/gorm" + "gorm.io/gorm/clause" ) func TestChannel_TableName(t *testing.T) { @@ -240,6 +243,84 @@ func TestDBStore_LockUserStateForHomeChannel(t *testing.T) { require.NoError(t, err) assert.Nil(t, result) }) + + // Regression for MF3-H02: a status flip that commits while LockUserStateForHomeChannel is + // blocked on the balance lock must be reflected in the returned channel. Requires real + // FOR UPDATE concurrency, so it only runs against Postgres. + t.Run("Postgres - returns status committed while waiting on the lock", func(t *testing.T) { + if os.Getenv("TEST_DB_DRIVER") != "postgres" { + t.Skip("requires postgres for FOR UPDATE concurrency semantics") + } + + db, cleanup := SetupTestDB(t) + defer cleanup() + + store := NewDBStore(db) + + channel := core.Channel{ + ChannelID: "0xhomechannel123", + UserWallet: "0xuser123", + Asset: "usdc", + Type: core.ChannelTypeHome, + BlockchainID: 1, + Status: core.ChannelStatusOpen, + StateVersion: 7, + } + require.NoError(t, store.CreateChannel(channel)) + // Ensure the balance row exists so the competing transaction has a row to lock. 
+ _, err := store.LockUserStateForHomeChannel(channel.ChannelID) + require.NoError(t, err) + + locked := make(chan struct{}) + proceed := make(chan struct{}) + txDone := make(chan error, 1) + + // Competing transaction: hold the balance lock, flip status to Closing, commit only + // once the main call is blocking on the lock. + go func() { + txDone <- db.Transaction(func(tx *gorm.DB) error { + var b UserBalance + if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}). + Where("user_wallet = ? AND asset = ?", channel.UserWallet, channel.Asset). + First(&b).Error; err != nil { + return err + } + if err := tx.Model(&Channel{}). + Where("channel_id = ?", channel.ChannelID). + Update("status", core.ChannelStatusClosing).Error; err != nil { + return err + } + close(locked) + <-proceed + return nil + }) + }() + + <-locked + + type lockResult struct { + channel *core.Channel + err error + } + got := make(chan lockResult, 1) + go func() { + ch, err := store.LockUserStateForHomeChannel(channel.ChannelID) + got <- lockResult{channel: ch, err: err} + }() + + // Give the call time to start and block on the balance lock, then let the competing + // transaction commit the Closing status. 
+ time.Sleep(200 * time.Millisecond) + close(proceed) + + require.NoError(t, <-txDone) + + res := <-got + require.NoError(t, res.err) + require.NotNil(t, res.channel) + assert.Equal(t, core.ChannelStatusClosing, res.channel.Status, + "channel status must reflect the transition committed while waiting on the lock") + }) } func TestDBStore_GetActiveHomeChannel(t *testing.T) { diff --git a/nitronode/store/database/db_store.go b/nitronode/store/database/db_store.go index cc02b243c..0a79e30ab 100644 --- a/nitronode/store/database/db_store.go +++ b/nitronode/store/database/db_store.go @@ -145,11 +145,11 @@ func (s *DBStore) LockUserState(wallet, asset string) (decimal.Decimal, error) { return balance.Balance, nil } -// LockUserStateForHomeChannel locks the balance row of the user owning channelID and returns -// the channel read under that lock. The (wallet, asset) lock key is derived from the channel -// inside SQL, so there is no pre-lock Go-side snapshot: a concurrent transaction (e.g. -// submit_state co-signing a Finalize and flipping the channel to Closing) cannot slip a stale -// status into the returned channel. Callers that previously did GetChannelByID followed by +// LockUserStateForHomeChannel acquires the balance-row lock of the user owning channelID and +// returns the channel read *after* the lock is held. The order matters: the lock is taken first, +// then the channel is read in a separate statement, so a concurrent transaction (e.g. submit_state +// co-signing a Finalize and flipping the channel to Closing) that commits while we wait on the lock +// is reflected in the returned status. Callers that previously did GetChannelByID followed by // LockUserState must use this instead — the separate read is read-before-lock and races. // // Returns (nil, nil) if the channel does not exist. 
Channel-type checks remain the caller's @@ -173,33 +173,32 @@ func (s *DBStore) LockUserStateForHomeChannel(channelID string) (*core.Channel, return channel, nil } - now := time.Now() - // Ensure the balance row exists, deriving (wallet, asset) from the channel so FOR UPDATE - // has a row to lock. No-op when the channel is absent or the row already exists. - if err := s.db.Exec(` - INSERT INTO user_balances (user_wallet, asset, balance, enforced, home_blockchain_id, created_at, updated_at) - SELECT user_wallet, asset, 0, 0, 0, ?, ? FROM channels WHERE channel_id = ? - ON CONFLICT (user_wallet, asset) DO NOTHING - `, now, now, channelID).Error; err != nil { - return nil, fmt.Errorf("failed to ensure user balance row for channel %s: %w", channelID, err) + // Resolve the channel's (wallet, asset) lock key. These columns are immutable for a given + // channel, so reading them at this statement's snapshot is safe even though status is not. + var key struct { + UserWallet string + Asset string } - - // Lock only the balance row (FOR UPDATE OF b) and read the channel in the same statement. - var dbChannel Channel - result := s.db.Raw(` - SELECT c.* FROM channels c - JOIN user_balances b ON b.user_wallet = c.user_wallet AND b.asset = c.asset - WHERE c.channel_id = ? - FOR UPDATE OF b - `, channelID).Scan(&dbChannel) + result := s.db.Raw(`SELECT user_wallet, asset FROM channels WHERE channel_id = ?`, channelID).Scan(&key) if result.Error != nil { - return nil, fmt.Errorf("failed to lock user state for channel %s: %w", channelID, result.Error) + return nil, fmt.Errorf("failed to resolve lock key for channel %s: %w", channelID, result.Error) } if result.RowsAffected == 0 { return nil, nil } - return databaseChannelToCore(&dbChannel), nil + // Acquire the (wallet, asset) balance-row lock first (ensures the row, then SELECT ... FOR + // UPDATE). This blocks until any concurrent transaction holding the row commits. 
+ if _, err := s.LockUserState(key.UserWallet, key.Asset); err != nil { + return nil, err + } + + // Read the channel only after the lock is held. A single SELECT ... FOR UPDATE OF b that + // joins channels would return c.* from the statement-start snapshot — a Finalize that flips + // status to Closing while we wait on the balance lock would not be reflected. This separate + // statement takes a fresh snapshot once the lock is acquired, so the returned status reflects + // any such committed transition. + return s.GetChannelByID(channelID) } // EnsureNoOngoingStateTransitions validates that no conflicting blockchain operations are pending. From 7f6d27562a4509fc77f503de88ba599c1f0c8ada Mon Sep 17 00:00:00 2001 From: Anton Filonenko Date: Mon, 8 Jun 2026 14:09:11 +0300 Subject: [PATCH 3/3] fix(nitronode): address MF3-H02 review nits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - db_store: document READ COMMITTED isolation prerequisite for the post-lock channel refresh (negated under REPEATABLE READ/SERIALIZABLE) - core/store interface: narrow LockUserStateForHomeChannel doc — channel is read under the lock on postgres; pre-lock snapshot on sqlite (tests) - channel_test: replace fixed time.Sleep with a deterministic pg_locks poll so the wait-and-refresh race is actually contended, not a no-op - service: shorten HandleHomeChannelCreated lock comment Co-Authored-By: Claude Opus 4.8 (1M context) --- nitronode/event_handlers/service.go | 10 +++------- nitronode/store/database/channel_test.go | 15 ++++++++++++--- nitronode/store/database/db_store.go | 4 ++++ nitronode/store/database/interface.go | 9 +++++---- pkg/core/interface.go | 11 ++++++----- 5 files changed, 30 insertions(+), 19 deletions(-) diff --git a/nitronode/event_handlers/service.go b/nitronode/event_handlers/service.go index 3478d63c6..522f10405 100644 --- a/nitronode/event_handlers/service.go +++ b/nitronode/event_handlers/service.go @@ -57,13 +57,9 @@ func (s 
*EventHandlerService) HandleNodeBalanceUpdated(ctx context.Context, tx c func (s *EventHandlerService) HandleHomeChannelCreated(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelCreatedEvent) error { logger := log.FromContext(ctx) chanID := event.ChannelID - // Acquire the user's balance-row lock and read the channel under it before mutating - // channel status or backfilling the off-chain head. Receiver/release issuance paths - // lock the same row up front and then re-check Status via CheckActiveChannel; without - // this lock an in-flight RPC can read Status=Void, decide to store an unsigned receiver - // row, and commit before we flip to Open — leaving the latest head unsigned on a - // technically opened channel. The lock+read is a single store call so the channel - // snapshot is consistent with the lock. See HandleHomeChannelCheckpointed and + // Acquire the user's balance-row lock and read the channel under it before mutating status: + // issuance RPCs take the same lock and then re-check status, so holding it stops one from + // acting on Status=Void while we flip Void→Open. See HandleHomeChannelCheckpointed and // HandleHomeChannelClosed for the same pattern. channel, err := tx.LockUserStateForHomeChannel(chanID) if err != nil { diff --git a/nitronode/store/database/channel_test.go b/nitronode/store/database/channel_test.go index 09a2a496b..1c765c463 100644 --- a/nitronode/store/database/channel_test.go +++ b/nitronode/store/database/channel_test.go @@ -308,9 +308,18 @@ func TestDBStore_LockUserStateForHomeChannel(t *testing.T) { got <- lockResult{channel: ch, err: err} }() - // Give the call time to start and block on the balance lock, then let the competing - // transaction commit the Closing status. - time.Sleep(200 * time.Millisecond) + // Wait until the call is actually blocked on the balance-row lock before letting the + // competing transaction commit Closing.
A fixed sleep is non-deterministic: under load + // the goroutine might not yet be in SELECT ... FOR UPDATE, so the competitor would commit + // before the lock is contended and the race would never be exercised. Poll pg_locks for an + // ungranted lock instead (cluster-wide count: assumes no unrelated backend is lock-waiting). + require.Eventually(t, func() bool { + var waiting int64 + if err := db.Raw(`SELECT count(*) FROM pg_locks WHERE NOT granted`).Scan(&waiting).Error; err != nil { + return false + } + return waiting > 0 + }, 5*time.Second, 5*time.Millisecond, "LockUserStateForHomeChannel never blocked on the balance lock") close(proceed) require.NoError(t, <-txDone) diff --git a/nitronode/store/database/db_store.go b/nitronode/store/database/db_store.go index 0a79e30ab..78b684819 100644 --- a/nitronode/store/database/db_store.go +++ b/nitronode/store/database/db_store.go @@ -198,6 +198,10 @@ func (s *DBStore) LockUserStateForHomeChannel(channelID string) (*core.Channel, // status to Closing while we wait on the balance lock would not be reflected. This separate // statement takes a fresh snapshot once the lock is acquired, so the returned status reflects // any such committed transition. + // + // NOTE: requires READ COMMITTED isolation (Postgres default, and nitronode never overrides it). + // Under REPEATABLE READ or SERIALIZABLE this statement still sees the transaction-start + // snapshot, returning the stale pre-lock status and negating the fix. return s.GetChannelByID(channelID) } diff --git a/nitronode/store/database/interface.go b/nitronode/store/database/interface.go index 227a2cd4c..a3981ffb6 100644 --- a/nitronode/store/database/interface.go +++ b/nitronode/store/database/interface.go @@ -28,10 +28,11 @@ type DatabaseStore interface { // Returns the current balance or zero if the row was just inserted.
LockUserState(wallet, asset string) (decimal.Decimal, error) - // LockUserStateForHomeChannel locks the balance row of the user owning channelID (postgres only, - // must be used within a transaction) and returns the channel read under that lock. The lock key is - // derived from the channel in SQL, avoiding the stale pre-lock snapshot of a GetChannelByID + - // LockUserState pair. Returns nil if the channel does not exist. + // LockUserStateForHomeChannel locks the balance row of the user owning channelID (must be used + // within a transaction). On postgres the lock key is derived from the channel in SQL and the + // channel is read under the lock, avoiding the stale pre-lock snapshot of a GetChannelByID + + // LockUserState pair; on non-postgres (sqlite in tests) the snapshot is taken before the lock + // for test compatibility. Returns nil if the channel does not exist. LockUserStateForHomeChannel(channelID string) (*core.Channel, error) // GetUserTransactions retrieves transaction history for a user with optional filters. diff --git a/pkg/core/interface.go b/pkg/core/interface.go index 44ac359fd..f4a45e985 100644 --- a/pkg/core/interface.go +++ b/pkg/core/interface.go @@ -156,11 +156,12 @@ type ChannelHubEventHandlerStore interface { // in tests. LockUserState(wallet, asset string) (decimal.Decimal, error) - // LockUserStateForHomeChannel locks the balance row of the user owning channelID and - // returns the channel read under that lock, deriving the lock key from the channel in - // SQL so no stale pre-lock snapshot is used. Event handlers must use this instead of a - // GetChannelByID + LockUserState pair, which reads channel status before the lock and - // races a concurrent submit_state finalization. Returns nil if the channel is absent. + // LockUserStateForHomeChannel locks the balance row of the user owning channelID. 
On + // postgres it derives the lock key from the channel in SQL and returns the channel read + // under that lock; on non-postgres (sqlite in tests) the snapshot is taken before the lock + // for test compatibility. Event handlers must use this instead of a GetChannelByID + + // LockUserState pair, which reads channel status before the lock and races a concurrent + // submit_state finalization. Returns nil if the channel is absent. LockUserStateForHomeChannel(channelID string) (*Channel, error) // UpdateStateSigsIfMissing backfills the user and/or node signatures for a stored state