Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 33 additions & 47 deletions nitronode/event_handlers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (s *EventHandlerService) HandleNodeBalanceUpdated(ctx context.Context, tx c
func (s *EventHandlerService) HandleHomeChannelCreated(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelCreatedEvent) error {
logger := log.FromContext(ctx)
chanID := event.ChannelID
channel, err := tx.GetChannelByID(chanID)
// Acquire the user's balance-row lock and read the channel under it before mutating status:
// the lock guards against a concurrent submit_state flipping channel status (e.g. Void→Open
// receiver issuance) between the read and our write. See HandleHomeChannelCheckpointed and
// HandleHomeChannelClosed for the same pattern.
channel, err := tx.LockUserStateForHomeChannel(chanID)
if err != nil {
return err
}
Expand All @@ -70,18 +74,6 @@ func (s *EventHandlerService) HandleHomeChannelCreated(ctx context.Context, tx c
return nil
}

// Acquire the user's balance-row lock before mutating channel status or
// backfilling the off-chain head. Receiver/release issuance paths lock
// the same row up front and then re-check Status via CheckActiveChannel;
// without this lock an in-flight RPC can read Status=Void, decide to store
// an unsigned receiver row, and commit before we flip to Open — leaving
// the latest head unsigned on a technically opened channel. See
// HandleHomeChannelCheckpointed and HandleHomeChannelClosed for the same
// pattern.
if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil {
return err
}

if channel.Status >= core.ChannelStatusOpen {
logger.Warn("ignoring replayed HomeChannelCreated event on already-initialized channel",
"channelId", chanID, "currentStatus", channel.Status, "currentStateVersion", channel.StateVersion, "eventStateVersion", event.StateVersion)
Expand Down Expand Up @@ -137,7 +129,17 @@ func (s *EventHandlerService) HandleHomeChannelMigrated(ctx context.Context, tx
func (s *EventHandlerService) HandleHomeChannelCheckpointed(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelCheckpointedEvent) error {
logger := log.FromContext(ctx)
chanID := event.ChannelID
channel, err := tx.GetChannelByID(chanID)
// Acquire the user's balance-row lock and read the channel under it before mutating
// channel status or backfilling the off-chain head. Reading the channel before the
// lock would race: a concurrent submit_state can co-sign a Finalize and flip the
// channel Open→Closing between the read and the lock, and the non-challenged path
// below persists the channel snapshot verbatim — a pre-lock Open snapshot would
// silently reopen the finalized channel. Receiver issuance paths lock the same row
// and then re-check Status; without this lock an RPC can read Status=Challenged,
// decide to store an unsigned receiver row, but commit after we flip to Open and
// backfill the prior head — leaving the latest head unsigned on an Open channel. See
// HandleHomeChannelClosed for the same pattern.
channel, err := tx.LockUserStateForHomeChannel(chanID)
if err != nil {
return err
}
Expand All @@ -150,16 +152,6 @@ func (s *EventHandlerService) HandleHomeChannelCheckpointed(ctx context.Context,
return nil
}

// Acquire the user's balance-row lock before mutating channel status or
// backfilling the off-chain head. Receiver issuance paths lock the same row
// and then re-check Status; without this lock an RPC can read Status=Challenged,
// decide to store an unsigned receiver row, but commit after we flip to Open
// and backfill the prior head — leaving the latest head unsigned on an Open
// channel. See HandleHomeChannelClosed for the same pattern.
if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil {
return err
}

channel.StateVersion = event.StateVersion

wasChallenged := channel.Status == core.ChannelStatusChallenged
Expand Down Expand Up @@ -269,7 +261,15 @@ func (s *EventHandlerService) backfillOffChainHeadNodeSig(ctx context.Context, t
func (s *EventHandlerService) HandleHomeChannelChallenged(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelChallengedEvent) error {
logger := log.FromContext(ctx)
chanID := event.ChannelID
channel, err := tx.GetChannelByID(chanID)
// Acquire the user's balance-row lock and read the channel under it before mutating
// channel status. Receiver issuance paths (issueTransferReceiverState /
// issueReleaseReceiverState) lock the same row up front and then re-check Status via
// CheckActiveChannel; without this lock an in-flight RPC can read Status=Open,
// node-sign a receiver state, and commit after we flip to Challenged — leaving a
// node-signed higher-version receiver state on a disputed channel. The lock+read is a
// single store call so the channel snapshot is consistent with the lock. See
// HandleHomeChannelClosed for the same pattern.
channel, err := tx.LockUserStateForHomeChannel(chanID)
if err != nil {
return err
}
Expand All @@ -282,17 +282,6 @@ func (s *EventHandlerService) HandleHomeChannelChallenged(ctx context.Context, t
return nil
}

// Acquire the user's balance-row lock before mutating channel status. Receiver
// issuance paths (issueTransferReceiverState / issueReleaseReceiverState) lock
// the same row up front and then re-check Status via CheckActiveChannel; without
// this lock an in-flight RPC can read Status=Open, node-sign a receiver state,
// and commit after we flip to Challenged — leaving a node-signed higher-version
// receiver state on a disputed channel. See HandleHomeChannelClosed for the same
// pattern.
if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil {
return err
}

if event.StateVersion < channel.StateVersion {
// Per protocol the challenged version cannot be lower than the last known on-chain version.
// Treat as an anomaly (replay, indexer mis-order, contract bug): warn and skip persistence.
Expand Down Expand Up @@ -360,7 +349,14 @@ func (s *EventHandlerService) HandleHomeChannelChallenged(ctx context.Context, t
func (s *EventHandlerService) HandleHomeChannelClosed(ctx context.Context, tx core.ChannelHubEventHandlerStore, event *core.HomeChannelClosedEvent) error {
logger := log.FromContext(ctx)
chanID := event.ChannelID
channel, err := tx.GetChannelByID(chanID)
// Acquire the user's balance-row lock and read the channel under it before mutating
// channel status or summing receiver credits. issueTransferReceiverState /
// issueReleaseReceiverState lock the same row up front, so this serializes the close
// against any in-flight RPC receiver-issuance for the same user: either the RPC commits
// its unsigned row before we sum (it lands in the rescue), or it blocks until we set the
// channel to Closed and then sees that via its own re-check. The lock+read is a single
// store call so the channel snapshot is consistent with the lock.
channel, err := tx.LockUserStateForHomeChannel(chanID)
if err != nil {
return err
}
Expand All @@ -373,16 +369,6 @@ func (s *EventHandlerService) HandleHomeChannelClosed(ctx context.Context, tx co
return nil
}

// Acquire the user's balance-row lock before mutating channel status or summing
// receiver credits. issueTransferReceiverState / issueReleaseReceiverState lock
// the same row up front, so this serializes the close against any in-flight RPC
// receiver-issuance for the same user: either the RPC commits its unsigned row
// before we sum (it lands in the rescue), or it blocks until we set the channel
// to Closed and then sees that via its own re-check.
if _, err := tx.LockUserState(channel.UserWallet, channel.Asset); err != nil {
return err
}

wasChallenged := channel.Status == core.ChannelStatusChallenged

channel.StateVersion = event.StateVersion
Expand Down
Loading
Loading