From 271fb8bc1a60514308654faf833d9f05c2dc7ee4 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Thu, 2 Apr 2026 06:47:04 +0200 Subject: [PATCH 1/3] test(samples): Add Kafka queue system tests for Spring Boot 3 Add KafkaQueueSystemTest with e2e tests for: - Producer endpoint creates queue.publish span - Consumer creates queue.process transaction - Distributed tracing (producer and consumer share same trace) - Messaging attributes on publish span and process transaction Also add produceKafkaMessage to RestTestClient and enable sentry.enable-queue-tracing in the kafka profile properties. Requires a running Kafka broker at localhost:9092 and the sample app started with --spring.profiles.active=kafka. Co-Authored-By: Claude --- .../resources/application-kafka.properties | 2 + .../sentry/systemtest/KafkaQueueSystemTest.kt | 117 ++++++++++++++++++ .../kafka/SentryKafkaRecordInterceptor.java | 24 ++-- .../api/sentry-system-test-support.api | 2 + .../sentry/systemtest/util/RestTestClient.kt | 6 + 5 files changed, 143 insertions(+), 8 deletions(-) create mode 100644 sentry-samples/sentry-samples-spring-boot-jakarta/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application-kafka.properties index a943f203c8..71e517b82a 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application-kafka.properties +++ b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application-kafka.properties @@ -1,4 +1,6 @@ # Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + spring.autoconfigure.exclude= spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=sentry-sample-group diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-jakarta/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt new file mode 100644 index 0000000000..43781cf2c5 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt @@ -0,0 +1,117 @@ +package io.sentry.systemtest + +import io.sentry.systemtest.util.TestHelper +import kotlin.test.Test +import kotlin.test.assertEquals +import org.junit.Before + +/** + * System tests for Kafka queue instrumentation. + * + * Requires: + * - The sample app running with `--spring.profiles.active=kafka` + * - A Kafka broker at localhost:9092 + * - The mock Sentry server at localhost:8000 + */ +class KafkaQueueSystemTest { + lateinit var testHelper: TestHelper + + @Before + fun setup() { + testHelper = TestHelper("http://localhost:8080") + testHelper.reset() + } + + @Test + fun `producer endpoint creates queue publish span`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("test-message") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish") + } + } + + @Test + fun `consumer creates queue process transaction`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("test-consumer-message") + assertEquals(200, restClient.lastKnownStatusCode) + + // The consumer runs asynchronously, so wait for the queue.process transaction + testHelper.ensureTransactionReceived { transaction, _ -> + testHelper.doesTransactionHaveOp(transaction, "queue.process") + } + } + + @Test + fun `producer and consumer share same trace`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("trace-test-message") + assertEquals(200, restClient.lastKnownStatusCode) + + // Capture the trace ID from the producer transaction (has queue.publish span) + var producerTraceId: String? = null + testHelper.ensureTransactionReceived { transaction, _ -> + if (testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish")) { + producerTraceId = transaction.contexts.trace?.traceId?.toString() + true + } else { + false + } + } + + // Verify the consumer transaction has the same trace ID + // Use retryCount=3 since the consumer may take a moment to process + testHelper.ensureEnvelopeReceived(retryCount = 3) { envelopeString -> + val envelope = + testHelper.jsonSerializer.deserializeEnvelope(envelopeString.byteInputStream()) + ?: return@ensureEnvelopeReceived false + val txItem = + envelope.items.firstOrNull { it.header.type == io.sentry.SentryItemType.Transaction } + ?: return@ensureEnvelopeReceived false + val tx = + txItem.getTransaction(testHelper.jsonSerializer) ?: return@ensureEnvelopeReceived false + + tx.contexts.trace?.operation == "queue.process" && + tx.contexts.trace?.traceId?.toString() == producerTraceId + } + } + + @Test + fun `queue publish span has messaging attributes`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("attrs-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + val span = transaction.spans.firstOrNull { it.op == "queue.publish" } + if (span == null) return@ensureTransactionReceived false + + val data = span.data ?: return@ensureTransactionReceived false + data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic" + } + } + + @Test + fun `queue process transaction has messaging attributes`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("process-attrs-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + if (!testHelper.doesTransactionHaveOp(transaction, "queue.process")) { + return@ensureTransactionReceived false + } + + val data = transaction.contexts.trace?.data ?: return@ensureTransactionReceived false + data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic" + } + } +} diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index 419e7834a1..425fe61673 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -55,9 +55,10 @@ public SentryKafkaRecordInterceptor( final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor"); final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); - continueTrace(forkedScopes, record); + final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record); - final @Nullable ITransaction transaction = startTransaction(forkedScopes, record); + final @Nullable ITransaction transaction = + startTransaction(forkedScopes, record, transactionContext); currentContext.set(new SentryRecordContext(lifecycleToken, transaction)); return delegateIntercept(record, consumer); @@ -105,28 +106,35 @@ public void afterRecord( return record; } - private void continueTrace( + private @Nullable TransactionContext continueTrace( final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); final @Nullable List baggageHeaders = baggage != null ? Collections.singletonList(baggage) : null; - forkedScopes.continueTrace(sentryTrace, baggageHeaders); + return forkedScopes.continueTrace(sentryTrace, baggageHeaders); } private @Nullable ITransaction startTransaction( - final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + final @NotNull IScopes forkedScopes, + final @NotNull ConsumerRecord record, + final @Nullable TransactionContext transactionContext) { if (!forkedScopes.getOptions().isTracingEnabled()) { return null; } + final @NotNull TransactionContext txContext = + transactionContext != null + ? transactionContext + : new TransactionContext("queue.process", "queue.process"); + txContext.setName("queue.process"); + txContext.setOperation("queue.process"); + final @NotNull TransactionOptions txOptions = new TransactionOptions(); txOptions.setOrigin(TRACE_ORIGIN); txOptions.setBindToScope(true); - final @NotNull ITransaction transaction = - forkedScopes.startTransaction( - new TransactionContext("queue.process", "queue.process"), txOptions); + final @NotNull ITransaction transaction = forkedScopes.startTransaction(txContext, txOptions); if (transaction.isNoOp()) { return null; diff --git a/sentry-system-test-support/api/sentry-system-test-support.api b/sentry-system-test-support/api/sentry-system-test-support.api index 83a9f288d0..1cbec85751 100644 --- a/sentry-system-test-support/api/sentry-system-test-support.api +++ b/sentry-system-test-support/api/sentry-system-test-support.api @@ -560,6 +560,8 @@ public final class io/sentry/systemtest/util/RestTestClient : io/sentry/systemte public final fun getTodo (J)Lio/sentry/systemtest/Todo; public final fun getTodoRestClient (J)Lio/sentry/systemtest/Todo; public final fun getTodoWebclient (J)Lio/sentry/systemtest/Todo; + public final fun produceKafkaMessage (Ljava/lang/String;)Ljava/lang/String; + public static synthetic fun produceKafkaMessage$default (Lio/sentry/systemtest/util/RestTestClient;Ljava/lang/String;ILjava/lang/Object;)Ljava/lang/String; public final fun saveCachedTodo (Lio/sentry/systemtest/Todo;)Lio/sentry/systemtest/Todo; } diff --git a/sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt b/sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt index da552ff93b..b9dc0f3cca 100644 --- a/sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt +++ b/sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt @@ -81,6 +81,12 @@ class RestTestClient(private val backendBaseUrl: String) : LoggingInsecureRestCl return response?.body?.string() } + fun produceKafkaMessage(message: String = "hello from sentry!"): String? { + val request = Request.Builder().url("$backendBaseUrl/kafka/produce?message=$message") + + return callTyped(request, true) + } + fun getCountMetric(): String? { val request = Request.Builder().url("$backendBaseUrl/metric/count") From 02f2007faabf4e98e010f5f901fe9c9885ad8078 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Thu, 2 Apr 2026 07:54:03 +0200 Subject: [PATCH 2/3] docs: Add rule against force-pushing stack branches Force-pushing a stack branch can cause GitHub to auto-merge or auto-close other PRs in the stack. Add explicit guidance to never use --force, --force-with-lease, or amend+push on stack branches. --- .cursor/rules/pr.mdc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.cursor/rules/pr.mdc b/.cursor/rules/pr.mdc index 08a07511c6..8f28f94c8c 100644 --- a/.cursor/rules/pr.mdc +++ b/.cursor/rules/pr.mdc @@ -258,3 +258,5 @@ git push **Never merge into the collection branch.** Syncing only happens between stack PR branches. The collection branch is untouched until the user merges PRs through GitHub. Prefer merge over rebase — it preserves commit history, doesn't invalidate existing review comments, and avoids the need for force-pushing. Only rebase if explicitly requested. + +**Never force-push stack branches.** Do not use `--force`, `--force-with-lease`, or `git push` after `git commit --amend` on branches that are part of a stack. Force-pushing a stack branch can cause GitHub to auto-merge or auto-close other PRs in the stack. If a commit needs fixing, add a new commit instead of amending. From 6f90ea7c801b8e44c2921a5957b2c7b108805cba Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Thu, 2 Apr 2026 07:55:51 +0200 Subject: [PATCH 3/3] docs: Also prohibit --amend on stack branches --- .cursor/rules/pr.mdc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.cursor/rules/pr.mdc b/.cursor/rules/pr.mdc index 8f28f94c8c..e15c0a0a56 100644 --- a/.cursor/rules/pr.mdc +++ b/.cursor/rules/pr.mdc @@ -259,4 +259,4 @@ git push Prefer merge over rebase — it preserves commit history, doesn't invalidate existing review comments, and avoids the need for force-pushing. Only rebase if explicitly requested. -**Never force-push stack branches.** Do not use `--force`, `--force-with-lease`, or `git push` after `git commit --amend` on branches that are part of a stack. Force-pushing a stack branch can cause GitHub to auto-merge or auto-close other PRs in the stack. If a commit needs fixing, add a new commit instead of amending. +**Never amend or force-push stack branches.** Do not use `git commit --amend`, `--force`, or `--force-with-lease` on branches that are part of a stack. Amending a pushed commit requires a force-push, which can cause GitHub to auto-merge or auto-close other PRs in the stack. If a commit needs fixing, add a new fixup commit instead.