From 4b69355f0c4c21598429f49609143ff9024246ee Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Fri, 22 May 2026 16:01:25 +0200 Subject: [PATCH] tests: remove flakyness from kafka tests The Kafka clients could be created/subscribed before topics existed, while fixed topic names also leaked broker state because Kafka topic deletion is async. So make topic names unique, wait for consumer partition assignment before producing and remember to close the kafka clients on teardown. Assisted-by: Cursor --- tests/instrumentation/kafka_tests.py | 91 +++++++++++++++++----------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index 8339ceefb..5940d570b 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -28,10 +28,10 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import datetime import os import threading import time +import uuid import pytest @@ -52,11 +52,10 @@ @pytest.fixture(scope="function") def topics(): - topics = ["test", "foo", "bar"] + suffix = uuid.uuid4().hex + topics = [f"test-{suffix}", f"foo-{suffix}", f"{suffix}-bar"] admin_client = KafkaAdminClient(bootstrap_servers=[f"{KAFKA_HOST}:9092"]) - # since kafka-python 2.1.0 we started to get failures in create_topics because topics were already there despite - # calls to delete_topics. In the meantime we found a proper fix use a big hammer and catch topics handling failures - # https://github.com/dpkp/kafka-python/issues/2557 + # Use unique topic names because Kafka topic deletion is asynchronous and fixed names can leak state between tests. try: admin_client.create_topics([NewTopic(name, num_partitions=1, replication_factor=1) for name in topics]) except Exception: @@ -66,53 +65,63 @@ def topics(): admin_client.delete_topics(topics) except Exception: pass + admin_client.close() @pytest.fixture() -def producer(): +def producer(topics): producer = KafkaProducer(bootstrap_servers=f"{KAFKA_HOST}:9092") yield producer + producer.close() @pytest.fixture() -def consumer(): +def consumer(topics): consumer = KafkaConsumer(bootstrap_servers=f"{KAFKA_HOST}:9092", consumer_timeout_ms=500) - consumer.subscribe(topics=["foo", "bar", "test"]) + consumer.subscribe(topics=topics) + deadline = time.time() + 5 + while not consumer.assignment() and time.time() < deadline: + consumer.poll(timeout_ms=100) yield consumer + consumer.close() def test_kafka_produce(instrument, elasticapm_client, producer, topics): + test_topic = topics[0] elasticapm_client.begin_transaction("test") - producer.send("test", key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") elasticapm_client.end_transaction("test", "success") transactions = elasticapm_client.events[TRANSACTION] span = elasticapm_client.events[SPAN][0] - assert span["name"] == "Kafka SEND to test" - assert span["context"]["message"]["queue"]["name"] == "test" + assert span["name"] == f"Kafka SEND to {test_topic}" + assert span["context"]["message"]["queue"]["name"] == test_topic assert span["context"]["destination"]["port"] == 9092 assert span["context"]["destination"]["service"]["name"] == "kafka" - assert span["context"]["destination"]["service"]["resource"] == "kafka/test" + assert span["context"]["destination"]["service"]["resource"] == f"kafka/{test_topic}" assert span["context"]["destination"]["service"]["type"] == "messaging" def test_kafka_produce_ignore_topic(instrument, elasticapm_client, producer, topics): + test_topic, foo_topic, bar_topic = topics elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") elasticapm_client.begin_transaction("test") - producer.send(topic="foo", key=b"foo", value=b"bar") - producer.send("bar", key=b"foo", value=b"bar") - producer.send("test", key=b"foo", value=b"bar") + producer.send(topic=foo_topic, key=b"foo", value=b"bar") + producer.send(bar_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") elasticapm_client.end_transaction("test", "success") spans = elasticapm_client.events[SPAN] assert len(spans) == 1 - assert spans[0]["name"] == "Kafka SEND to test" + assert spans[0]["name"] == f"Kafka SEND to {test_topic}" def test_kafka_consume(instrument, elasticapm_client, producer, consumer, topics): + test_topic = topics[0] + def delayed_send(): time.sleep(0.2) elasticapm_client.begin_transaction("foo") - producer.send("test", key=b"foo", value=b"bar") - producer.send("test", key=b"baz", value=b"bazzinga") + producer.send(test_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"baz", value=b"bazzinga") producer.flush() elasticapm_client.end_transaction("foo") @@ -126,20 +135,22 @@ def delayed_send(): spans = elasticapm_client.events[SPAN] # the consumer transactions should have the same trace id as the transaction that triggered the messages assert transactions[0]["trace_id"] == transactions[1]["trace_id"] == transactions[2]["trace_id"] - assert transactions[1]["name"] == "Kafka RECEIVE from test" + assert transactions[1]["name"] == f"Kafka RECEIVE from {test_topic}" assert transactions[1]["type"] == "messaging" - assert transactions[1]["context"]["message"]["queue"]["name"] == "test" + assert transactions[1]["context"]["message"]["queue"]["name"] == test_topic assert spans[2]["transaction_id"] == transactions[1]["id"] assert spans[3]["transaction_id"] == transactions[2]["id"] def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): + test_topic = topics[0] + def delayed_send(): time.sleep(0.2) elasticapm_client.begin_transaction("foo") - producer.send("test", key=b"foo", value=b"bar") - producer.send("test", key=b"baz", value=b"bazzinga") + producer.send(test_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"baz", value=b"bazzinga") producer.flush() elasticapm_client.end_transaction("foo") @@ -161,13 +172,14 @@ def delayed_send(): def test_kafka_consumer_ignore_topic(instrument, elasticapm_client, producer, consumer, topics): + test_topic, foo_topic, bar_topic = topics elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") def delayed_send(): time.sleep(0.2) - producer.send(topic="foo", key=b"foo", value=b"bar") - producer.send("bar", key=b"foo", value=b"bar") - producer.send("test", key=b"foo", value=b"bar") + producer.send(topic=foo_topic, key=b"foo", value=b"bar") + producer.send(bar_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") producer.flush() thread = threading.Thread(target=delayed_send) @@ -178,17 +190,18 @@ def delayed_send(): thread.join() transactions = elasticapm_client.events[TRANSACTION] assert len(transactions) == 1 - assert transactions[0]["name"] == "Kafka RECEIVE from test" + assert transactions[0]["name"] == f"Kafka RECEIVE from {test_topic}" def test_kafka_consumer_ignore_topic_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): + test_topic, foo_topic, bar_topic = topics elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") def delayed_send(): time.sleep(0.2) - producer.send(topic="foo", key=b"foo", value=b"bar") - producer.send("bar", key=b"foo", value=b"bar") - producer.send("test", key=b"foo", value=b"bar") + producer.send(topic=foo_topic, key=b"foo", value=b"bar") + producer.send(bar_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") producer.flush() thread = threading.Thread(target=delayed_send) @@ -201,14 +214,16 @@ def delayed_send(): transactions = elasticapm_client.events[TRANSACTION] spans = elasticapm_client.spans_for_transaction(transactions[0]) assert len(spans) == 1 - assert spans[0]["name"] == "Kafka RECEIVE from test" + assert spans[0]["name"] == f"Kafka RECEIVE from {test_topic}" def test_kafka_poll_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): + test_topic = topics[0] + def delayed_send(): time.sleep(0.2) - producer.send("test", key=b"foo", value=b"bar") - producer.send("test", key=b"baz", value=b"bazzinga") + producer.send(test_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"baz", value=b"bazzinga") producer.flush() thread = threading.Thread(target=delayed_send) @@ -219,21 +234,23 @@ def delayed_send(): transactions = elasticapm_client.events[TRANSACTION] spans = elasticapm_client.events[SPAN] assert len(spans) == 1 - assert spans[0]["name"] == "Kafka POLL from bar, foo, test" + assert spans[0]["name"] == "Kafka POLL from " + ", ".join(sorted(topics)) def test_kafka_no_client(instrument, producer, consumer, topics): + test_topic = topics[0] assert elasticapm.get_client() is None # the following code shouldn't trigger any errors - producer.send("test", key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") for item in consumer: pass def test_kafka_send_unsampled_transaction(instrument, elasticapm_client, producer, topics): + test_topic = topics[0] transaction_object = elasticapm_client.begin_transaction("transaction") transaction_object.is_sampled = False - producer.send("test", key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") elasticapm_client.end_transaction("foo") spans = elasticapm_client.events[SPAN] assert len(spans) == 0 @@ -251,9 +268,11 @@ def test_kafka_poll_unsampled_transaction(instrument, elasticapm_client, consume def test_kafka_consumer_unsampled_transaction_handles_stop_iteration( instrument, elasticapm_client, producer, consumer, topics ): + test_topic = topics[0] + def delayed_send(): time.sleep(0.2) - producer.send("test", key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") producer.flush() thread = threading.Thread(target=delayed_send)