Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
import kotlin.test.assertTrue
import org.apache.kafka.clients.consumer.Consumer
Expand All @@ -27,6 +28,7 @@ import org.apache.kafka.common.record.TimestampType
import org.mockito.kotlin.any
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import org.springframework.kafka.listener.RecordInterceptor
Expand Down Expand Up @@ -377,6 +379,64 @@ class SentryKafkaRecordInterceptorTest {
verify(delegate).clearThreadState(consumer)
}

@Test
fun `full lifecycle intercept success clearThreadState closes token exactly once`() {
val delegate = mock<RecordInterceptor<String, String>>()
val record = createRecord()
whenever(delegate.intercept(record, consumer)).thenReturn(record)
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)

interceptor.setupThreadState(consumer)
interceptor.intercept(record, consumer)
interceptor.success(record, consumer)
interceptor.clearThreadState(consumer)

// token closed once by success(); clearThreadState must not re-close it
verify(lifecycleToken, times(1)).close()
assertTrue(transaction.isFinished)
// delegate hooks still delegated across the full lifecycle
verify(delegate).setupThreadState(consumer)
verify(delegate).success(record, consumer)
verify(delegate).clearThreadState(consumer)
}

@Test
fun `when delegate intercept returns null clearThreadState still finishes transaction and closes token`() {
val delegate = mock<RecordInterceptor<String, String>>()
val record = createRecord()
// delegate filters the record โ€” per Spring Kafka contract, success/failure will not be invoked
whenever(delegate.intercept(record, consumer)).thenReturn(null)
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)

interceptor.setupThreadState(consumer)
val result = interceptor.intercept(record, consumer)
interceptor.clearThreadState(consumer)

assertNull(result)
verify(lifecycleToken, times(1)).close()
assertTrue(transaction.isFinished)
verify(delegate).clearThreadState(consumer)
}

@Test
fun `when delegate intercept throws clearThreadState still finishes transaction and closes token`() {
val delegate = mock<RecordInterceptor<String, String>>()
val record = createRecord()
val boom = RuntimeException("delegate boom")
whenever(delegate.intercept(record, consumer)).thenThrow(boom)
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)

interceptor.setupThreadState(consumer)
val thrown = assertFailsWith<RuntimeException> { interceptor.intercept(record, consumer) }
assertEquals(boom, thrown)

interceptor.clearThreadState(consumer)

verify(lifecycleToken, times(1)).close()
assertTrue(transaction.isFinished)
verify(delegate).clearThreadState(consumer)
}

@Test
fun `intercept cleans up stale context from previous record`() {
val lifecycleToken2 = mock<ISentryLifecycleToken>()
Expand Down
Loading