Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 55 additions & 36 deletions tests/instrumentation/kafka_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import datetime
import os
import threading
import time
import uuid

import pytest

Expand All @@ -52,11 +52,10 @@

@pytest.fixture(scope="function")
def topics():
topics = ["test", "foo", "bar"]
suffix = uuid.uuid4().hex
topics = [f"test-{suffix}", f"foo-{suffix}", f"{suffix}-bar"]
admin_client = KafkaAdminClient(bootstrap_servers=[f"{KAFKA_HOST}:9092"])
# since kafka-python 2.1.0 we started to get failures in create_topics because topics were already there despite
# calls to delete_topics. In the meantime we found a proper fix use a big hammer and catch topics handling failures
# https://github.com/dpkp/kafka-python/issues/2557
# Use unique topic names because Kafka topic deletion is asynchronous and fixed names can leak state between tests.
try:
admin_client.create_topics([NewTopic(name, num_partitions=1, replication_factor=1) for name in topics])
except Exception:
Expand All @@ -66,53 +65,63 @@ def topics():
admin_client.delete_topics(topics)
except Exception:
pass
admin_client.close()


@pytest.fixture()
def producer():
def producer(topics):
producer = KafkaProducer(bootstrap_servers=f"{KAFKA_HOST}:9092")
yield producer
producer.close()


@pytest.fixture()
def consumer():
def consumer(topics):
consumer = KafkaConsumer(bootstrap_servers=f"{KAFKA_HOST}:9092", consumer_timeout_ms=500)
consumer.subscribe(topics=["foo", "bar", "test"])
consumer.subscribe(topics=topics)
deadline = time.time() + 5
while not consumer.assignment() and time.time() < deadline:
consumer.poll(timeout_ms=100)
yield consumer
consumer.close()


def test_kafka_produce(instrument, elasticapm_client, producer, topics):
test_topic = topics[0]
elasticapm_client.begin_transaction("test")
producer.send("test", key=b"foo", value=b"bar")
producer.send(test_topic, key=b"foo", value=b"bar")
elasticapm_client.end_transaction("test", "success")
transactions = elasticapm_client.events[TRANSACTION]
span = elasticapm_client.events[SPAN][0]
assert span["name"] == "Kafka SEND to test"
assert span["context"]["message"]["queue"]["name"] == "test"
assert span["name"] == f"Kafka SEND to {test_topic}"
assert span["context"]["message"]["queue"]["name"] == test_topic
assert span["context"]["destination"]["port"] == 9092
assert span["context"]["destination"]["service"]["name"] == "kafka"
assert span["context"]["destination"]["service"]["resource"] == "kafka/test"
assert span["context"]["destination"]["service"]["resource"] == f"kafka/{test_topic}"
assert span["context"]["destination"]["service"]["type"] == "messaging"


def test_kafka_produce_ignore_topic(instrument, elasticapm_client, producer, topics):
test_topic, foo_topic, bar_topic = topics
elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar")
elasticapm_client.begin_transaction("test")
producer.send(topic="foo", key=b"foo", value=b"bar")
producer.send("bar", key=b"foo", value=b"bar")
producer.send("test", key=b"foo", value=b"bar")
producer.send(topic=foo_topic, key=b"foo", value=b"bar")
producer.send(bar_topic, key=b"foo", value=b"bar")
producer.send(test_topic, key=b"foo", value=b"bar")
elasticapm_client.end_transaction("test", "success")
spans = elasticapm_client.events[SPAN]
assert len(spans) == 1
assert spans[0]["name"] == "Kafka SEND to test"
assert spans[0]["name"] == f"Kafka SEND to {test_topic}"


def test_kafka_consume(instrument, elasticapm_client, producer, consumer, topics):
test_topic = topics[0]

def delayed_send():
time.sleep(0.2)
elasticapm_client.begin_transaction("foo")
producer.send("test", key=b"foo", value=b"bar")
producer.send("test", key=b"baz", value=b"bazzinga")
producer.send(test_topic, key=b"foo", value=b"bar")
producer.send(test_topic, key=b"baz", value=b"bazzinga")
producer.flush()
elasticapm_client.end_transaction("foo")

Expand All @@ -126,20 +135,22 @@ def delayed_send():
spans = elasticapm_client.events[SPAN]
# the consumer transactions should have the same trace id as the transaction that triggered the messages
assert transactions[0]["trace_id"] == transactions[1]["trace_id"] == transactions[2]["trace_id"]
assert transactions[1]["name"] == "Kafka RECEIVE from test"
assert transactions[1]["name"] == f"Kafka RECEIVE from {test_topic}"
assert transactions[1]["type"] == "messaging"
assert transactions[1]["context"]["message"]["queue"]["name"] == "test"
assert transactions[1]["context"]["message"]["queue"]["name"] == test_topic

assert spans[2]["transaction_id"] == transactions[1]["id"]
assert spans[3]["transaction_id"] == transactions[2]["id"]


def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
test_topic = topics[0]

def delayed_send():
time.sleep(0.2)
elasticapm_client.begin_transaction("foo")
producer.send("test", key=b"foo", value=b"bar")
producer.send("test", key=b"baz", value=b"bazzinga")
producer.send(test_topic, key=b"foo", value=b"bar")
producer.send(test_topic, key=b"baz", value=b"bazzinga")
producer.flush()
elasticapm_client.end_transaction("foo")

Expand All @@ -161,13 +172,14 @@ def delayed_send():


def test_kafka_consumer_ignore_topic(instrument, elasticapm_client, producer, consumer, topics):
test_topic, foo_topic, bar_topic = topics
elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar")

def delayed_send():
time.sleep(0.2)
producer.send(topic="foo", key=b"foo", value=b"bar")
producer.send("bar", key=b"foo", value=b"bar")
producer.send("test", key=b"foo", value=b"bar")
producer.send(topic=foo_topic, key=b"foo", value=b"bar")
producer.send(bar_topic, key=b"foo", value=b"bar")
producer.send(test_topic, key=b"foo", value=b"bar")
producer.flush()

thread = threading.Thread(target=delayed_send)
Expand All @@ -178,17 +190,18 @@ def delayed_send():
thread.join()
transactions = elasticapm_client.events[TRANSACTION]
assert len(transactions) == 1
assert transactions[0]["name"] == "Kafka RECEIVE from test"
assert transactions[0]["name"] == f"Kafka RECEIVE from {test_topic}"


def test_kafka_consumer_ignore_topic_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
test_topic, foo_topic, bar_topic = topics
elasticapm_client.config.update("1", ignore_message_queues="foo*,*bar")

def delayed_send():
time.sleep(0.2)
producer.send(topic="foo", key=b"foo", value=b"bar")
producer.send("bar", key=b"foo", value=b"bar")
producer.send("test", key=b"foo", value=b"bar")
producer.send(topic=foo_topic, key=b"foo", value=b"bar")
producer.send(bar_topic, key=b"foo", value=b"bar")
producer.send(test_topic, key=b"foo", value=b"bar")
producer.flush()

thread = threading.Thread(target=delayed_send)
Expand All @@ -201,14 +214,16 @@ def delayed_send():
transactions = elasticapm_client.events[TRANSACTION]
spans = elasticapm_client.spans_for_transaction(transactions[0])
assert len(spans) == 1
assert spans[0]["name"] == "Kafka RECEIVE from test"
assert spans[0]["name"] == f"Kafka RECEIVE from {test_topic}"


def test_kafka_poll_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
test_topic = topics[0]

def delayed_send():
time.sleep(0.2)
producer.send("test", key=b"foo", value=b"bar")
producer.send("test", key=b"baz", value=b"bazzinga")
producer.send(test_topic, key=b"foo", value=b"bar")
producer.send(test_topic, key=b"baz", value=b"bazzinga")
producer.flush()

thread = threading.Thread(target=delayed_send)
Expand All @@ -219,21 +234,23 @@ def delayed_send():
transactions = elasticapm_client.events[TRANSACTION]
spans = elasticapm_client.events[SPAN]
assert len(spans) == 1
assert spans[0]["name"] == "Kafka POLL from bar, foo, test"
assert spans[0]["name"] == "Kafka POLL from " + ", ".join(sorted(topics))


def test_kafka_no_client(instrument, producer, consumer, topics):
test_topic = topics[0]
assert elasticapm.get_client() is None
# the following code shouldn't trigger any errors
producer.send("test", key=b"foo", value=b"bar")
producer.send(test_topic, key=b"foo", value=b"bar")
for item in consumer:
pass


def test_kafka_send_unsampled_transaction(instrument, elasticapm_client, producer, topics):
test_topic = topics[0]
transaction_object = elasticapm_client.begin_transaction("transaction")
transaction_object.is_sampled = False
producer.send("test", key=b"foo", value=b"bar")
producer.send(test_topic, key=b"foo", value=b"bar")
elasticapm_client.end_transaction("foo")
spans = elasticapm_client.events[SPAN]
assert len(spans) == 0
Expand All @@ -251,9 +268,11 @@ def test_kafka_poll_unsampled_transaction(instrument, elasticapm_client, consume
def test_kafka_consumer_unsampled_transaction_handles_stop_iteration(
instrument, elasticapm_client, producer, consumer, topics
):
test_topic = topics[0]

def delayed_send():
time.sleep(0.2)
producer.send("test", key=b"foo", value=b"bar")
producer.send(test_topic, key=b"foo", value=b"bar")
producer.flush()

thread = threading.Thread(target=delayed_send)
Expand Down
Loading