diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 703f22fe3e..c17025285c 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -18,6 +18,7 @@ import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertFailsWith import kotlin.test.assertNull import kotlin.test.assertTrue import org.apache.kafka.clients.consumer.Consumer @@ -27,6 +28,7 @@ import org.apache.kafka.common.record.TimestampType import org.mockito.kotlin.any import org.mockito.kotlin.mock import org.mockito.kotlin.never +import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.whenever import org.springframework.kafka.listener.RecordInterceptor @@ -377,6 +379,64 @@ class SentryKafkaRecordInterceptorTest { verify(delegate).clearThreadState(consumer) } + @Test + fun `full lifecycle intercept success clearThreadState closes token exactly once`() { + val delegate = mock>() + val record = createRecord() + whenever(delegate.intercept(record, consumer)).thenReturn(record) + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.setupThreadState(consumer) + interceptor.intercept(record, consumer) + interceptor.success(record, consumer) + interceptor.clearThreadState(consumer) + + // token closed once by success(); clearThreadState must not re-close it + verify(lifecycleToken, times(1)).close() + assertTrue(transaction.isFinished) + // delegate hooks still delegated across the full lifecycle + verify(delegate).setupThreadState(consumer) + verify(delegate).success(record, consumer) + verify(delegate).clearThreadState(consumer) + } + + @Test + fun `when delegate intercept returns null clearThreadState still finishes transaction and closes token`() { + val delegate = mock>() + val record = createRecord() + // delegate filters the record — per Spring Kafka contract, success/failure will not be invoked + whenever(delegate.intercept(record, consumer)).thenReturn(null) + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.setupThreadState(consumer) + val result = interceptor.intercept(record, consumer) + interceptor.clearThreadState(consumer) + + assertNull(result) + verify(lifecycleToken, times(1)).close() + assertTrue(transaction.isFinished) + verify(delegate).clearThreadState(consumer) + } + + @Test + fun `when delegate intercept throws clearThreadState still finishes transaction and closes token`() { + val delegate = mock>() + val record = createRecord() + val boom = RuntimeException("delegate boom") + whenever(delegate.intercept(record, consumer)).thenThrow(boom) + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.setupThreadState(consumer) + val thrown = assertFailsWith { interceptor.intercept(record, consumer) } + assertEquals(boom, thrown) + + interceptor.clearThreadState(consumer) + + verify(lifecycleToken, times(1)).close() + assertTrue(transaction.isFinished) + verify(delegate).clearThreadState(consumer) + } + @Test fun `intercept cleans up stale context from previous record`() { val lifecycleToken2 = mock()