Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/maven-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ jobs:
mvn --update-snapshots verify \
org.sonarsource.scanner.maven:sonar-maven-plugin:sonar \
-Dsonar.projectKey=eclipse-ecsp_streambase -Dsonar.organization=eclipse-ecsp \
-Dcheckstyle.skip -Dpmd.skip=true
-Dcheckstyle.skip -Dpmd.skip=true
142 changes: 75 additions & 67 deletions DEPENDENCIES

Large diffs are not rendered by default.

51 changes: 32 additions & 19 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.eclipse.ecsp</groupId>
<artifactId>streambase</artifactId>
<version>1.2-SNAPSHOT</version>
<version>1.2.SPRING-SNAPSHOT</version>

<name>StreamBase library</name>
<description>Enabler for event driven processing and device messaging capabilities</description>
Expand Down Expand Up @@ -90,23 +90,23 @@
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>3.6.2</kafka.version>
<kafka.version>3.9.1</kafka.version>
<zookeeper.version>3.8.4</zookeeper.version>
<junit.version>5.5.2</junit.version>
<jackson.version>2.9.8</jackson.version>
<junit.version>5.11.4</junit.version>
<jackson.version>2.15.3</jackson.version>
<slf4j.version>2.0.13</slf4j.version>
<additionalparam>-Xdoclint:none</additionalparam>
<maven.surefire.version>2.18.1</maven.surefire.version>
<maven.surefire.version>3.5.4</maven.surefire.version>
<maven.dependency.version>2.10</maven.dependency.version>
<spring.test.version>6.1.14</spring.test.version>
<spring-boot-starter>3.3.3</spring-boot-starter>
<spring.version>6.1.14</spring.version>
<spring.test.version>7.0.5</spring.test.version>
<spring-boot-starter>4.0.3</spring-boot-starter>
<spring.version>7.0.5</spring.version>
<embedded.mongodb>3.4.3</embedded.mongodb>
<junit4.version>4.13.2</junit4.version>
<nosql.dao.version>1.1.1</nosql.dao.version>
<cache.enabler.version>1.0.0</cache.enabler.version>
<transformers.version>1.0.0</transformers.version>
<utils.version>1.1.1</utils.version>
<nosql.dao.version>1.3.SPRING-SNAPSHOT</nosql.dao.version>
<cache.enabler.version>1.0.SPRING-SNAPSHOT</cache.enabler.version>
<transformers.version>1.0.SPRING-SNAPSHOT</transformers.version>
<utils.version>1.2.SPRING-SNAPSHOT</utils.version>
<entities.version>1.1.1</entities.version>
<scala.version>2.13.14</scala.version>
<curator.version>5.3.0</curator.version>
Expand All @@ -117,7 +117,7 @@
<hivemq.mqtt.client>1.3.0</hivemq.mqtt.client>
<io.confluent.schema-registry.version>7.3.3</io.confluent.schema-registry.version>
<moquette.broker.version>0.17</moquette.broker.version>
<testcontainers.version>1.18.3</testcontainers.version>
<testcontainers.version>2.0.3</testcontainers.version>
<gson.version>2.8.9</gson.version>

<sonar.host.url>https://sonarcloud.io</sonar.host.url>
Expand Down Expand Up @@ -192,11 +192,24 @@
</snapshots>
</pluginRepository>
</pluginRepositories>

<repositories>
<repository>
<id>org.sonatype.central</id>
<url>https://central.sonatype.com/repository/maven-snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<artifactId>testcontainers-junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
<exclusions>
Expand All @@ -208,7 +221,7 @@
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<artifactId>testcontainers-mongodb</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
<exclusions>
Expand Down Expand Up @@ -726,7 +739,7 @@
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<version>1.0.1</version>
<version>1.11.4</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -946,8 +959,8 @@
<version>${maven.surefire.version}</version>
<configuration>
<argLine>${surefireArgLine} ${java.17.options}</argLine>
<forkMode>pertest</forkMode>
<childDelegation>true</childDelegation>
<reuseForks>false</reuseForks>
<forkCount>1</forkCount>
<excludedGroups>${excludeTestCaseGroups}</excludedGroups>
<excludes>
<exclude>**/MqttDispatcherHealthMontiorIntegrationTest.java</exclude>
Expand All @@ -957,7 +970,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.8</version>
<version>0.8.14</version>
<configuration>
<excludes>
<exclude>org/eclipse/ecsp/analytics/aws/**/*</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.rocksdb.Statistics;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchInterface;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -619,8 +620,13 @@ public synchronized KeyValueIterator<K, V> all() {
* @throws RocksDBException the rocks DB exception
*/
@Override
public void write(final WriteBatch batch) throws RocksDBException {
db.write(writeOptions, batch);
public void write(final WriteBatchInterface batch) throws RocksDBException {
if (batch instanceof WriteBatch writeBatch) {
db.write(writeOptions, writeBatch);
} else {
LOG.error("Batch must be an instance of WriteBatch");
throw new IllegalArgumentException("Batch must be an instance of WriteBatch");
}
}

/**
Expand Down Expand Up @@ -767,11 +773,16 @@ public void prepareBatchForRestore(final Collection<KeyValue<byte[], byte[]>> re
*/
public void addToBatch(final byte[] key,
final byte[] value,
final WriteBatch batch) throws RocksDBException {
if (value == null) {
batch.delete(key);
final WriteBatchInterface batch) throws RocksDBException {
if (batch instanceof WriteBatch writeBatch) {
if (value == null) {
writeBatch.delete(key);
} else {
writeBatch.put(key, value);
}
} else {
batch.put(key, value);
LOG.error("Batch must be an instance of WriteBatch");
throw new IllegalArgumentException("Batch must be an instance of WriteBatch");
}
}

Expand All @@ -783,7 +794,7 @@ public void addToBatch(final byte[] key,
* @throws RocksDBException the rocks DB exception
*/
@Override
public void addToBatch(KeyValue<byte[], byte[]> kvRecord, WriteBatch batch) throws RocksDBException {
public void addToBatch(KeyValue<byte[], byte[]> kvRecord, WriteBatchInterface batch) throws RocksDBException {
addToBatch(kvRecord.key, kvRecord.value, batch);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
*
*
* ******************************************************************************
*
* Copyright (c) 2023-24 Harman International
*
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
*
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
*
* distributed under the License is distributed on an "AS IS" BASIS,
*
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and
*
* limitations under the License.
*
*
*
* SPDX-License-Identifier: Apache-2.0
*
* *******************************************************************************
*
*
*/

package org.eclipse.ecsp.analytics.stream.base.constants;

/**
* The KafkaConfigConstant class contains constants for Kafka configuration properties.
*/
public final class KafkaConfigConstant {

private KafkaConfigConstant() {
// Private constructor to prevent instantiation
}

/** The Constant BROKER_ID. */
public static final String BROKER_ID = "broker.id";

/** The Constant LISTENERS. */
public static final String LISTENERS = "listeners";

/** The Constant NUM_PARTITIONS. */
public static final String NUM_PARTITIONS = "num.partitions";

/** The Constant AUTO_CREATE_TOPICS_ENABLE. */
public static final String AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable";

/** The Constant MESSAGE_MAX_BYTES. */
public static final String MESSAGE_MAX_BYTES = "message.max.bytes";

/** The Constant CONTROLLED_SHUTDOWN_ENABLE. */
public static final String CONTROLLED_SHUTDOWN_ENABLE = "controlled.shutdown.enable";

/** The Constant LOG_DIR. */
public static final String LOG_DIR = "log.dir";

/** The Constant ZOOKEEPER_CONNECT. */
public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";

/** The Constant ZOOKEEPER_SESSION_TIMEOUT_MS. */
public static final String ZOOKEEPER_SESSION_TIMEOUT_MS = "zookeeper.session.timeout.ms";

/** The Constant ZOOKEEPER_CONNECTION_TIMEOUT_MS. */
public static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms";

/** The Constant LOG_RETENTION_HOURS. */
public static final String LOG_RETENTION_HOURS = "log.retention.hours";

/** The Constant DELETE_TOPIC_ENABLE. */
public static final String DELETE_TOPIC_ENABLE = "delete.topic.enable";

/** The Constant LOG_CLEANER_DEDUPE_BUFFER_SIZE. */
public static final String LOG_CLEANER_DEDUPE_BUFFER_SIZE = "log.cleaner.dedupe.buffer.size";

/** The Constant GROUP_MIN_SESSION_TIMEOUT_MS. */
public static final String GROUP_MIN_SESSION_TIMEOUT_MS = "group.min.session.timeout.ms";

/** The Constant OFFSETS_TOPIC_REPLICATION_FACTOR. */
public static final String OFFSETS_TOPIC_REPLICATION_FACTOR = "offsets.topic.replication.factor";

/** The Constant OFFSETS_TOPIC_NUM_PARTITIONS. */
public static final String OFFSETS_TOPIC_NUM_PARTITIONS = "offsets.topic.num.partitions";
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClient;
Expand All @@ -53,11 +52,12 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.eclipse.ecsp.analytics.stream.base.constants.KafkaConfigConstant;
import org.eclipse.ecsp.analytics.stream.base.utils.Constants;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.mutable.ArraySeq;
import scala.collection.mutable.ArrayBuffer;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -125,15 +125,15 @@ public EmbeddedKafka(final Properties config) throws IOException {
*/
private Properties effectiveConfigFrom(final Properties initialConfig) {
final Properties effectiveConfigProps = new Properties();
effectiveConfigProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
effectiveConfigProps.put(KafkaConfig.ListenersProp(), "PLAINTEXT://127.0.0.1:9092");
effectiveConfigProps.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
effectiveConfigProps.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
effectiveConfigProps.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), Constants.INT_1000000);
effectiveConfigProps.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
effectiveConfigProps.put(KafkaConfigConstant.BROKER_ID, 0);
effectiveConfigProps.put(KafkaConfigConstant.LISTENERS, "PLAINTEXT://127.0.0.1:9092");
effectiveConfigProps.put(KafkaConfigConstant.NUM_PARTITIONS, 1);
effectiveConfigProps.put(KafkaConfigConstant.AUTO_CREATE_TOPICS_ENABLE, true);
effectiveConfigProps.put(KafkaConfigConstant.MESSAGE_MAX_BYTES, Constants.INT_1000000);
effectiveConfigProps.put(KafkaConfigConstant.CONTROLLED_SHUTDOWN_ENABLE, true);

effectiveConfigProps.putAll(initialConfig);
effectiveConfigProps.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
effectiveConfigProps.setProperty(KafkaConfigConstant.LOG_DIR, logDir.getAbsolutePath());

//effectiveConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, effectiveConfig)
effectiveConfigProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Expand All @@ -150,7 +150,8 @@ private Properties effectiveConfigFrom(final Properties initialConfig) {
* @return the string
*/
public String brokerList() {
final EndPoint endPoint = ((ArraySeq<EndPoint>) kafka.advertisedListeners()).head();
// Kafka 3.9.1 returns ArrayBuffer instead of ArraySeq
final EndPoint endPoint = ((ArrayBuffer<EndPoint>) kafka.advertisedListeners()).head();
final String hostname = endPoint.host() == null ? "" : endPoint.host();

return String.join(":", hostname, Integer.toString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@
package org.eclipse.ecsp.analytics.stream.base.kafka;

import de.flapdoodle.embed.process.runtime.Network;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.test.TestCondition;
import org.eclipse.ecsp.analytics.stream.base.constants.KafkaConfigConstant;
import org.eclipse.ecsp.analytics.stream.base.constants.TestConstants;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
Expand Down Expand Up @@ -123,7 +122,7 @@ public void start() throws Exception {

final Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig, zookeeper);
LOG.debug("Starting a Kafka instance on port {} ...",
effectiveBrokerConfig.getProperty(KafkaConfig.ListenersProp()));
effectiveBrokerConfig.getProperty(KafkaConfigConstant.LISTENERS));
broker = new EmbeddedKafka(effectiveBrokerConfig);
LOG.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
broker.brokerList(), broker.zookeeperConnect());
Expand All @@ -145,19 +144,19 @@ private Properties effectiveBrokerConfigFrom(final Properties brokerConfig, fina
LOG.error("Error fetching kafka port", e);
}
effectiveConfig.putAll(brokerConfig);
effectiveConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zookeeper.connectString());
effectiveConfig.put(KafkaConfig$.MODULE$.ZkSessionTimeoutMsProp(),
effectiveConfig.put(KafkaConfigConstant.ZOOKEEPER_CONNECT, zookeeper.connectString());
effectiveConfig.put(KafkaConfigConstant.ZOOKEEPER_SESSION_TIMEOUT_MS,
TestConstants.INT_30 * TestConstants.THOUSAND);
effectiveConfig.put(KafkaConfig.ListenersProp(), String.format("PLAINTEXT://127.0.0.1:%s", kafkaBrokerPort));
effectiveConfig.put(KafkaConfig$.MODULE$.ZkConnectionTimeoutMsProp(),
effectiveConfig.put(KafkaConfigConstant.LISTENERS, String.format("PLAINTEXT://127.0.0.1:%s", kafkaBrokerPort));
effectiveConfig.put(KafkaConfigConstant.ZOOKEEPER_CONNECTION_TIMEOUT_MS,
TestConstants.INT_60 * TestConstants.THOUSAND);
effectiveConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
effectiveConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(),
effectiveConfig.put(KafkaConfigConstant.DELETE_TOPIC_ENABLE, true);
effectiveConfig.put(KafkaConfigConstant.LOG_CLEANER_DEDUPE_BUFFER_SIZE,
TestConstants.TWO * TestConstants.LONG_1024 * TestConstants.LONG_1024);
effectiveConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
effectiveConfig.put(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
effectiveConfig.put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 1);
effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
effectiveConfig.put(KafkaConfigConstant.GROUP_MIN_SESSION_TIMEOUT_MS, 0);
effectiveConfig.put(KafkaConfigConstant.OFFSETS_TOPIC_REPLICATION_FACTOR, (short) 1);
effectiveConfig.put(KafkaConfigConstant.OFFSETS_TOPIC_NUM_PARTITIONS, 1);
effectiveConfig.put(KafkaConfigConstant.AUTO_CREATE_TOPICS_ENABLE, true);
return effectiveConfig;
}

Expand Down
Loading
Loading