diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index b07d761a92..d2302dca57 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -177,6 +177,11 @@ private boolean isIgnored() { transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); } + final int bodySize = record.serializedValueSize(); + if (bodySize >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize); + } + final @Nullable Integer retryCount = retryCount(record); if (retryCount != null) { transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index ac47d3654a..703f22fe3e 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -13,6 +13,7 @@ import io.sentry.kafka.SentryKafkaProducerInterceptor import io.sentry.test.initForTest import java.nio.ByteBuffer import java.nio.charset.StandardCharsets +import java.util.Optional import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test @@ -22,6 +23,7 @@ import kotlin.test.assertTrue import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeaders +import org.apache.kafka.common.record.TimestampType import org.mockito.kotlin.any import org.mockito.kotlin.mock import org.mockito.kotlin.never @@ -72,10 +74,21 @@ class SentryKafkaRecordInterceptorTest { private fun createRecord( topic: String = "my-topic", headers: RecordHeaders = RecordHeaders(), + serializedValueSize: Int = -1, ): ConsumerRecord { - val record = ConsumerRecord(topic, 0, 0L, "key", "value") - headers.forEach { record.headers().add(it) } - return record + return ConsumerRecord( + topic, + 0, + 0L, + System.currentTimeMillis(), + TimestampType.CREATE_TIME, + 3, + serializedValueSize, + "key", + "value", + headers, + Optional.empty(), + ) } private fun createRecordWithHeaders( @@ -164,6 +177,26 @@ class SentryKafkaRecordInterceptorTest { ) } + @Test + fun `sets body size from serializedValueSize`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord(serializedValueSize = 42) + + interceptor.intercept(record, consumer) + + assertEquals(42, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE)) + } + + @Test + fun `does not set body size when serializedValueSize is negative`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord(serializedValueSize = -1) + + interceptor.intercept(record, consumer) + + assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE)) + } + @Test fun `sets retry count from delivery attempt header`() { val interceptor = SentryKafkaRecordInterceptor(scopes)