diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index 8339ceefb..5940d570b 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -28,10 +28,10 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import datetime import os import threading import time +import uuid import pytest @@ -52,11 +52,10 @@ @pytest.fixture(scope="function") def topics(): - topics = ["test", "foo", "bar"] + suffix = uuid.uuid4().hex + topics = [f"test-{suffix}", f"foo-{suffix}", f"{suffix}-bar"] admin_client = KafkaAdminClient(bootstrap_servers=[f"{KAFKA_HOST}:9092"]) - # since kafka-python 2.1.0 we started to get failures in create_topics because topics were already there despite - # calls to delete_topics. In the meantime we found a proper fix use a big hammer and catch topics handling failures - # https://github.com/dpkp/kafka-python/issues/2557 + # Use unique topic names because Kafka topic deletion is asynchronous and fixed names can leak state between tests. try: admin_client.create_topics([NewTopic(name, num_partitions=1, replication_factor=1) for name in topics]) except Exception: @@ -66,53 +65,63 @@ def topics(): admin_client.delete_topics(topics) except Exception: pass + admin_client.close() @pytest.fixture() -def producer(): +def producer(topics): producer = KafkaProducer(bootstrap_servers=f"{KAFKA_HOST}:9092") yield producer + producer.close() @pytest.fixture() -def consumer(): +def consumer(topics): consumer = KafkaConsumer(bootstrap_servers=f"{KAFKA_HOST}:9092", consumer_timeout_ms=500) - consumer.subscribe(topics=["foo", "bar", "test"]) + consumer.subscribe(topics=topics) + deadline = time.time() + 5 + while not consumer.assignment() and time.time() < deadline: + consumer.poll(timeout_ms=100) yield consumer + consumer.close() def test_kafka_produce(instrument, elasticapm_client, producer, topics): + test_topic = topics[0] elasticapm_client.begin_transaction("test") - producer.send("test", key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") elasticapm_client.end_transaction("test", "success") transactions = elasticapm_client.events[TRANSACTION] span = elasticapm_client.events[SPAN][0] - assert span["name"] == "Kafka SEND to test" - assert span["context"]["message"]["queue"]["name"] == "test" + assert span["name"] == f"Kafka SEND to {test_topic}" + assert span["context"]["message"]["queue"]["name"] == test_topic assert span["context"]["destination"]["port"] == 9092 assert span["context"]["destination"]["service"]["name"] == "kafka" - assert span["context"]["destination"]["service"]["resource"] == "kafka/test" + assert span["context"]["destination"]["service"]["resource"] == f"kafka/{test_topic}" assert span["context"]["destination"]["service"]["type"] == "messaging" def test_kafka_produce_ignore_topic(instrument, elasticapm_client, producer, topics): + test_topic, foo_topic, bar_topic = topics elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") elasticapm_client.begin_transaction("test") - producer.send(topic="foo", key=b"foo", value=b"bar") - producer.send("bar", key=b"foo", value=b"bar") - producer.send("test", key=b"foo", value=b"bar") + producer.send(topic=foo_topic, key=b"foo", value=b"bar") + producer.send(bar_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") elasticapm_client.end_transaction("test", "success") spans = elasticapm_client.events[SPAN] assert len(spans) == 1 - assert spans[0]["name"] == "Kafka SEND to test" + assert spans[0]["name"] == f"Kafka SEND to {test_topic}" def test_kafka_consume(instrument, elasticapm_client, producer, consumer, topics): + test_topic = topics[0] + def delayed_send(): time.sleep(0.2) elasticapm_client.begin_transaction("foo") - producer.send("test", key=b"foo", value=b"bar") - producer.send("test", key=b"baz", value=b"bazzinga") + producer.send(test_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"baz", value=b"bazzinga") producer.flush() elasticapm_client.end_transaction("foo") @@ -126,20 +135,22 @@ def delayed_send(): spans = elasticapm_client.events[SPAN] # the consumer transactions should have the same trace id as the transaction that triggered the messages assert transactions[0]["trace_id"] == transactions[1]["trace_id"] == transactions[2]["trace_id"] - assert transactions[1]["name"] == "Kafka RECEIVE from test" + assert transactions[1]["name"] == f"Kafka RECEIVE from {test_topic}" assert transactions[1]["type"] == "messaging" - assert transactions[1]["context"]["message"]["queue"]["name"] == "test" + assert transactions[1]["context"]["message"]["queue"]["name"] == test_topic assert spans[2]["transaction_id"] == transactions[1]["id"] assert spans[3]["transaction_id"] == transactions[2]["id"] def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): + test_topic = topics[0] + def delayed_send(): time.sleep(0.2) elasticapm_client.begin_transaction("foo") - producer.send("test", key=b"foo", value=b"bar") - producer.send("test", key=b"baz", value=b"bazzinga") + producer.send(test_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"baz", value=b"bazzinga") producer.flush() elasticapm_client.end_transaction("foo") @@ -161,13 +172,14 @@ def delayed_send(): def test_kafka_consumer_ignore_topic(instrument, elasticapm_client, producer, consumer, topics): + test_topic, foo_topic, bar_topic = topics elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") def delayed_send(): time.sleep(0.2) - producer.send(topic="foo", key=b"foo", value=b"bar") - producer.send("bar", key=b"foo", value=b"bar") - producer.send("test", key=b"foo", value=b"bar") + producer.send(topic=foo_topic, key=b"foo", value=b"bar") + producer.send(bar_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") producer.flush() thread = threading.Thread(target=delayed_send) @@ -178,17 +190,18 @@ def delayed_send(): thread.join() transactions = elasticapm_client.events[TRANSACTION] assert len(transactions) == 1 - assert transactions[0]["name"] == "Kafka RECEIVE from test" + assert transactions[0]["name"] == f"Kafka RECEIVE from {test_topic}" def test_kafka_consumer_ignore_topic_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): + test_topic, foo_topic, bar_topic = topics elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar") def delayed_send(): time.sleep(0.2) - producer.send(topic="foo", key=b"foo", value=b"bar") - producer.send("bar", key=b"foo", value=b"bar") - producer.send("test", key=b"foo", value=b"bar") + producer.send(topic=foo_topic, key=b"foo", value=b"bar") + producer.send(bar_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") producer.flush() thread = threading.Thread(target=delayed_send) @@ -201,14 +214,16 @@ def delayed_send(): transactions = elasticapm_client.events[TRANSACTION] spans = elasticapm_client.spans_for_transaction(transactions[0]) assert len(spans) == 1 - assert spans[0]["name"] == "Kafka RECEIVE from test" + assert spans[0]["name"] == f"Kafka RECEIVE from {test_topic}" def test_kafka_poll_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): + test_topic = topics[0] + def delayed_send(): time.sleep(0.2) - producer.send("test", key=b"foo", value=b"bar") - producer.send("test", key=b"baz", value=b"bazzinga") + producer.send(test_topic, key=b"foo", value=b"bar") + producer.send(test_topic, key=b"baz", value=b"bazzinga") producer.flush() thread = threading.Thread(target=delayed_send) @@ -219,21 +234,23 @@ def delayed_send(): transactions = elasticapm_client.events[TRANSACTION] spans = elasticapm_client.events[SPAN] assert len(spans) == 1 - assert spans[0]["name"] == "Kafka POLL from bar, foo, test" + assert spans[0]["name"] == "Kafka POLL from " + ", ".join(sorted(topics)) def test_kafka_no_client(instrument, producer, consumer, topics): + test_topic = topics[0] assert elasticapm.get_client() is None # the following code shouldn't trigger any errors - producer.send("test", key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") for item in consumer: pass def test_kafka_send_unsampled_transaction(instrument, elasticapm_client, producer, topics): + test_topic = topics[0] transaction_object = elasticapm_client.begin_transaction("transaction") transaction_object.is_sampled = False - producer.send("test", key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") elasticapm_client.end_transaction("foo") spans = elasticapm_client.events[SPAN] assert len(spans) == 0 @@ -251,9 +268,11 @@ def test_kafka_poll_unsampled_transaction(instrument, elasticapm_client, consume def test_kafka_consumer_unsampled_transaction_handles_stop_iteration( instrument, elasticapm_client, producer, consumer, topics ): + test_topic = topics[0] + def delayed_send(): time.sleep(0.2) - producer.send("test", key=b"foo", value=b"bar") + producer.send(test_topic, key=b"foo", value=b"bar") producer.flush() thread = threading.Thread(target=delayed_send)