From 39f1008c2da4d205bbbdba7bf93de1224ff998f1 Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Tue, 25 Apr 2023 14:19:08 -0700 Subject: [PATCH 1/2] Replaced deprecated Kafka client poll with the recommended one --- .../connectors/kafka/AbstractKafkaBasedConnectorTask.java | 2 +- .../kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index 98918c33e..46eac9963 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -508,7 +508,7 @@ protected void trackEventsProcessedProgress(int recordCount) { } protected ConsumerRecords consumerPoll(long pollInterval) { - return _consumer.poll(pollInterval); + return _consumer.poll(Duration.ofMillis(pollInterval)); } /** diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index 676a170c9..90e00936d 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -424,7 +424,7 @@ protected void maybeCommitOffsets(Consumer consumer, boolean hardCommit) { // Kafka rejects a poll if there is empty assignment return ConsumerRecords.EMPTY; } else { - return _consumer.poll(pollInterval); + return _consumer.poll(Duration.ofMillis(pollInterval)); } } From 8dece8dfc599fcef0e92eef6b190a1f683048247 Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Thu, 27 Apr 2023 13:39:54 -0700 Subject: [PATCH 2/2] Workaround for tests failing due to behavior change in first consumer poll --- .../connectors/kafka/AbstractKafkaBasedConnectorTask.java | 8 +++++++- .../kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java | 4 ++++ gradle/maven.gradle | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index 46eac9963..aaedb0418 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -508,7 +508,13 @@ protected void trackEventsProcessedProgress(int recordCount) { } protected ConsumerRecords consumerPoll(long pollInterval) { - return _consumer.poll(Duration.ofMillis(pollInterval)); + if (pollInterval == 0) { + // Brooklin calls poll with 0 pollInterval when a task is newly initialized. There's a behavior change between old + // and new poll in this case. We need to understand that behavior better before removing usages of deprecated API + return _consumer.poll(pollInterval); + } else { + return _consumer.poll(Duration.ofMillis(pollInterval)); + } } /** diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index 90e00936d..e32b9ed8a 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -423,6 +423,10 @@ protected void maybeCommitOffsets(Consumer consumer, boolean hardCommit) { if (_enablePartitionAssignment && _consumerAssignment.isEmpty()) { // Kafka rejects a poll if there is empty assignment return ConsumerRecords.EMPTY; + } else if (pollInterval == 0) { + // BMM calls poll with 0 pollInterval when a task is newly initialized. There's a behavior change between old poll + // and new poll in this case. We need to understand that behavior better before removing usages of deprecated API + return _consumer.poll(pollInterval); } else { return _consumer.poll(Duration.ofMillis(pollInterval)); } diff --git a/gradle/maven.gradle b/gradle/maven.gradle index c1a52034c..91465b20e 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "5.3.0-SNAPSHOT" + version = "5.3.2-SNAPSHOT" } subprojects {