From f0203198164655dc14b3adae29c2152a720888db Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Wed, 22 Apr 2026 05:51:59 +0200 Subject: [PATCH] fix(kafka): [Queue Instrumentation 24] Read all baggage headers on consumers Pass every Kafka baggage header through trace continuation in both the raw Kafka helper and the Spring Kafka record interceptor. Previously both consumer paths used lastHeader("baggage"), which dropped all earlier baggage values and could break interop with upstream OTel or other W3C baggage producers. Reading the full header list preserves the existing baggage context during queue trace continuation. --- .../kafka/SentryKafkaConsumerTracing.java | 19 ++++++++++++--- .../kafka/SentryKafkaConsumerTracingTest.kt | 19 +++++++++++++++ .../kafka/SentryKafkaRecordInterceptor.java | 19 ++++++++++++--- .../kafka/SentryKafkaRecordInterceptorTest.kt | 23 +++++++++++++++++++ 4 files changed, 74 insertions(+), 6 deletions(-) diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java index deaa41c5ee..37c7073038 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java @@ -15,7 +15,7 @@ import io.sentry.util.SpanUtils; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -215,9 +215,8 @@ private void finishTransaction( private @Nullable TransactionContext continueTrace( final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); - final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); final @Nullable List baggageHeaders = - baggage != null ? Collections.singletonList(baggage) : null; + headerValues(record, BaggageHeader.BAGGAGE_HEADER); return forkedScopes.continueTrace(sentryTrace, baggageHeaders); } @@ -265,4 +264,18 @@ private void finishTransaction( } return new String(header.value(), StandardCharsets.UTF_8); } + + private @Nullable List headerValues( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + @Nullable List values = null; + for (final @NotNull Header header : record.headers().headers(headerName)) { + if (header.value() != null) { + if (values == null) { + values = new ArrayList<>(); + } + values.add(new String(header.value(), StandardCharsets.UTF_8)); + } + } + return values; + } } diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt index 29283102fa..38c0bf3198 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt @@ -111,6 +111,21 @@ class SentryKafkaConsumerTracingTest { verify(lifecycleToken).close() } + @Test + fun `withTracing passes all baggage headers to continueTrace`() { + val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" + val record = + createRecord( + sentryTrace = sentryTraceValue, + baggageHeaders = listOf("third=party", "sentry-sample_rate=1"), + ) + + tracing.withTracingImpl(record, Callable { "done" }) + + verify(forkedScopes) + .continueTrace(eq(sentryTraceValue), eq(listOf("third=party", "sentry-sample_rate=1"))) + } + @Test fun `withTracing skips scope forking when queue tracing is disabled`() { options.isEnableQueueTracing = false @@ -193,6 +208,7 @@ class SentryKafkaConsumerTracingTest { topic: String = "my-topic", sentryTrace: String? = null, baggage: String? = null, + baggageHeaders: List? = null, messageId: String? = null, deliveryAttempt: Int? = null, enqueuedTime: String? = null, @@ -205,6 +221,9 @@ class SentryKafkaConsumerTracingTest { baggage?.let { headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) } + baggageHeaders?.forEach { + headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } messageId?.let { headers.add(SpanDataConvention.MESSAGING_MESSAGE_ID, it.toByteArray(StandardCharsets.UTF_8)) } diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index c03d318770..025fe9762b 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -14,7 +14,7 @@ import io.sentry.util.SpanUtils; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -126,9 +126,8 @@ private boolean isIgnored() { private @Nullable TransactionContext continueTrace( final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); - final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); final @Nullable List baggageHeaders = - baggage != null ? Collections.singletonList(baggage) : null; + headerValues(record, BaggageHeader.BAGGAGE_HEADER); return forkedScopes.continueTrace(sentryTrace, baggageHeaders); } @@ -243,6 +242,20 @@ private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwa return new String(header.value(), StandardCharsets.UTF_8); } + private @Nullable List headerValues( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + @Nullable List values = null; + for (final @NotNull Header header : record.headers().headers(headerName)) { + if (header.value() != null) { + if (values == null) { + values = new ArrayList<>(); + } + values.add(new String(header.value(), StandardCharsets.UTF_8)); + } + } + return values; + } + private static final class SentryRecordContext { final @NotNull ISentryLifecycleToken lifecycleToken; final @Nullable ITransaction transaction; diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 6191654012..9a8ad5343f 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -81,6 +81,7 @@ class SentryKafkaRecordInterceptorTest { private fun createRecordWithHeaders( sentryTrace: String? = null, baggage: String? = null, + baggageHeaders: List? = null, enqueuedTime: String? = null, deliveryAttempt: Int? = null, ): ConsumerRecord { @@ -91,6 +92,9 @@ class SentryKafkaRecordInterceptorTest { baggage?.let { headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) } + baggageHeaders?.forEach { + headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } enqueuedTime?.let { headers.add( SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, @@ -141,6 +145,25 @@ class SentryKafkaRecordInterceptorTest { verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) } + @Test + fun `intercept passes all baggage headers to continueTrace`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" + val record = + createRecordWithHeaders( + sentryTrace = sentryTraceValue, + baggageHeaders = listOf("third=party", "sentry-sample_rate=1"), + ) + + interceptor.intercept(record, consumer) + + verify(forkedScopes) + .continueTrace( + org.mockito.kotlin.eq(sentryTraceValue), + org.mockito.kotlin.eq(listOf("third=party", "sentry-sample_rate=1")), + ) + } + @Test fun `sets retry count from delivery attempt header`() { val interceptor = SentryKafkaRecordInterceptor(scopes)