diff --git a/app/src/main/java/com/p14n/postevent/App.java b/app/src/main/java/com/p14n/postevent/App.java index 268ed92..3e5166d 100644 --- a/app/src/main/java/com/p14n/postevent/App.java +++ b/app/src/main/java/com/p14n/postevent/App.java @@ -21,7 +21,7 @@ import com.p14n.postevent.data.ConfigData; import com.p14n.postevent.data.Event; import com.p14n.postevent.db.DatabaseSetup; - +import com.p14n.postevent.db.PoolSetup; import com.p14n.postevent.telemetry.OpenTelemetryFunctions; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; @@ -119,7 +119,7 @@ private static void run(String affinity, String[] write, String[] read, String d RemotePersistentConsumer cc = null; var ot = Opentelemetry.create("postevent"); - var ds = JdbcTelemetry.create(ot).wrap(DatabaseSetup.createPool(cfg)); + var ds = JdbcTelemetry.create(ot).wrap(PoolSetup.createPool(cfg)); try { if (write.length > 0) { diff --git a/build.gradle.ref b/build.gradle.ref new file mode 100644 index 0000000..e511de3 --- /dev/null +++ b/build.gradle.ref @@ -0,0 +1,178 @@ +import com.vanniktech.maven.publish.SonatypeHost +import com.vanniktech.maven.publish.JavaLibrary +import com.vanniktech.maven.publish.JavadocJar + +plugins { + id 'java' + id 'com.adarshr.test-logger' version '4.0.0' + id 'com.google.protobuf' version '0.9.2' + id "com.vanniktech.maven.publish" version "0.31.0" +} + +compileJava { + sourceCompatibility = 21 + targetCompatibility = 21 +} + +group = 'com.p14n' +version = '1.0.1-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + implementation 'io.debezium:debezium-api:3.0.1.Final' + implementation ('io.debezium:debezium-embedded:3.0.1.Final') { + exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet' + exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2' + exclude group: 'org.eclipse.jetty' + } + implementation 'io.debezium:debezium-connector-postgres:3.0.1.Final' + implementation 
'io.debezium:debezium-storage-jdbc:3.0.1.Final' + implementation 'org.slf4j:slf4j-api:2.0.9' + implementation 'com.zaxxer:HikariCP:6.2.1' + + constraints { + implementation 'com.google.guava:guava:32.0.0-jre' + } + + // gRPC dependencies + implementation 'io.grpc:grpc-netty-shaded:1.53.0' + implementation 'io.grpc:grpc-protobuf:1.53.0' + implementation 'io.grpc:grpc-stub:1.53.0' + implementation 'io.grpc:grpc-api:1.53.0' + + implementation 'javax.annotation:javax.annotation-api:1.3.2' + + // For code generation + implementation 'com.google.protobuf:protobuf-java:3.21.7' + + testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0') + testImplementation 'io.zonky.test:embedded-postgres:2.0.7' + testImplementation 'net.jqwik:jqwik:1.8.2' + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.0' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.0' + testRuntimeOnly 'ch.qos.logback:logback-classic:1.4.11' + testImplementation 'org.mockito:mockito-core:3.12.4' + + // OpenTelemetry core dependencies + implementation 'io.opentelemetry:opentelemetry-api:1.32.0' + + // Instrumentation for GRPC + implementation 'io.opentelemetry.instrumentation:opentelemetry-grpc-1.6:1.32.0-alpha' + +} + +testlogger { + theme 'standard' + showExceptions true + showStackTraces true + showFullStackTraces false + showCauses true + slowThreshold 2000 + showSummary true + showSimpleNames false + showPassed true + showSkipped true + showFailed true + showStandardStreams false + showPassedStandardStreams true + showSkippedStandardStreams true + showFailedStandardStreams true +} + +test { + useJUnitPlatform { + includeEngines 'jqwik', 'junit-jupiter' + } + maxHeapSize = "1G" + minHeapSize = "512M" + maxParallelForks = 1 + failFast = true + testLogging.showStandardStreams = true +} + +task fastTest( type: Test ) { + useJUnitPlatform { + includeEngines 'junit-jupiter' + exclude '**/dst/**' + + } +} + +jar { + manifest { + } +} + +// Configure 
Protobuf plugin +protobuf { + protoc { + artifact = 'com.google.protobuf:protoc:3.21.7' + } + plugins { + grpc { + artifact = 'io.grpc:protoc-gen-grpc-java:1.53.0' + } + } + generateProtoTasks { + all()*.plugins { + grpc {} + } + } +} + +sourceSets { + main { + java { + srcDirs 'build/generated/source/proto/main/grpc' + srcDirs 'build/generated/source/proto/main/java' + } + } +} + +tasks.withType(Jar) { + duplicatesStrategy = DuplicatesStrategy.EXCLUDE +} + +javadoc { + exclude "**/grpc/**" + source = sourceSets.main.allJava +} + +mavenPublishing { + + configure(new JavaLibrary(new JavadocJar.Javadoc(), true)) + + publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true) + + signAllPublications() + + coordinates("com.p14n", "postevent", version) + + pom { + name = "Postevent" + description = 'A reliable event publishing and consumption system using PostgreSQL and gRPC' + inceptionYear = "2025" + url = "https://github.com/p14n/postevent/" + licenses { + license { + name = 'MIT License' + url = 'https://opensource.org/licenses/MIT' + } + } + developers { + developer { + id = 'p14n' + name = 'Dean Chapman' + email = 'dean@p14n.com' + } + } + scm { + connection = 'scm:git:git://github.com/p14n/postevent.git' + developerConnection = 'scm:git:ssh://github.com:p14n/postevent.git' + url = 'https://github.com/p14n/postevent' + } + } +} \ No newline at end of file diff --git a/core/build.gradle b/core/build.gradle index 8c0d204..a02acbe 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -17,12 +17,8 @@ repositories { dependencies { // Core database and connection pooling - implementation 'com.zaxxer:HikariCP:6.2.1' implementation 'org.slf4j:slf4j-api:2.0.9' - // Guava for utilities - implementation 'com.google.guava:guava:32.0.0-jre' - // OpenTelemetry core dependencies implementation 'io.opentelemetry:opentelemetry-api:1.32.0' diff --git a/core/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java 
b/core/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java index 3d954c3..f2bef1d 100644 --- a/core/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java +++ b/core/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java @@ -3,8 +3,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static java.lang.String.format; /** * Default implementation of {@link AsyncExecutor} that provides configurable @@ -49,6 +50,19 @@ public DefaultExecutor(int scheduledSize, int fixedSize) { this.es = createFixedExecutorService(fixedSize); } + protected ThreadFactory createNamedFactory(String nameFormat,ThreadFactory backingFactory){ + AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null; + return runnable -> { + Thread thread = backingFactory.newThread(runnable); + if (nameFormat != null) { + thread.setName(format(nameFormat, count.getAndIncrement())); + } + return thread; + }; + } + protected ThreadFactory createNamedFactory(String nameFormat) { + return createNamedFactory(nameFormat,Executors.defaultThreadFactory()); + } /** * Creates a fixed-size thread pool with named threads. 
* @@ -57,7 +71,7 @@ public DefaultExecutor(int scheduledSize, int fixedSize) { */ protected ExecutorService createFixedExecutorService(int size) { return Executors.newFixedThreadPool(size, - new ThreadFactoryBuilder().setNameFormat("post-event-fixed-%d").build()); + createNamedFactory("post-event-fixed-%d")); } /** @@ -67,8 +81,7 @@ protected ExecutorService createFixedExecutorService(int size) { */ protected ExecutorService createVirtualExecutorService() { return Executors.newThreadPerTaskExecutor( - new ThreadFactoryBuilder().setThreadFactory(Thread.ofVirtual().factory()) - .setNameFormat("post-event-virtual-%d").build()); + createNamedFactory("post-event-virtual-%d",Thread.ofVirtual().factory())); } /** @@ -79,7 +92,7 @@ protected ExecutorService createVirtualExecutorService() { */ protected ScheduledExecutorService createScheduledExecutorService(int size) { return Executors.newScheduledThreadPool(size, - new ThreadFactoryBuilder().setNameFormat("post-event-scheduled-%d").build()); + createNamedFactory("post-event-scheduled-%d")); } @Override diff --git a/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java b/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java index d74e662..c7bdc30 100644 --- a/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java +++ b/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java @@ -1,7 +1,6 @@ package com.p14n.postevent.db; import com.p14n.postevent.data.PostEventConfig; -import com.zaxxer.hikari.HikariDataSource; import javax.sql.DataSource; @@ -35,16 +34,14 @@ * Example usage: *
* - *{@code
+ *
+ * {@code
* PostEventConfig config = // initialize configuration
* DatabaseSetup setup = new DatabaseSetup(config);
*
* // Setup all required tables for given topics
* setup.setupAll(Set.of("orders", "inventory"));
*
- * // Create connection pool
- * DataSource pool = DatabaseSetup.createPool(config);
- * }
*/
public class DatabaseSetup {
private static final Logger logger = LoggerFactory.getLogger(DatabaseSetup.class);
@@ -53,6 +50,8 @@ public class DatabaseSetup {
private final String username;
private final String password;
+ private final DataSource ds;
+
/**
* Creates a new DatabaseSetup instance using configuration from
* PostEventConfig.
@@ -74,6 +73,17 @@ public DatabaseSetup(String jdbcUrl, String username, String password) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
+ this.ds = null;
+ }
+
+ public DatabaseSetup(DataSource ds) {
+ if (ds == null) {
+ throw new IllegalArgumentException("DataSource must not be null");
+ }
+ this.jdbcUrl = null;
+ this.username = null;
+ this.password = null;
+ this.ds = ds;
}
/**
@@ -85,11 +95,27 @@ public DatabaseSetup(String jdbcUrl, String username, String password) {
* @throws RuntimeException if database operations fail
*/
public DatabaseSetup setupAll(Set topics) {
+ setupClient();
+ setupServer(topics);
+ setupDebezium();
+ return this;
+ }
+
+ public DatabaseSetup setupDebezium() {
+ clearOldSlots();
+ return this;
+ }
+
+ public DatabaseSetup setupServer(Set topics) {
+ createSchemaIfNotExists();
+ topics.stream().forEach(this::createTableIfNotExists);
+ return this;
+ }
+
+ public DatabaseSetup setupClient() {
createSchemaIfNotExists();
createMessagesTableIfNotExists();
createContiguousHwmTableIfNotExists();
- topics.stream().forEach(this::createTableIfNotExists);
- clearOldSlots();
return this;
}
@@ -287,21 +313,9 @@ topic_name VARCHAR(255) PRIMARY KEY,
* @throws SQLException if connection fails
*/
private Connection getConnection() throws SQLException {
+ if (ds != null)
+ return ds.getConnection();
return DriverManager.getConnection(jdbcUrl, username, password);
}
- /**
- * Creates and configures a connection pool using HikariCP.
- *
- * @param cfg Configuration containing database connection details
- * @return Configured DataSource
- */
- public static DataSource createPool(PostEventConfig cfg) {
- HikariDataSource ds = new HikariDataSource();
- ds.setJdbcUrl(cfg.jdbcUrl());
- ds.setUsername(cfg.dbUser());
- ds.setPassword(cfg.dbPassword());
- return ds;
- }
-
}
diff --git a/debezium/build.gradle b/debezium/build.gradle
index 64f6442..c6af107 100644
--- a/debezium/build.gradle
+++ b/debezium/build.gradle
@@ -42,6 +42,9 @@ dependencies {
constraints {
implementation 'com.google.guava:guava:32.0.0-jre'
}
+
+ // Database connection pooling
+ implementation 'com.zaxxer:HikariCP:6.2.1'
// Test dependencies
testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0')
diff --git a/debezium/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java b/debezium/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
index c394d2c..a3e89bd 100644
--- a/debezium/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
+++ b/debezium/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java
@@ -8,6 +8,7 @@
import com.p14n.postevent.data.PostEventConfig;
import com.p14n.postevent.data.UnprocessedEventFinder;
import com.p14n.postevent.db.DatabaseSetup;
+import com.p14n.postevent.db.PoolSetup;
import io.opentelemetry.api.OpenTelemetry;
@@ -104,7 +105,7 @@ public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, OpenTelemetry
* @param batchSize Maximum number of events to process in a batch
*/
public LocalPersistentConsumer(PostEventConfig cfg, OpenTelemetry ot) {
- this(DatabaseSetup.createPool(cfg), cfg, new DefaultExecutor(2, 10), ot, 10);
+ this(PoolSetup.createPool(cfg), cfg, new DefaultExecutor(2, 10), ot, 10);
}
/**
diff --git a/debezium/src/main/java/com/p14n/postevent/db/PoolSetup.java b/debezium/src/main/java/com/p14n/postevent/db/PoolSetup.java
new file mode 100644
index 0000000..3157de1
--- /dev/null
+++ b/debezium/src/main/java/com/p14n/postevent/db/PoolSetup.java
@@ -0,0 +1,23 @@
+package com.p14n.postevent.db;
+
+import com.p14n.postevent.data.PostEventConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+public class PoolSetup {
+ /**
+ * Creates and configures a connection pool using HikariCP.
+ *
+ * @param cfg Configuration containing database connection details
+ * @return Configured DataSource
+ */
+ public static DataSource createPool(PostEventConfig cfg) {
+ HikariDataSource ds = new HikariDataSource();
+ ds.setJdbcUrl(cfg.jdbcUrl());
+ ds.setUsername(cfg.dbUser());
+ ds.setPassword(cfg.dbPassword());
+ return ds;
+ }
+
+}
diff --git a/grpc/src/main/java/com/p14n/postevent/ConsumerServer.java b/grpc/src/main/java/com/p14n/postevent/ConsumerServer.java
index 672a099..c3d7fb4 100644
--- a/grpc/src/main/java/com/p14n/postevent/ConsumerServer.java
+++ b/grpc/src/main/java/com/p14n/postevent/ConsumerServer.java
@@ -13,7 +13,7 @@
import com.p14n.postevent.catchup.CatchupServer;
import com.p14n.postevent.catchup.remote.CatchupGrpcServer;
import com.p14n.postevent.data.ConfigData;
-import com.p14n.postevent.db.DatabaseSetup;
+import com.p14n.postevent.db.PoolSetup;
import io.grpc.Server;
import io.grpc.ServerBuilder;
@@ -72,7 +72,7 @@ public class ConsumerServer implements AutoCloseable {
* @param ot The OpenTelemetry instance for monitoring and tracing
*/
public ConsumerServer(ConfigData cfg, OpenTelemetry ot) {
- this(DatabaseSetup.createPool(cfg), cfg, new DefaultExecutor(2), ot);
+ this(PoolSetup.createPool(cfg), cfg, new DefaultExecutor(2), ot);
}
/**
diff --git a/settings.gradle b/settings.gradle
index e5f8e62..04d7570 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,2 +1,2 @@
rootProject.name = 'postevent'
-include 'core', 'debezium', 'grpc', 'app'
\ No newline at end of file
+include 'core', 'debezium', 'grpc', 'vertx', 'app'
\ No newline at end of file
diff --git a/vertx/README.md b/vertx/README.md
new file mode 100644
index 0000000..dfab57a
--- /dev/null
+++ b/vertx/README.md
@@ -0,0 +1,14 @@
+# Vert.x EventBus Integration
+
+## Server implementation
+EventBusMessageBroker - allows publishing to persistent topics, handles translation between vertx and postevent
+VertxConsumerServer - sets up DDL for given topics and starts catchup for those topics
+
+## Client implementation
+VertxPersistentConsumer - consumes events from vertx eventbus. Creates system event bus and catchup client, handles translation between vertx and postevent on the transactional consumer side.
+
+Todo
+ - [x] Implement autoclose on new classes
+ - [ ] Adapt classes to use vertx threading model
+
+
diff --git a/vertx/build.gradle b/vertx/build.gradle
new file mode 100644
index 0000000..8c0917a
--- /dev/null
+++ b/vertx/build.gradle
@@ -0,0 +1,80 @@
+plugins {
+ id 'java'
+ id 'com.adarshr.test-logger' version '4.0.0'
+}
+
+compileJava {
+ sourceCompatibility = 21
+ targetCompatibility = 21
+}
+
+group = 'com.p14n'
+version = '1.0.1-SNAPSHOT'
+
+repositories {
+ mavenCentral()
+}
+
+dependencies {
+ // Dependency on core module
+ implementation project(':core')
+
+ // Vert.x dependencies
+ implementation 'io.vertx:vertx-core:5.0.4'
+
+ // OpenTelemetry (from core module)
+ implementation 'io.opentelemetry:opentelemetry-api:1.32.0'
+
+ // Logging
+ implementation 'org.slf4j:slf4j-api:2.0.9'
+
+ // Security constraint
+ constraints {
+ implementation 'com.google.guava:guava:32.0.0-jre'
+ }
+
+ // Test dependencies
+ testImplementation 'io.vertx:vertx-junit5:5.0.4'
+ testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0')
+ testImplementation 'io.zonky.test:embedded-postgres:2.0.7'
+ testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.0'
+ testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.0'
+ testRuntimeOnly 'ch.qos.logback:logback-classic:1.4.11'
+ testImplementation 'org.mockito:mockito-core:3.12.4'
+}
+
+testlogger {
+ theme 'standard'
+ showExceptions true
+ showStackTraces true
+ showFullStackTraces false
+ showCauses true
+ slowThreshold 2000
+ showSummary true
+ showSimpleNames false
+ showPassed true
+ showSkipped true
+ showFailed true
+ showStandardStreams false
+ showPassedStandardStreams true
+ showSkippedStandardStreams true
+ showFailedStandardStreams true
+}
+
+test {
+ useJUnitPlatform()
+ maxHeapSize = "1G"
+ minHeapSize = "512M"
+ maxParallelForks = 1
+ failFast = true
+ testLogging.showStandardStreams = true
+}
+
+jar {
+ manifest {
+ }
+}
+
+tasks.withType(Jar) {
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+}
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
new file mode 100644
index 0000000..7740aac
--- /dev/null
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
@@ -0,0 +1,59 @@
+package com.p14n.postevent.vertx;
+
+import com.p14n.postevent.vertx.adapter.EventBusCatchupService;
+import com.p14n.postevent.vertx.adapter.EventBusMessageBroker;
+import com.p14n.postevent.broker.AsyncExecutor;
+import com.p14n.postevent.catchup.CatchupServer;
+import com.p14n.postevent.db.DatabaseSetup;
+import io.opentelemetry.api.OpenTelemetry;
+import io.vertx.core.eventbus.EventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+public class VertxConsumerServer implements AutoCloseable {
+ private static final Logger logger = LoggerFactory.getLogger(VertxConsumerServer.class);
+
+ private DataSource ds;
+ //private ConfigData cfg;
+ private List closeables;
+ private AsyncExecutor asyncExecutor;
+ OpenTelemetry ot;
+
+ public VertxConsumerServer(DataSource ds, AsyncExecutor asyncExecutor, OpenTelemetry ot) {
+ this.ds = ds;
+ this.asyncExecutor = asyncExecutor;
+ this.ot = ot;
+ }
+
+ public void start(EventBus eb, EventBusMessageBroker mb, Set topics) throws IOException, InterruptedException {
+ logger.atInfo().log("Starting consumer server");
+
+ var db = new DatabaseSetup(ds);
+ db.setupServer(topics);
+ var catchupServer = new CatchupServer(ds);
+ var catchupService = new EventBusCatchupService(catchupServer,eb,topics,this.asyncExecutor);
+ catchupService.start();
+ closeables = List.of(catchupService, mb, asyncExecutor);
+ logger.atInfo().log("🌐 Vert.x EventBus server started");
+
+ }
+
+ @Override
+ public void close() {
+ if(closeables != null){
+ for(var c : closeables){
+ try {
+ c.close();
+ } catch (Exception e){
+ logger.atWarn().setCause(e).log("Error closing resource");
+ }
+ }
+ }
+ logger.atInfo().log("🛑 Vert.x EventBus server stopped");
+ }
+}
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java
new file mode 100644
index 0000000..c228766
--- /dev/null
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java
@@ -0,0 +1,161 @@
+package com.p14n.postevent.vertx;
+
+import com.p14n.postevent.Publisher;
+import com.p14n.postevent.db.DatabaseSetup;
+import com.p14n.postevent.vertx.adapter.EventBusMessageBroker;
+import com.p14n.postevent.broker.*;
+import com.p14n.postevent.catchup.CatchupService;
+import com.p14n.postevent.catchup.PersistentBroker;
+import com.p14n.postevent.catchup.UnprocessedSubmitter;
+import com.p14n.postevent.vertx.client.EventBusCatchupClient;
+import com.p14n.postevent.data.UnprocessedEventFinder;
+import io.opentelemetry.api.OpenTelemetry;
+import io.vertx.core.eventbus.EventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class VertxPersistentConsumer implements AutoCloseable, MessageBroker {
+
+ private static final Logger logger = LoggerFactory.getLogger(VertxPersistentConsumer.class);
+
+ private AsyncExecutor asyncExecutor;
+ private List closeables;
+ private TransactionalBroker tb;
+ SystemEventBroker seb;
+ OpenTelemetry ot;
+ private final int batchSize;
+
+ public VertxPersistentConsumer(OpenTelemetry ot, AsyncExecutor asyncExecutor, int batchSize) {
+ this.asyncExecutor = asyncExecutor;
+ this.ot = ot;
+ this.batchSize = batchSize;
+ }
+
+ public void start(Set topics, DataSource ds,EventBus eb, EventBusMessageBroker mb) {
+ logger.atInfo().log("Starting consumer client");
+
+ if (tb != null) {
+ logger.atError().log("Consumer client already started");
+ throw new IllegalStateException("Already started");
+ }
+ var db = new DatabaseSetup(ds);
+ db.setupClient();
+
+ try {
+ seb = new SystemEventBroker(asyncExecutor, ot);
+ tb = new TransactionalBroker(ds, asyncExecutor, ot, seb);
+ var pb = new PersistentBroker<>(tb, ds, seb);
+ var catchupClient = new EventBusCatchupClient(eb);
+
+ for (var topic : topics) {
+ mb.subscribeToEventBus(topic,pb);
+ }
+ seb.subscribe(new CatchupService(ds, catchupClient, seb));
+ seb.subscribe(new UnprocessedSubmitter(seb, ds, new UnprocessedEventFinder(), tb, batchSize));
+
+ asyncExecutor.scheduleAtFixedRate(
+ () -> {
+ seb.publish(SystemEvent.UnprocessedCheckRequired);
+ for (String topic : topics) {
+ seb.publish(SystemEvent.FetchLatest.withTopic(topic));
+ }
+ },
+ 30, 30, TimeUnit.SECONDS);
+
+ closeables = List.of(pb, seb, tb);
+
+ logger.atInfo().log("Consumer client started successfully");
+
+
+ } catch (Exception e) {
+ logger.atError()
+ .setCause(e)
+ .log("Failed to start consumer client");
+ throw new RuntimeException("Failed to start consumer client", e);
+ }
+ }
+
+ /**
+ * Closes all resources associated with this consumer.
+ * This includes the message brokers, gRPC channel, and other closeable
+ * resources.
+ */
+ @Override
+ public void close() {
+ logger.atInfo().log("Closing consumer client");
+
+ if (closeables != null) for (AutoCloseable c : closeables) {
+ try {
+ if(c != null) c.close();
+ } catch (Exception e) {
+ logger.atWarn()
+ .setCause(e)
+ .addArgument(c.getClass().getSimpleName())
+ .log("Error closing {}");
+ }
+ }
+
+ logger.atInfo().log("Consumer client closed");
+ }
+
+ /**
+ * Publishes a transactional event to the specified topic.
+ *
+ * @param topic The topic to publish to
+ * @param message The transactional event to publish
+ * @throws RuntimeException if publishing fails
+ */
+ @Override
+ public void publish(String topic, TransactionalEvent message) {
+ try {
+ Publisher.publish(message.event(), message.connection(), topic);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Subscribes to events on the specified topic.
+ * Triggers a catchup event to ensure the subscriber receives any missed events.
+ *
+ * @param topic The topic to subscribe to
+ * @param subscriber The subscriber that will receive events
+ * @return true if subscription was successful, false otherwise
+ */
+ @Override
+ public boolean subscribe(String topic, MessageSubscriber subscriber) {
+ var subscribed = tb.subscribe(topic, subscriber);
+ seb.publish(SystemEvent.CatchupRequired.withTopic(topic));
+ return subscribed;
+ }
+
+ /**
+ * Unsubscribes from events on the specified topic.
+ *
+ * @param topic The topic to unsubscribe from
+ * @param subscriber The subscriber to remove
+ * @return true if unsubscription was successful, false otherwise
+ */
+ @Override
+ public boolean unsubscribe(String topic, MessageSubscriber subscriber) {
+ return tb.unsubscribe(topic, subscriber);
+ }
+
+ /**
+ * Converts a transactional event. In this implementation, returns the event
+ * unchanged.
+ *
+ * @param m The transactional event to convert
+ * @return The same transactional event
+ */
+ @Override
+ public TransactionalEvent convert(TransactionalEvent m) {
+ return m;
+ }
+}
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java
new file mode 100644
index 0000000..aa06861
--- /dev/null
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java
@@ -0,0 +1,247 @@
+package com.p14n.postevent.vertx.adapter;
+
+import com.p14n.postevent.broker.AsyncExecutor;
+import com.p14n.postevent.catchup.CatchupServerInterface;
+import com.p14n.postevent.data.Event;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
+import io.vertx.core.json.Json;
+import io.vertx.core.json.JsonObject;
+
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service that exposes CatchupServerInterface methods via Vert.x EventBus
+ * messaging.
+ * This allows remote clients to request catchup operations through the EventBus
+ * using a request-reply pattern.
+ *
+ *
+ * The service listens on specific EventBus addresses and delegates to the
+ * underlying CatchupServerInterface implementation. All requests and responses
+ * are JSON-encoded for simplicity and debugging.
+ *
+ *
+ *
+ * EventBus addresses:
+ *
+ * - {@code catchup.fetchEvents} - Fetch events within a range
+ * - {@code catchup.getLatestMessageId} - Get the latest message ID for a
+ * topic
+ *
+ *
+ *
+ *
+ * Example usage:
+ *
+ *
+ * {@code
+ * CatchupServerInterface catchupServer = new CatchupServer(dataSource);
+ * EventBus eventBus = vertx.eventBus();
+ *
+ * EventBusCatchupService service = new EventBusCatchupService(catchupServer, eventBus);
+ * service.start();
+ *
+ * // Service is now listening for catchup requests on the EventBus
+ * }
+ */
+public class EventBusCatchupService implements AutoCloseable {
+ private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupService.class);
+
+ public static final String FETCH_EVENTS_ADDRESS = "catchup.fetch_events.";
+ public static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.get_latest.";
+
+ private final CatchupServerInterface catchupServer;
+ private final EventBus eventBus;
+ private List> fetchEventsConsumers;
+ private List> getLatestMessageIdConsumers;
+ private Set topics;
+ private AsyncExecutor executor;
+
+ /**
+ * Creates a new EventBusCatchupService.
+ *
+ * @param catchupServer The underlying catchup server implementation
+ * @param eventBus The Vert.x EventBus to use for messaging
+ */
+ public EventBusCatchupService(CatchupServerInterface catchupServer,
+ EventBus eventBus,
+ Set topics,
+ AsyncExecutor executor) {
+ this.catchupServer = catchupServer;
+ this.eventBus = eventBus;
+ this.topics = topics;
+ this.executor = executor;
+ }
+
+ /**
+ * Starts the service by registering EventBus consumers for catchup operations.
+ * This method sets up listeners for both fetchEvents and getLatestMessageId
+ * requests.
+ */
+ public void start() {
+ if(fetchEventsConsumers == null) {
+
+ logger.atInfo().log("Starting EventBusCatchupService");
+
+ // Register consumer for fetchEvents requests
+ fetchEventsConsumers = topics.stream().map(topic -> {
+ logger.atInfo()
+ .addArgument(FETCH_EVENTS_ADDRESS + topic)
+ .log("EventBusCatchupService started, listening on address: {}");
+
+ return eventBus.consumer(FETCH_EVENTS_ADDRESS + topic, this::handleFetchEvents);
+ }).toList();
+
+ // Register consumer for getLatestMessageId requests
+ getLatestMessageIdConsumers = topics.stream().map(topic -> {
+ logger.atInfo()
+ .addArgument(GET_LATEST_MESSAGE_ID_ADDRESS + topic)
+ .log("EventBusCatchupService started, listening on address: {}");
+ return eventBus.consumer(GET_LATEST_MESSAGE_ID_ADDRESS + topic, this::handleGetLatestMessageId);
+ }).toList();
+ } else {
+ logger.atInfo().log("EventBusCatchupService already started");
+ }
+
+ }
+
+ /**
+ * Stops the service by unregistering EventBus consumers.
+ */
+ public void stop() {
+ logger.atInfo().log("Stopping EventBusCatchupService");
+
+ if (fetchEventsConsumers != null) {
+ for (var c : fetchEventsConsumers) {
+ c.unregister();
+ }
+ fetchEventsConsumers = null;
+ }
+
+ if (getLatestMessageIdConsumers != null) {
+ for (var c : getLatestMessageIdConsumers) {
+ c.unregister();
+ }
+ getLatestMessageIdConsumers = null;
+ }
+
+ logger.atInfo().log("EventBusCatchupService stopped");
+ }
+
+ /**
+ * Handles fetchEvents requests from the EventBus.
+ *
+ * Expected request format:
+ *
+ * {@code
+ * {
+ * "fromId": 100,
+ * "toId": 200,
+ * "limit": 50,
+ * "topic": "orders"
+ * }
+ * }
+ *
+ * @param message The EventBus message containing the request
+ */
+ private void handleFetchEvents(Message message) {
+ JsonObject request = message.body();
+
+ try {
+ long fromId = request.getLong("fromId");
+ long toId = request.getLong("toId");
+ int limit = request.getInteger("limit");
+ String topic = request.getString("topic");
+
+ logger.atDebug()
+ .addArgument(fromId)
+ .addArgument(toId)
+ .addArgument(limit)
+ .addArgument(topic)
+ .log("Handling fetchEvents request: fromId={}, toId={}, limit={}, topic={}");
+
+ executor.submit(() -> {
+
+ try{
+ List events = catchupServer.fetchEvents(fromId, toId, limit, topic);
+
+ // Serialize events to JSON and reply
+ String eventsJson = Json.encode(events);
+ message.reply(eventsJson);
+
+ logger.atDebug()
+ .addArgument(events.size())
+ .addArgument(topic)
+ .log("Successfully fetched {} events for topic {}");
+
+ } catch (Exception e){
+ logger.atError()
+ .setCause(e)
+ .log("Error handling fetchEvents request");
+ message.fail(500, e.getMessage());
+
+ }
+ return null;
+ });
+
+ } catch (Exception e) {
+ logger.atError()
+ .setCause(e)
+ .log("Error handling fetchEvents request");
+ message.fail(500, e.getMessage());
+ }
+ }
+
+ /**
+ * Handles getLatestMessageId requests from the EventBus.
+ *
+ * Expected request format:
+ *
+ * {@code
+ * {
+ * "topic": "orders"
+ * }
+ * }
+ *
+ * @param message The EventBus message containing the request
+ */
+ private void handleGetLatestMessageId(Message message) {
+ JsonObject request = message.body();
+
+ try {
+ String topic = request.getString("topic");
+
+ logger.atDebug()
+ .addArgument(topic)
+ .log("Handling getLatestMessageId request for topic: {}");
+
+ long latestId = catchupServer.getLatestMessageId(topic);
+
+ // Create response with latest ID
+ JsonObject response = new JsonObject().put("latestId", latestId);
+ message.reply(response);
+
+ logger.atDebug()
+ .addArgument(latestId)
+ .addArgument(topic)
+ .log("Successfully retrieved latest message ID {} for topic {}");
+
+ } catch (Exception e) {
+ logger.atError()
+ .setCause(e)
+ .log("Error handling getLatestMessageId request");
+ message.fail(500, e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ stop();
+ }
+}
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
new file mode 100644
index 0000000..f96e068
--- /dev/null
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
@@ -0,0 +1,238 @@
+package com.p14n.postevent.vertx.adapter;
+
+import com.p14n.postevent.Publisher;
+import com.p14n.postevent.broker.AsyncExecutor;
+import com.p14n.postevent.broker.EventMessageBroker;
+import com.p14n.postevent.broker.MessageSubscriber;
+import com.p14n.postevent.vertx.codec.EventCodec;
+import com.p14n.postevent.data.Event;
+import io.opentelemetry.api.OpenTelemetry;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.MessageConsumer;
+
+import javax.sql.DataSource;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A message broker implementation that bridges the core EventMessageBroker
+ * with Vert.x EventBus for reactive, asynchronous event processing.
+ *
+ * <p>This broker provides a dual-write pattern:</p>
+ * <ol>
+ * <li>Events are first persisted to the database using the existing
+ * Publisher</li>
+ * <li>Events are then published to the Vert.x EventBus for real-time
+ * distribution</li>
+ * </ol>
+ *
+ * <p>Subscribers receive events from the EventBus, providing low-latency
+ * event delivery while maintaining persistence guarantees.</p>
+ *
+ * <p>Example usage:</p>
+ *
+ * <pre>{@code
+ * Vertx vertx = Vertx.vertx();
+ * DataSource dataSource = // configure datasource
+ * AsyncExecutor executor = new DefaultExecutor();
+ *
+ * EventBusMessageBroker broker = new EventBusMessageBroker(
+ *         vertx, dataSource, executor, OpenTelemetry.noop(), "my-broker");
+ *
+ * // Subscribe to events
+ * broker.subscribe("orders", event -> {
+ *     System.out.println("Received: " + event);
+ * });
+ *
+ * // Publish events (persisted + real-time)
+ * Event event = Event.create(...);
+ * broker.publish("orders", event);
+ * }</pre>
+ */
+public class EventBusMessageBroker extends EventMessageBroker {
+ private static final Logger logger = LoggerFactory.getLogger(EventBusMessageBroker.class);
+ private final EventBus eventBus;
+ private final DataSource dataSource;
+ private final Map>> consumers = new ConcurrentHashMap<>();
+ private final AsyncExecutor executor;
+
+ /**
+ * Creates a new EventBusMessageBroker.
+ *
+ * @param eventBus The Vert.x EventBus instance to use
+ * @param dataSource The DataSource for database persistence
+ * @param executor The AsyncExecutor for handling asynchronous operations
+ * @param ot OpenTelemetry instance for observability
+ * @param name Name identifier for this broker instance
+ */
+ public EventBusMessageBroker(EventBus eventBus, DataSource dataSource, AsyncExecutor executor,
+ OpenTelemetry ot, String name) {
+ super(executor, ot, name);
+ this.eventBus = eventBus;
+ this.dataSource = dataSource;
+ this.executor = executor;
+ // Register the Event codec for EventBus serialization
+ eventBus.registerDefaultCodec(Event.class, new EventCodec());
+
+ logger.atInfo()
+ .addArgument(name)
+ .log("EventBusMessageBroker initialized: {}");
+ }
+
+ /**
+ * Publishes an event using the dual-write pattern.
+ * The event is first persisted to the database, then published to the EventBus.
+ *
+ * @param topic The topic to publish to
+ * @param event The event to publish
+ */
+ @Override
+ public void publish(String topic, Event event) {
+ logger.atDebug()
+ .addArgument(topic)
+ .addArgument(event.id())
+ .log("Publishing event to topic {} with id {}");
+
+ try {
+
+ executor.submit(() -> {
+ try {
+ Publisher.publish(event, dataSource, topic);
+
+ // Then, publish to EventBus for real-time distribution
+ String eventBusAddress = "events." + topic;
+ eventBus.publish(eventBusAddress, event);
+
+ logger.atDebug()
+ .addArgument(topic)
+ .addArgument(event.id())
+ .log("Successfully published event to topic {} with id {}");
+
+ } catch (Exception e) {
+ logger.atError()
+ .addArgument(topic)
+ .addArgument(event.id())
+ .setCause(e)
+ .log("Failed to publish event to topic {} with id {}");
+
+ }
+ return null;
+ });
+
+ // First, persist to database using existing Publisher
+
+ } catch (Exception e) {
+ logger.atError()
+ .addArgument(topic)
+ .addArgument(event.id())
+ .setCause(e)
+ .log("Failed to publish event to topic {} with id {}");
+ throw new RuntimeException("Failed to publish event", e);
+ }
+ }
+
+ /**
+ * Subscribes to events on a specific topic via the EventBus.
+ * Creates a consumer that listens to the EventBus address for the topic.
+ *
+ * @param topic The topic to subscribe to
+ * @param subscriber The subscriber that will receive events
+ */
+ public void subscribeToEventBus(String topic, MessageSubscriber subscriber) {
+ logger.atInfo()
+ .addArgument(topic)
+ .log("Subscribing to topic: {}");
+
+ String eventBusAddress = "events." + topic;
+
+ MessageConsumer consumer = eventBus.consumer(eventBusAddress);
+ consumer.handler(message -> {
+ Event event = message.body();
+ logger.atDebug()
+ .addArgument(topic)
+ .addArgument(event.id())
+ .log("Received event on topic {} with id {}");
+
+ try {
+ executor.submit(() -> {
+ try {
+ subscriber.onMessage(event);
+ } catch (Exception e) {
+ logger.atError()
+ .addArgument(topic)
+ .addArgument(event.id())
+ .setCause(e)
+ .log("Error processing event on topic {} with id {}");
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ logger.atError()
+ .addArgument(topic)
+ .addArgument(event.id())
+ .setCause(e)
+ .log("Error processing event on topic {} with id {}");
+ }
+ });
+
+ // Store consumer for potential cleanup
+ consumers.compute(topic, (k,l) -> {
+ if(l == null){
+ l = new ArrayList<>();
+ }
+ l.add(consumer);
+ return l;
+ });
+
+ logger.atInfo()
+ .addArgument(topic)
+ .addArgument(eventBusAddress)
+ .log("Successfully subscribed to topic {} at address {}");
+ }
+
+ /**
+ * Unsubscribes from a topic by removing the EventBus consumer.
+ *
+ * @param topic The topic to unsubscribe from
+ */
+ public void unsubscribe(String topic) {
+ List> consumerList = consumers.remove(topic);
+ if (consumerList != null) {
+ for(var consumer: consumerList){
+ consumer.unregister();
+ }
+ logger.atInfo()
+ .addArgument(topic)
+ .log("Unsubscribed from topic: {}");
+ }
+ }
+
+ /**
+ * Closes the broker and cleans up all subscriptions.
+ */
+ @Override
+ public void close() {
+ logger.atInfo().log("Closing EventBusMessageBroker");
+
+ // Unregister all consumers
+ consumers.values().forEach( l -> {
+ l.forEach(MessageConsumer::unregister);
+ });
+ consumers.clear();
+
+ super.close();
+
+ logger.atInfo().log("EventBusMessageBroker closed");
+ }
+}
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java
new file mode 100644
index 0000000..8b1b1b7
--- /dev/null
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java
@@ -0,0 +1,130 @@
+package com.p14n.postevent.vertx.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.p14n.postevent.catchup.CatchupServerInterface;
+import com.p14n.postevent.data.Event;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.json.Json;
+import io.vertx.core.json.JsonObject;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static com.p14n.postevent.vertx.adapter.EventBusCatchupService.FETCH_EVENTS_ADDRESS;
+import static com.p14n.postevent.vertx.adapter.EventBusCatchupService.GET_LATEST_MESSAGE_ID_ADDRESS;
+
+/**
+ * Client implementation of CatchupServerInterface that sends requests
+ * over the Vert.x EventBus to a remote EventBusCatchupService.
+ *
+ * <p>This client provides a synchronous API that internally uses the
+ * asynchronous EventBus request-reply pattern. All operations have
+ * configurable timeouts to prevent indefinite blocking.</p>
+ *
+ * <p>The client communicates with EventBusCatchupService using JSON-encoded
+ * messages over the EventBus, making it suitable for both local and
+ * distributed deployments.</p>
+ *
+ * <p>Example usage:</p>
+ *
+ * <pre>{@code
+ * EventBus eventBus = vertx.eventBus();
+ * EventBusCatchupClient client = new EventBusCatchupClient(eventBus);
+ *
+ * // Fetch events
+ * List<Event> events = client.fetchEvents(100L, 200L, 50, "orders");
+ *
+ * // Get latest message ID
+ * long latestId = client.getLatestMessageId("orders");
+ * }</pre>
+ */
+public class EventBusCatchupClient implements CatchupServerInterface {
+ private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupClient.class);
+
+ private static final long DEFAULT_TIMEOUT_SECONDS = 30;
+
+ private final EventBus eventBus;
+ private final long timeoutSeconds;
+
+ /**
+ * Creates a new EventBusCatchupClient with the default timeout
+ * (DEFAULT_TIMEOUT_SECONDS, i.e. 30 seconds).
+ *
+ * @param eventBus The Vert.x EventBus to use for communication
+ */
+ public EventBusCatchupClient(EventBus eventBus) {
+ this(eventBus, DEFAULT_TIMEOUT_SECONDS);
+ }
+
+    /**
+     * Creates a new EventBusCatchupClient with a custom timeout.
+     *
+     * @param eventBus       The Vert.x EventBus to use for communication; must not be null
+     * @param timeoutSeconds Timeout in seconds for EventBus requests; must be positive
+     * @throws NullPointerException     if eventBus is null
+     * @throws IllegalArgumentException if timeoutSeconds is not positive
+     */
+    public EventBusCatchupClient(EventBus eventBus, long timeoutSeconds) {
+        // Fail fast on invalid configuration rather than at first request.
+        if (eventBus == null) {
+            throw new NullPointerException("eventBus");
+        }
+        if (timeoutSeconds <= 0) {
+            throw new IllegalArgumentException("timeoutSeconds must be positive: " + timeoutSeconds);
+        }
+        this.eventBus = eventBus;
+        this.timeoutSeconds = timeoutSeconds;
+    }
+
+
+ private R requestAndDecode(
+ String address,
+ JsonObject payload,
+ Function