From 28f9e5af812a69385a34b86535566b5e8e2d49a4 Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Tue, 16 Sep 2025 20:15:19 +0100 Subject: [PATCH 1/9] Augment - create vertx submodule --- settings.gradle | 2 +- vertx/build.gradle | 83 ++++++ .../p14n/postevent/VertxEventBusConsumer.java | 275 ++++++++++++++++++ .../adapter/EventBusCatchupService.java | 199 +++++++++++++ .../adapter/EventBusMessageBroker.java | 203 +++++++++++++ .../client/EventBusCatchupClient.java | 182 ++++++++++++ .../com/p14n/postevent/codec/EventCodec.java | 97 ++++++ .../example/VertxEventBusExample.java | 174 +++++++++++ 8 files changed, 1214 insertions(+), 1 deletion(-) create mode 100644 vertx/build.gradle create mode 100644 vertx/src/main/java/com/p14n/postevent/VertxEventBusConsumer.java create mode 100644 vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java create mode 100644 vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java create mode 100644 vertx/src/main/java/com/p14n/postevent/client/EventBusCatchupClient.java create mode 100644 vertx/src/main/java/com/p14n/postevent/codec/EventCodec.java create mode 100644 vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java diff --git a/settings.gradle b/settings.gradle index e5f8e62..04d7570 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1,2 @@ rootProject.name = 'postevent' -include 'core', 'debezium', 'grpc', 'app' \ No newline at end of file +include 'core', 'debezium', 'grpc', 'vertx', 'app' \ No newline at end of file diff --git a/vertx/build.gradle b/vertx/build.gradle new file mode 100644 index 0000000..f513ea6 --- /dev/null +++ b/vertx/build.gradle @@ -0,0 +1,83 @@ +plugins { + id 'java' + id 'com.adarshr.test-logger' version '4.0.0' +} + +compileJava { + sourceCompatibility = 21 + targetCompatibility = 21 +} + +group = 'com.p14n' +version = '1.0.1-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + // Dependency on core module + implementation project(':core') + + // Vert.x dependencies + implementation 'io.vertx:vertx-core:4.5.1' + + // OpenTelemetry (from core module) + implementation 'io.opentelemetry:opentelemetry-api:1.32.0' + + // Database connection pooling (for examples) + implementation 'com.zaxxer:HikariCP:5.0.1' + + // Logging + implementation 'org.slf4j:slf4j-api:2.0.9' + + // Security constraint + constraints { + implementation 'com.google.guava:guava:32.0.0-jre' + } + + // Test dependencies + testImplementation 'io.vertx:vertx-junit5:4.5.1' + testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0') + testImplementation 'io.zonky.test:embedded-postgres:2.0.7' + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.0' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.0' + testRuntimeOnly 'ch.qos.logback:logback-classic:1.4.11' + testImplementation 'org.mockito:mockito-core:3.12.4' +} + +testlogger { + theme 'standard' + showExceptions true + showStackTraces true + showFullStackTraces false + showCauses true + slowThreshold 2000 + showSummary true + showSimpleNames false + showPassed true + showSkipped true + showFailed true + showStandardStreams false + showPassedStandardStreams true + showSkippedStandardStreams true + showFailedStandardStreams true +} + +test { + useJUnitPlatform() + maxHeapSize = "1G" + minHeapSize = "512M" + maxParallelForks = 1 + failFast = true + testLogging.showStandardStreams = true +} + +jar { + manifest { + } +} + +tasks.withType(Jar) { + duplicatesStrategy = DuplicatesStrategy.EXCLUDE +} diff --git a/vertx/src/main/java/com/p14n/postevent/VertxEventBusConsumer.java b/vertx/src/main/java/com/p14n/postevent/VertxEventBusConsumer.java new file mode 100644 index 0000000..9e5720c --- /dev/null +++ b/vertx/src/main/java/com/p14n/postevent/VertxEventBusConsumer.java @@ -0,0 +1,275 @@ +package com.p14n.postevent; + +import com.p14n.postevent.adapter.EventBusMessageBroker; +import com.p14n.postevent.broker.MessageSubscriber; +import com.p14n.postevent.broker.SystemEventBroker; +import com.p14n.postevent.catchup.CatchupService; +import com.p14n.postevent.client.EventBusCatchupClient; +import com.p14n.postevent.codec.EventCodec; +import com.p14n.postevent.data.ConfigData; +import com.p14n.postevent.data.Event; +import io.opentelemetry.api.OpenTelemetry; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.MessageConsumer; + +import javax.sql.DataSource; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main consumer class that provides event consumption capabilities using Vert.x + * EventBus. + * This consumer combines real-time event delivery via EventBus with catchup + * capabilities + * for guaranteed event processing. + * + *

+ * The consumer provides: + *

+ *

+ * + *

+ * Architecture: + * + *

+ * Events → Database → EventBus → VertxEventBusConsumer → Subscriber
+ *                        ↑
+ *                   CatchupService (for missed events)
+ * 
+ *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * // Create configuration
+ * ConfigData config = new ConfigData("myapp", Set.of("orders"), ...);
+ * 
+ * // Create consumer
+ * VertxEventBusConsumer consumer = new VertxEventBusConsumer(config, OpenTelemetry.noop(), 30);
+ * 
+ * // Subscribe to events
+ * consumer.subscribe("orders", event -> {
+ *     System.out.println("Received: " + event);
+ * });
+ * 
+ * // Start consuming
+ * consumer.start(Set.of("orders"), dataSource);
+ * }
+ */ +public class VertxEventBusConsumer implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(VertxEventBusConsumer.class); + + private final Vertx vertx; + private final EventBus eventBus; + private final ConfigData config; + private final OpenTelemetry openTelemetry; + private final int catchupIntervalSeconds; + private final Map> consumers = new ConcurrentHashMap<>(); + + private CatchupService catchupService; + private EventBusMessageBroker messageBroker; + private boolean started = false; + + /** + * Creates a new VertxEventBusConsumer. + * + * @param config Configuration data for the consumer + * @param openTelemetry OpenTelemetry instance for observability + * @param catchupIntervalSeconds Interval in seconds for catchup operations + */ + public VertxEventBusConsumer(ConfigData config, OpenTelemetry openTelemetry, int catchupIntervalSeconds) { + this.config = config; + this.openTelemetry = openTelemetry; + this.catchupIntervalSeconds = catchupIntervalSeconds; + this.vertx = Vertx.vertx(); + this.eventBus = vertx.eventBus(); + + // Register Event codec for EventBus serialization + eventBus.registerDefaultCodec(Event.class, new EventCodec()); + + logger.atInfo() + .addArgument(config.affinity()) + .addArgument(catchupIntervalSeconds) + .log("VertxEventBusConsumer created for {} with catchup interval {}s"); + } + + /** + * Starts the consumer with the specified topics and data source. + * This initializes the catchup service and prepares for event consumption. + * + * @param topics The set of topics to consume from + * @param dataSource The DataSource for database operations + * @throws Exception If startup fails + */ + public void start(Set topics, DataSource dataSource) throws Exception { + if (started) { + throw new IllegalStateException("Consumer is already started"); + } + + logger.atInfo() + .addArgument(topics) + .log("Starting VertxEventBusConsumer for topics: {}"); + + try { + // Create EventBus-based catchup client + EventBusCatchupClient catchupClient = new EventBusCatchupClient(eventBus); + + // Create system event broker for catchup coordination + SystemEventBroker systemEventBroker = new SystemEventBroker(openTelemetry); + + // Initialize catchup service + catchupService = new CatchupService(dataSource, catchupClient, systemEventBroker); + + started = true; + + logger.atInfo() + .addArgument(topics) + .log("VertxEventBusConsumer started successfully for topics: {}"); + + } catch (Exception e) { + logger.atError() + .setCause(e) + .log("Failed to start VertxEventBusConsumer"); + throw e; + } + } + + /** + * Subscribes to events on a specific topic via the EventBus. + * Creates an EventBus consumer that listens for events on the topic. + * + * @param topic The topic to subscribe to + * @param subscriber The subscriber that will receive events + */ + public void subscribe(String topic, MessageSubscriber subscriber) { + if (!started) { + throw new IllegalStateException("Consumer must be started before subscribing"); + } + + logger.atInfo() + .addArgument(topic) + .log("Subscribing to topic: {}"); + + String eventBusAddress = "events." + topic; + + MessageConsumer consumer = eventBus.consumer(eventBusAddress); + consumer.handler(message -> { + Event event = message.body(); + logger.atDebug() + .addArgument(topic) + .addArgument(event.id()) + .log("Received event on topic {} with id {}"); + + try { + subscriber.onMessage(event); + } catch (Exception e) { + logger.atError() + .addArgument(topic) + .addArgument(event.id()) + .setCause(e) + .log("Error processing event on topic {} with id {}"); + } + }); + + // Store consumer for cleanup + consumers.put(topic, consumer); + + logger.atInfo() + .addArgument(topic) + .addArgument(eventBusAddress) + .log("Successfully subscribed to topic {} at address {}"); + } + + /** + * Unsubscribes from a specific topic. + * + * @param topic The topic to unsubscribe from + */ + public void unsubscribe(String topic) { + MessageConsumer consumer = consumers.remove(topic); + if (consumer != null) { + consumer.unregister(); + logger.atInfo() + .addArgument(topic) + .log("Unsubscribed from topic: {}"); + } + } + + /** + * Gets the Vert.x instance used by this consumer. + * This can be useful for advanced integrations. + * + * @return The Vert.x instance + */ + public Vertx getVertx() { + return vertx; + } + + /** + * Gets the EventBus instance used by this consumer. + * This can be useful for advanced integrations. + * + * @return The EventBus instance + */ + public EventBus getEventBus() { + return eventBus; + } + + /** + * Checks if the consumer is currently started. + * + * @return true if started, false otherwise + */ + public boolean isStarted() { + return started; + } + + /** + * Closes the consumer and cleans up all resources. + * This stops the catchup service, unregisters all consumers, and closes Vert.x. + */ + @Override + public void close() { + logger.atInfo().log("Closing VertxEventBusConsumer"); + + // Unregister all EventBus consumers + consumers.values().forEach(MessageConsumer::unregister); + consumers.clear(); + + // Stop catchup service + // Note: CatchupService doesn't need explicit closing + // It's managed by the SystemEventBroker + + // Close message broker if created + if (messageBroker != null) { + try { + messageBroker.close(); + } catch (Exception e) { + logger.atWarn() + .setCause(e) + .log("Error closing message broker"); + } + } + + // Close Vert.x + if (vertx != null) { + vertx.close(); + } + + started = false; + + logger.atInfo().log("VertxEventBusConsumer closed"); + } +} diff --git a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java b/vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java new file mode 100644 index 0000000..a9a0c83 --- /dev/null +++ b/vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java @@ -0,0 +1,199 @@ +package com.p14n.postevent.adapter; + +import com.p14n.postevent.catchup.CatchupServerInterface; +import com.p14n.postevent.data.Event; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service that exposes CatchupServerInterface methods via Vert.x EventBus messaging. + * This allows remote clients to request catchup operations through the EventBus + * using a request-reply pattern. + * + *

+ * The service listens on specific EventBus addresses and delegates to the + * underlying CatchupServerInterface implementation. All requests and responses + * are JSON-encoded for simplicity and debugging. + *

+ * + *

+ * EventBus addresses: + *

    + *
  • {@code catchup.fetchEvents} - Fetch events within a range
  • + *
  • {@code catchup.getLatestMessageId} - Get the latest message ID for a topic
  • + *
+ *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * CatchupServerInterface catchupServer = new CatchupServer(dataSource);
+ * EventBus eventBus = vertx.eventBus();
+ * 
+ * EventBusCatchupService service = new EventBusCatchupService(catchupServer, eventBus);
+ * service.start();
+ * 
+ * // Service is now listening for catchup requests on the EventBus
+ * }
+ */ +public class EventBusCatchupService { + private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupService.class); + + private static final String FETCH_EVENTS_ADDRESS = "catchup.fetchEvents"; + private static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.getLatestMessageId"; + + private final CatchupServerInterface catchupServer; + private final EventBus eventBus; + private MessageConsumer fetchEventsConsumer; + private MessageConsumer getLatestMessageIdConsumer; + + /** + * Creates a new EventBusCatchupService. + * + * @param catchupServer The underlying catchup server implementation + * @param eventBus The Vert.x EventBus to use for messaging + */ + public EventBusCatchupService(CatchupServerInterface catchupServer, EventBus eventBus) { + this.catchupServer = catchupServer; + this.eventBus = eventBus; + } + + /** + * Starts the service by registering EventBus consumers for catchup operations. + * This method sets up listeners for both fetchEvents and getLatestMessageId requests. + */ + public void start() { + logger.atInfo().log("Starting EventBusCatchupService"); + + // Register consumer for fetchEvents requests + fetchEventsConsumer = eventBus.consumer(FETCH_EVENTS_ADDRESS, this::handleFetchEvents); + + // Register consumer for getLatestMessageId requests + getLatestMessageIdConsumer = eventBus.consumer(GET_LATEST_MESSAGE_ID_ADDRESS, this::handleGetLatestMessageId); + + logger.atInfo() + .addArgument(FETCH_EVENTS_ADDRESS) + .addArgument(GET_LATEST_MESSAGE_ID_ADDRESS) + .log("EventBusCatchupService started, listening on addresses: {} and {}"); + } + + /** + * Stops the service by unregistering EventBus consumers. + */ + public void stop() { + logger.atInfo().log("Stopping EventBusCatchupService"); + + if (fetchEventsConsumer != null) { + fetchEventsConsumer.unregister(); + fetchEventsConsumer = null; + } + + if (getLatestMessageIdConsumer != null) { + getLatestMessageIdConsumer.unregister(); + getLatestMessageIdConsumer = null; + } + + logger.atInfo().log("EventBusCatchupService stopped"); + } + + /** + * Handles fetchEvents requests from the EventBus. + * + * Expected request format: + *
{@code
+     * {
+     *   "fromId": 100,
+     *   "toId": 200,
+     *   "limit": 50,
+     *   "topic": "orders"
+     * }
+     * }
+ * + * @param message The EventBus message containing the request + */ + private void handleFetchEvents(Message message) { + JsonObject request = message.body(); + + try { + long fromId = request.getLong("fromId"); + long toId = request.getLong("toId"); + int limit = request.getInteger("limit"); + String topic = request.getString("topic"); + + logger.atDebug() + .addArgument(fromId) + .addArgument(toId) + .addArgument(limit) + .addArgument(topic) + .log("Handling fetchEvents request: fromId={}, toId={}, limit={}, topic={}"); + + List events = catchupServer.fetchEvents(fromId, toId, limit, topic); + + // Serialize events to JSON and reply + String eventsJson = Json.encode(events); + message.reply(eventsJson); + + logger.atDebug() + .addArgument(events.size()) + .addArgument(topic) + .log("Successfully fetched {} events for topic {}", events.size(), topic); + + } catch (Exception e) { + logger.atError() + .setCause(e) + .log("Error handling fetchEvents request"); + message.fail(500, e.getMessage()); + } + } + + /** + * Handles getLatestMessageId requests from the EventBus. + * + * Expected request format: + *
{@code
+     * {
+     *   "topic": "orders"
+     * }
+     * }
+ * + * @param message The EventBus message containing the request + */ + private void handleGetLatestMessageId(Message message) { + JsonObject request = message.body(); + + try { + String topic = request.getString("topic"); + + logger.atDebug() + .addArgument(topic) + .log("Handling getLatestMessageId request for topic: {}"); + + long latestId = catchupServer.getLatestMessageId(topic); + + // Create response with latest ID + JsonObject response = new JsonObject().put("latestId", latestId); + message.reply(response); + + logger.atDebug() + .addArgument(latestId) + .addArgument(topic) + .log("Successfully retrieved latest message ID {} for topic {}", latestId, topic); + + } catch (Exception e) { + logger.atError() + .setCause(e) + .log("Error handling getLatestMessageId request"); + message.fail(500, e.getMessage()); + } + } +} diff --git a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java new file mode 100644 index 0000000..b0aaf70 --- /dev/null +++ b/vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java @@ -0,0 +1,203 @@ +package com.p14n.postevent.adapter; + +import com.p14n.postevent.Publisher; +import com.p14n.postevent.broker.AsyncExecutor; +import com.p14n.postevent.broker.EventMessageBroker; +import com.p14n.postevent.broker.MessageSubscriber; +import com.p14n.postevent.codec.EventCodec; +import com.p14n.postevent.data.Event; +import io.opentelemetry.api.OpenTelemetry; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.MessageConsumer; + +import javax.sql.DataSource; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A message broker implementation that bridges the core EventMessageBroker + * with Vert.x EventBus for reactive, asynchronous event processing. + * + *

+ * This broker provides a dual-write pattern: + *

    + *
  1. Events are first persisted to the database using the existing + * Publisher
  2. + *
  3. Events are then published to the Vert.x EventBus for real-time + * distribution
  4. + *
+ *

+ * + *

+ * Subscribers receive events from the EventBus, providing low-latency + * event delivery while maintaining persistence guarantees. + *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * Vertx vertx = Vertx.vertx();
+ * DataSource dataSource = // configure datasource
+ * AsyncExecutor executor = new DefaultExecutor();
+ * 
+ * EventBusMessageBroker broker = new EventBusMessageBroker(
+ *     vertx, dataSource, executor, OpenTelemetry.noop(), "my-broker");
+ * 
+ * // Subscribe to events
+ * broker.subscribe("orders", event -> {
+ *     System.out.println("Received: " + event);
+ * });
+ * 
+ * // Publish events (persisted + real-time)
+ * Event event = Event.create(...);
+ * broker.publish("orders", event);
+ * }
+ */ +public class EventBusMessageBroker extends EventMessageBroker { + private static final Logger logger = LoggerFactory.getLogger(EventBusMessageBroker.class); + + private final Vertx vertx; + private final EventBus eventBus; + private final DataSource dataSource; + private final Map> consumers = new ConcurrentHashMap<>(); + + /** + * Creates a new EventBusMessageBroker. + * + * @param vertx The Vert.x instance to use + * @param dataSource The DataSource for database persistence + * @param executor The AsyncExecutor for handling asynchronous operations + * @param ot OpenTelemetry instance for observability + * @param name Name identifier for this broker instance + */ + public EventBusMessageBroker(Vertx vertx, DataSource dataSource, AsyncExecutor executor, + OpenTelemetry ot, String name) { + super(executor, ot, name); + this.vertx = vertx; + this.eventBus = vertx.eventBus(); + this.dataSource = dataSource; + + // Register the Event codec for EventBus serialization + eventBus.registerDefaultCodec(Event.class, new EventCodec()); + + logger.atInfo() + .addArgument(name) + .log("EventBusMessageBroker initialized: {}"); + } + + /** + * Publishes an event using the dual-write pattern. + * The event is first persisted to the database, then published to the EventBus. + * + * @param topic The topic to publish to + * @param event The event to publish + */ + @Override + public void publish(String topic, Event event) { + logger.atDebug() + .addArgument(topic) + .addArgument(event.id()) + .log("Publishing event to topic {} with id {}"); + + try { + // First, persist to database using existing Publisher + Publisher.publish(event, dataSource, topic); + + // Then, publish to EventBus for real-time distribution + String eventBusAddress = "events." + topic; + eventBus.publish(eventBusAddress, event); + + logger.atDebug() + .addArgument(topic) + .addArgument(event.id()) + .log("Successfully published event to topic {} with id {}"); + + } catch (Exception e) { + logger.atError() + .addArgument(topic) + .addArgument(event.id()) + .setCause(e) + .log("Failed to publish event to topic {} with id {}"); + throw new RuntimeException("Failed to publish event", e); + } + } + + /** + * Subscribes to events on a specific topic via the EventBus. + * Creates a consumer that listens to the EventBus address for the topic. + * + * @param topic The topic to subscribe to + * @param subscriber The subscriber that will receive events + */ + public void subscribeToEventBus(String topic, MessageSubscriber subscriber) { + logger.atInfo() + .addArgument(topic) + .log("Subscribing to topic: {}"); + + String eventBusAddress = "events." + topic; + + MessageConsumer consumer = eventBus.consumer(eventBusAddress); + consumer.handler(message -> { + Event event = message.body(); + logger.atDebug() + .addArgument(topic) + .addArgument(event.id()) + .log("Received event on topic {} with id {}"); + + try { + subscriber.onMessage(event); + } catch (Exception e) { + logger.atError() + .addArgument(topic) + .addArgument(event.id()) + .setCause(e) + .log("Error processing event on topic {} with id {}"); + } + }); + + // Store consumer for potential cleanup + consumers.put(topic, consumer); + + logger.atInfo() + .addArgument(topic) + .addArgument(eventBusAddress) + .log("Successfully subscribed to topic {} at address {}"); + } + + /** + * Unsubscribes from a topic by removing the EventBus consumer. + * + * @param topic The topic to unsubscribe from + */ + public void unsubscribe(String topic) { + MessageConsumer consumer = consumers.remove(topic); + if (consumer != null) { + consumer.unregister(); + logger.atInfo() + .addArgument(topic) + .log("Unsubscribed from topic: {}"); + } + } + + /** + * Closes the broker and cleans up all subscriptions. + */ + @Override + public void close() { + logger.atInfo().log("Closing EventBusMessageBroker"); + + // Unregister all consumers + consumers.values().forEach(MessageConsumer::unregister); + consumers.clear(); + + super.close(); + + logger.atInfo().log("EventBusMessageBroker closed"); + } +} diff --git a/vertx/src/main/java/com/p14n/postevent/client/EventBusCatchupClient.java b/vertx/src/main/java/com/p14n/postevent/client/EventBusCatchupClient.java new file mode 100644 index 0000000..8e1839e --- /dev/null +++ b/vertx/src/main/java/com/p14n/postevent/client/EventBusCatchupClient.java @@ -0,0 +1,182 @@ +package com.p14n.postevent.client; + +import com.p14n.postevent.catchup.CatchupServerInterface; +import com.p14n.postevent.data.Event; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client implementation of CatchupServerInterface that sends requests + * over the Vert.x EventBus to a remote EventBusCatchupService. + * + *

+ * This client provides a synchronous API that internally uses the + * asynchronous EventBus request-reply pattern. All operations have + * configurable timeouts to prevent indefinite blocking. + *

+ * + *

+ * The client communicates with EventBusCatchupService using JSON-encoded + * messages over the EventBus, making it suitable for both local and + * distributed deployments. + *

+ * + *

+ * Example usage: + *

+ * + *
{@code
+ * EventBus eventBus = vertx.eventBus();
+ * EventBusCatchupClient client = new EventBusCatchupClient(eventBus);
+ * 
+ * // Fetch events
+ * List events = client.fetchEvents(100L, 200L, 50, "orders");
+ * 
+ * // Get latest message ID
+ * long latestId = client.getLatestMessageId("orders");
+ * }
+ */ +public class EventBusCatchupClient implements CatchupServerInterface { + private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupClient.class); + + private static final String FETCH_EVENTS_ADDRESS = "catchup.fetchEvents"; + private static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.getLatestMessageId"; + private static final long DEFAULT_TIMEOUT_SECONDS = 30; + + private final EventBus eventBus; + private final long timeoutSeconds; + + /** + * Creates a new EventBusCatchupClient with default timeout. + * + * @param eventBus The Vert.x EventBus to use for communication + */ + public EventBusCatchupClient(EventBus eventBus) { + this(eventBus, DEFAULT_TIMEOUT_SECONDS); + } + + /** + * Creates a new EventBusCatchupClient with custom timeout. + * + * @param eventBus The Vert.x EventBus to use for communication + * @param timeoutSeconds Timeout in seconds for EventBus requests + */ + public EventBusCatchupClient(EventBus eventBus, long timeoutSeconds) { + this.eventBus = eventBus; + this.timeoutSeconds = timeoutSeconds; + } + + /** + * Fetches events within the specified ID range for a topic. + * Sends a request to the EventBusCatchupService and waits for the response. + * + * @param fromId The starting event ID (inclusive) + * @param toId The ending event ID (inclusive) + * @param limit Maximum number of events to return + * @param topic The topic to fetch events from + * @return List of events within the specified range + * @throws Exception If the request fails or times out + */ + @Override + public List fetchEvents(long fromId, long toId, int limit, String topic) { + logger.atDebug() + .addArgument(fromId) + .addArgument(toId) + .addArgument(limit) + .addArgument(topic) + .log("Fetching events: fromId={}, toId={}, limit={}, topic={}"); + + JsonObject request = new JsonObject() + .put("fromId", fromId) + .put("toId", toId) + .put("limit", limit) + .put("topic", topic); + + try { + CompletableFuture future = new CompletableFuture<>(); + + eventBus.request(FETCH_EVENTS_ADDRESS, request, reply -> { + if (reply.succeeded()) { + String eventsJson = (String) reply.result().body(); + future.complete(eventsJson); + } else { + future.completeExceptionally(new RuntimeException( + "Failed to fetch events: " + reply.cause().getMessage(), reply.cause())); + } + }); + + String eventsJson = future.get(timeoutSeconds, TimeUnit.SECONDS); + List events = Json.decodeValue(eventsJson, List.class); + + logger.atDebug() + .addArgument(events.size()) + .addArgument(topic) + .log("Successfully fetched {} events for topic {}", events.size(), topic); + + return events; + + } catch (Exception e) { + logger.atError() + .addArgument(topic) + .setCause(e) + .log("Error fetching events for topic {}", topic); + throw new RuntimeException("Failed to fetch events for topic " + topic, e); + } + } + + /** + * Gets the latest message ID for a specific topic. + * Sends a request to the EventBusCatchupService and waits for the response. + * + * @param topic The topic to get the latest message ID for + * @return The latest message ID for the topic + * @throws Exception If the request fails or times out + */ + @Override + public long getLatestMessageId(String topic) { + logger.atDebug() + .addArgument(topic) + .log("Getting latest message ID for topic: {}"); + + JsonObject request = new JsonObject().put("topic", topic); + + try { + CompletableFuture future = new CompletableFuture<>(); + + eventBus.request(GET_LATEST_MESSAGE_ID_ADDRESS, request, reply -> { + if (reply.succeeded()) { + JsonObject response = (JsonObject) reply.result().body(); + future.complete(response); + } else { + future.completeExceptionally(new RuntimeException( + "Failed to get latest message ID: " + reply.cause().getMessage(), reply.cause())); + } + }); + + JsonObject response = future.get(timeoutSeconds, TimeUnit.SECONDS); + long latestId = response.getLong("latestId"); + + logger.atDebug() + .addArgument(latestId) + .addArgument(topic) + .log("Successfully retrieved latest message ID {} for topic {}", latestId, topic); + + return latestId; + + } catch (Exception e) { + logger.atError() + .addArgument(topic) + .setCause(e) + .log("Error getting latest message ID for topic {}", topic); + throw new RuntimeException("Failed to get latest message ID for topic " + topic, e); + } + } +} diff --git a/vertx/src/main/java/com/p14n/postevent/codec/EventCodec.java b/vertx/src/main/java/com/p14n/postevent/codec/EventCodec.java new file mode 100644 index 0000000..2e7af58 --- /dev/null +++ b/vertx/src/main/java/com/p14n/postevent/codec/EventCodec.java @@ -0,0 +1,97 @@ +package com.p14n.postevent.codec; + +import com.p14n.postevent.data.Event; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; +import io.vertx.core.json.Json; + +/** + * MessageCodec for serializing Event objects on the Vert.x EventBus. + * This codec handles the conversion between Event objects and their wire format + * for transmission over the EventBus. + * + *

+ * The codec uses JSON serialization for simplicity and debugging ease. + * Events are encoded as JSON strings with a length prefix for efficient parsing. + *

+ * + *

+ * Wire format: + * [4 bytes: length][JSON string] + *

+ */ +public class EventCodec implements MessageCodec { + + /** + * Encodes an Event object to the wire format. + * The event is serialized to JSON and prefixed with its length. + * + * @param buffer The buffer to write the encoded event to + * @param event The event to encode + */ + @Override + public void encodeToWire(Buffer buffer, Event event) { + String json = Json.encode(event); + byte[] jsonBytes = json.getBytes(); + + // Write length prefix followed by JSON bytes + buffer.appendInt(jsonBytes.length); + buffer.appendBytes(jsonBytes); + } + + /** + * Decodes an Event object from the wire format. + * Reads the length prefix and then deserializes the JSON string. + * + * @param pos The position in the buffer to start reading from + * @param buffer The buffer containing the encoded event + * @return The decoded Event object + */ + @Override + public Event decodeFromWire(int pos, Buffer buffer) { + // Read length prefix + int length = buffer.getInt(pos); + + // Read JSON bytes and convert to string + byte[] jsonBytes = buffer.getBytes(pos + 4, pos + 4 + length); + String json = new String(jsonBytes); + + // Deserialize from JSON + return Json.decodeValue(json, Event.class); + } + + /** + * Transform method for local delivery. + * Since we're using the same type for both send and receive, + * no transformation is needed. + * + * @param event The event to transform + * @return The same event (no transformation) + */ + @Override + public Event transform(Event event) { + return event; // No transformation needed for local delivery + } + + /** + * Returns the name of this codec. + * Used by Vert.x for codec registration and identification. + * + * @return The codec name + */ + @Override + public String name() { + return "postevent-event"; + } + + /** + * Returns the system codec ID. + * Since this is a user-defined codec, we return -1. + * + * @return -1 to indicate this is a user codec + */ + @Override + public byte systemCodecID() { + return -1; // User-defined codec + } +} diff --git a/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java b/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java new file mode 100644 index 0000000..03ff122 --- /dev/null +++ b/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java @@ -0,0 +1,174 @@ +package com.p14n.postevent.example; + +import com.p14n.postevent.VertxEventBusConsumer; +import com.p14n.postevent.adapter.EventBusMessageBroker; +import com.p14n.postevent.adapter.EventBusCatchupService; +import com.p14n.postevent.broker.DefaultExecutor; +import com.p14n.postevent.catchup.CatchupServer; +import com.p14n.postevent.data.ConfigData; +import com.p14n.postevent.data.Event; +import com.p14n.postevent.db.DatabaseSetup; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import io.opentelemetry.api.OpenTelemetry; +import io.vertx.core.Vertx; + +import javax.sql.DataSource; +import java.time.Instant; +import java.util.Set; + +/** + * Example demonstrating how to use the Vert.x EventBus module for event + * processing. + * + * This example shows: + * 1. Setting up a server with EventBus message broker and catchup service + * 2. Creating a consumer that receives events via EventBus + * 3. Publishing events that are persisted and distributed via EventBus + * 4. Automatic catchup for guaranteed event delivery + */ +public class VertxEventBusExample { + + public static void main(String[] args) throws Exception { + // Setup database + DataSource dataSource = createDataSource(); + + // Configuration + ConfigData config = new ConfigData( + "vertx-example", + Set.of("orders", "payments"), + "localhost", + 5432, + "postevent", + "postgres", + "password"); + + // Start server components + VertxEventBusServer server = new VertxEventBusServer(config, dataSource); + server.start(); + + // Start consumer + VertxEventBusConsumer consumer = new VertxEventBusConsumer(config, OpenTelemetry.noop(), 30); + consumer.start(Set.of("orders", "payments"), dataSource); + + // Subscribe to events + consumer.subscribe("orders", event -> { + System.out.println("📦 Order Event: " + event.id() + " - " + event.type()); + }); + + consumer.subscribe("payments", event -> { + System.out.println("💳 Payment Event: " + event.id() + " - " + event.type()); + }); + + // Publish some test events + publishTestEvents(server.getMessageBroker()); + + // Keep running for demonstration + System.out.println("🚀 Vert.x EventBus example running..."); + System.out.println("📡 Events are being published and consumed via EventBus"); + System.out.println("🔄 Catchup service ensures no events are missed"); + System.out.println("Press Ctrl+C to stop"); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("\n🛑 Shutting down..."); + try { + consumer.close(); + server.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + + // Keep main thread alive + Thread.currentThread().join(); + } + + private static void publishTestEvents(EventBusMessageBroker broker) { + // Publish order events + for (int i = 1; i <= 5; i++) { + Event orderEvent = Event.create( + "order-" + i, + "order-service", + "order.created", + "application/json", + null, + "orders", + ("Order " + i + " data").getBytes(), + null); + broker.publish("orders", orderEvent); + } + + // Publish payment events + for (int i = 1; i <= 3; i++) { + Event paymentEvent = Event.create( + "payment-" + i, + "payment-service", + "payment.processed", + "application/json", + null, + "payments", + ("Payment " + i + " data").getBytes(), + null); + broker.publish("payments", paymentEvent); + } + } + + private static DataSource createDataSource() { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl("jdbc:postgresql://localhost:5432/postevent"); + config.setUsername("postgres"); + config.setPassword("password"); + config.setMaximumPoolSize(10); + return new HikariDataSource(config); + } + + /** + * Server component that sets up EventBus message broker and catchup service. + */ + public static class VertxEventBusServer { + private final ConfigData config; + private final DataSource dataSource; + private final Vertx vertx; + + private EventBusMessageBroker messageBroker; + private EventBusCatchupService catchupService; + + public VertxEventBusServer(ConfigData config, DataSource dataSource) { + this.config = config; + this.dataSource = dataSource; + this.vertx = Vertx.vertx(); + } + + public void start() throws Exception { + // Create message broker that publishes to DB + EventBus + DefaultExecutor executor = new DefaultExecutor(2); + messageBroker = new EventBusMessageBroker( + vertx, dataSource, executor, OpenTelemetry.noop(), "vertx-server"); + + // Create catchup service for EventBus requests + CatchupServer catchupServer = new CatchupServer(dataSource); + catchupService = new EventBusCatchupService(catchupServer, vertx.eventBus()); + catchupService.start(); + + System.out.println("🌐 Vert.x EventBus server started"); + } + + public void stop() throws Exception { + if (catchupService != null) { + catchupService.stop(); + } + if (messageBroker != null) { + messageBroker.close(); + } + if (vertx != null) { + vertx.close(); + } + System.out.println("🛑 Vert.x EventBus server stopped"); + } + + public EventBusMessageBroker getMessageBroker() { + return messageBroker; + } + } +} From 3979e342b3f85af0e012bc56d75b229d1392275d Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Wed, 17 Sep 2025 22:29:34 +0100 Subject: [PATCH 2/9] Create new example --- build.gradle.ref | 178 ++++++++++++++++++ .../com/p14n/postevent/db/DatabaseSetup.java | 27 ++- vertx/README.md | 0 .../p14n/postevent/VertxConsumerServer.java | 44 +++++ .../postevent/VertxPersistentConsumer.java | 161 ++++++++++++++++ .../adapter/EventBusCatchupService.java | 55 ++++-- .../adapter/EventBusMessageBroker.java | 10 +- .../example/VertxConsumerExample.java | 42 +++++ .../example/VertxEventBusExample.java | 4 +- 9 files changed, 493 insertions(+), 28 deletions(-) create mode 100644 build.gradle.ref create mode 100644 vertx/README.md create mode 100644 vertx/src/main/java/com/p14n/postevent/VertxConsumerServer.java create mode 100644 vertx/src/main/java/com/p14n/postevent/VertxPersistentConsumer.java create mode 100644 vertx/src/main/java/com/p14n/postevent/example/VertxConsumerExample.java diff --git a/build.gradle.ref b/build.gradle.ref new file mode 100644 index 0000000..e511de3 --- /dev/null +++ b/build.gradle.ref @@ -0,0 +1,178 @@ +import com.vanniktech.maven.publish.SonatypeHost +import com.vanniktech.maven.publish.JavaLibrary +import com.vanniktech.maven.publish.JavadocJar + +plugins { + id 'java' + id 'com.adarshr.test-logger' version '4.0.0' + id 'com.google.protobuf' version '0.9.2' + id "com.vanniktech.maven.publish" version "0.31.0" +} + +compileJava { + sourceCompatibility = 21 + targetCompatibility = 21 +} + +group = 'com.p14n' +version = '1.0.1-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + implementation 'io.debezium:debezium-api:3.0.1.Final' + implementation ('io.debezium:debezium-embedded:3.0.1.Final') { + exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet' + exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2' + exclude group: 'org.eclipse.jetty' + } + implementation 'io.debezium:debezium-connector-postgres:3.0.1.Final' + implementation 'io.debezium:debezium-storage-jdbc:3.0.1.Final' + implementation 'org.slf4j:slf4j-api:2.0.9' + implementation 'com.zaxxer:HikariCP:6.2.1' + + constraints { + implementation 'com.google.guava:guava:32.0.0-jre' + } + + // gRPC dependencies + implementation 'io.grpc:grpc-netty-shaded:1.53.0' + implementation 'io.grpc:grpc-protobuf:1.53.0' + implementation 'io.grpc:grpc-stub:1.53.0' + implementation 'io.grpc:grpc-api:1.53.0' + + implementation 'javax.annotation:javax.annotation-api:1.3.2' + + // For code generation + implementation 'com.google.protobuf:protobuf-java:3.21.7' + + testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0') + testImplementation 'io.zonky.test:embedded-postgres:2.0.7' + testImplementation 'net.jqwik:jqwik:1.8.2' + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.0' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.0' + testRuntimeOnly 'ch.qos.logback:logback-classic:1.4.11' + testImplementation 'org.mockito:mockito-core:3.12.4' + + // OpenTelemetry core dependencies + implementation 'io.opentelemetry:opentelemetry-api:1.32.0' + + // Instrumentation for GRPC + implementation 'io.opentelemetry.instrumentation:opentelemetry-grpc-1.6:1.32.0-alpha' + +} + +testlogger { + theme 'standard' + showExceptions true + showStackTraces true + showFullStackTraces false + showCauses true + slowThreshold 2000 + showSummary true + showSimpleNames false + showPassed true + showSkipped true + showFailed true + showStandardStreams false + showPassedStandardStreams true + showSkippedStandardStreams true + showFailedStandardStreams true +} + +test { + useJUnitPlatform { + includeEngines 'jqwik', 'junit-jupiter' + } + maxHeapSize = "1G" + minHeapSize = "512M" + maxParallelForks = 1 + failFast = true + testLogging.showStandardStreams = true +} + +task fastTest( type: Test ) { + useJUnitPlatform { + includeEngines 'junit-jupiter' + exclude '**/dst/**' + + } +} + +jar { + manifest { + } +} + +// Configure Protobuf plugin +protobuf { + protoc { + artifact = 'com.google.protobuf:protoc:3.21.7' + } + plugins { + grpc { + artifact = 'io.grpc:protoc-gen-grpc-java:1.53.0' + } + } + generateProtoTasks { + all()*.plugins { + grpc {} + } + } +} + +sourceSets { + main { + java { + srcDirs 'build/generated/source/proto/main/grpc' + srcDirs 'build/generated/source/proto/main/java' + } + } +} + +tasks.withType(Jar) { + duplicatesStrategy = DuplicatesStrategy.EXCLUDE +} + +javadoc { + exclude "**/grpc/**" + source = sourceSets.main.allJava +} + +mavenPublishing { + + configure(new JavaLibrary(new JavadocJar.Javadoc(), true)) + + publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true) + + signAllPublications() + + coordinates("com.p14n", "postevent", version) + + pom { + name = "Postevent" + description = 'A reliable event publishing and consumption system using PostgreSQL and gRPC' + inceptionYear = "2025" + url = "https://github.com/p14n/postevent/" + licenses { + license { + name = 'MIT License' + url = 'https://opensource.org/licenses/MIT' + } + } + developers { + developer { + id = 'p14n' + name = 'Dean Chapman' + email = 'dean@p14n.com' + } + } + scm { + connection = 'scm:git:git://github.com/p14n/postevent.git' + developerConnection = 'scm:git:ssh://github.com:p14n/postevent.git' + url = 'https://github.com/p14n/postevent' + } + } +} \ No newline at end of file diff --git a/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java b/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java index d74e662..1ed10ff 100644 --- a/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java +++ b/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java @@ -53,6 +53,8 @@ public class DatabaseSetup { private final String username; private final String password; + private final DataSource ds; + /** * Creates a new DatabaseSetup instance using configuration from * PostEventConfig. @@ -74,6 +76,13 @@ public DatabaseSetup(String jdbcUrl, String username, String password) { this.jdbcUrl = jdbcUrl; this.username = username; this.password = password; + this.ds = null; + } + public DatabaseSetup(DataSource ds) { + this.jdbcUrl = null; + this.username = null; + this.password = null; + this.ds = ds; } /** @@ -85,11 +94,24 @@ public DatabaseSetup(String jdbcUrl, String username, String password) { * @throws RuntimeException if database operations fail */ public DatabaseSetup setupAll(Set topics) { + setupClient(); + setupServer(topics); + setupDebezium(); + return this; + } + public DatabaseSetup setupDebezium() { + clearOldSlots(); + return this; + } + public DatabaseSetup setupServer(Set topics) { + createSchemaIfNotExists(); + topics.stream().forEach(this::createTableIfNotExists); + return this; + } + public DatabaseSetup setupClient() { createSchemaIfNotExists(); createMessagesTableIfNotExists(); createContiguousHwmTableIfNotExists(); - topics.stream().forEach(this::createTableIfNotExists); - clearOldSlots(); return this; } @@ -287,6 +309,7 @@ topic_name VARCHAR(255) PRIMARY KEY, * @throws SQLException if connection fails */ private Connection getConnection() throws SQLException { + if(ds!=null) return ds.getConnection(); return DriverManager.getConnection(jdbcUrl, username, password); } diff --git a/vertx/README.md b/vertx/README.md new file mode 100644 index 0000000..e69de29 diff --git a/vertx/src/main/java/com/p14n/postevent/VertxConsumerServer.java b/vertx/src/main/java/com/p14n/postevent/VertxConsumerServer.java new file mode 100644 index 0000000..bd20d61 --- /dev/null +++ b/vertx/src/main/java/com/p14n/postevent/VertxConsumerServer.java @@ -0,0 +1,44 @@ +package com.p14n.postevent; + +import com.p14n.postevent.adapter.EventBusCatchupService; +import com.p14n.postevent.adapter.EventBusMessageBroker; +import com.p14n.postevent.broker.AsyncExecutor; +import com.p14n.postevent.catchup.CatchupServer; +import com.p14n.postevent.db.DatabaseSetup; +import io.opentelemetry.api.OpenTelemetry; +import io.vertx.core.eventbus.EventBus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public class VertxConsumerServer { + private static final Logger logger = LoggerFactory.getLogger(VertxConsumerServer.class); + + private DataSource ds; + //private ConfigData cfg; + private List closeables; + private AsyncExecutor asyncExecutor; + OpenTelemetry ot; + + public VertxConsumerServer(DataSource ds, AsyncExecutor asyncExecutor, OpenTelemetry ot) { + this.ds = ds; + this.asyncExecutor = asyncExecutor; + this.ot = ot; + } + + public void start(EventBus eb, EventBusMessageBroker mb, Set topics) throws IOException, InterruptedException { + logger.atInfo().log("Starting consumer server"); + + var db = new DatabaseSetup(ds); + db.setupServer(topics); + var catchupServer = new CatchupServer(ds); + var catchupService = new EventBusCatchupService(catchupServer,eb,topics); + + closeables = List.of(catchupService, mb, asyncExecutor); + } + +} diff --git a/vertx/src/main/java/com/p14n/postevent/VertxPersistentConsumer.java b/vertx/src/main/java/com/p14n/postevent/VertxPersistentConsumer.java new file mode 100644 index 0000000..29e909d --- /dev/null +++ b/vertx/src/main/java/com/p14n/postevent/VertxPersistentConsumer.java @@ -0,0 +1,161 @@ +package com.p14n.postevent; + +import com.p14n.postevent.adapter.EventBusMessageBroker; +import com.p14n.postevent.broker.*; +import com.p14n.postevent.catchup.CatchupService; +import com.p14n.postevent.catchup.PersistentBroker; +import com.p14n.postevent.catchup.UnprocessedSubmitter; +import com.p14n.postevent.client.EventBusCatchupClient; +import com.p14n.postevent.data.UnprocessedEventFinder; +import io.opentelemetry.api.OpenTelemetry; +import io.vertx.core.eventbus.EventBus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.sql.SQLException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public class VertxPersistentConsumer implements AutoCloseable, MessageBroker { + + private static final Logger logger = LoggerFactory.getLogger(VertxPersistentConsumer.class); + + private AsyncExecutor asyncExecutor; + private List closeables; + private TransactionalBroker tb; + SystemEventBroker seb; + OpenTelemetry ot; + private final int batchSize; + + public VertxPersistentConsumer(OpenTelemetry ot, AsyncExecutor asyncExecutor, int batchSize) { + this.asyncExecutor = asyncExecutor; + this.ot = ot; + this.batchSize = batchSize; + } + + public void start(Set topics, DataSource ds,EventBus eb, EventBusMessageBroker mb) { + logger.atInfo().log("Starting consumer client"); + + if (tb != null) { + logger.atError().log("Consumer client already started"); + throw new IllegalStateException("Already started"); + } + + try { + seb = new SystemEventBroker(asyncExecutor, ot); + tb = new TransactionalBroker(ds, asyncExecutor, ot, seb); + var pb = new PersistentBroker<>(tb, ds, seb); + //var client = new MessageBrokerGrpcClient(asyncExecutor, ot, channel); // needs fixed threads + //var catchupClient = new CatchupGrpcClient(channel); + + //Create a client that can listen to system events, register with seb + //register the pb with the eb for all topics + var catchupClient = new EventBusCatchupClient(eb); + + for (var topic : topics) { + mb.subscribeToEventBus(topic,pb); + } + seb.subscribe(new CatchupService(ds, catchupClient, seb)); + seb.subscribe(new UnprocessedSubmitter(seb, ds, new UnprocessedEventFinder(), tb, batchSize)); + + asyncExecutor.scheduleAtFixedRate( + () -> { + seb.publish(SystemEvent.UnprocessedCheckRequired); + for (String topic : topics) { + seb.publish(SystemEvent.FetchLatest.withTopic(topic)); + } + }, + 30, 30, TimeUnit.SECONDS); + + closeables = List.of(pb, seb, tb); + + logger.atInfo().log("Consumer client started successfully"); + + } catch (Exception e) { + logger.atError() + .setCause(e) + .log("Failed to start consumer client"); + throw new RuntimeException("Failed to start consumer client", e); + } + } + + /** + * Closes all resources associated with this consumer. + * This includes the message brokers, gRPC channel, and other closeable + * resources. + */ + @Override + public void close() { + logger.atInfo().log("Closing consumer client"); + + for (AutoCloseable c : closeables) { + try { + c.close(); + } catch (Exception e) { + logger.atWarn() + .setCause(e) + .addArgument(c.getClass().getSimpleName()) + .log("Error closing {}"); + } + } + + logger.atInfo().log("Consumer client closed"); + } + + /** + * Publishes a transactional event to the specified topic. + * + * @param topic The topic to publish to + * @param message The transactional event to publish + * @throws RuntimeException if publishing fails + */ + @Override + public void publish(String topic, TransactionalEvent message) { + try { + Publisher.publish(message.event(), message.connection(), topic); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * Subscribes to events on the specified topic. + * Triggers a catchup event to ensure the subscriber receives any missed events. + * + * @param topic The topic to subscribe to + * @param subscriber The subscriber that will receive events + * @return true if subscription was successful, false otherwise + */ + @Override + public boolean subscribe(String topic, MessageSubscriber subscriber) { + var subscribed = tb.subscribe(topic, subscriber); + seb.publish(SystemEvent.CatchupRequired.withTopic(topic)); + return subscribed; + } + + /** + * Unsubscribes from events on the specified topic. + * + * @param topic The topic to unsubscribe from + * @param subscriber The subscriber to remove + * @return true if unsubscription was successful, false otherwise + */ + @Override + public boolean unsubscribe(String topic, MessageSubscriber subscriber) { + return tb.unsubscribe(topic, subscriber); + } + + /** + * Converts a transactional event. In this implementation, returns the event + * unchanged. + * + * @param m The transactional event to convert + * @return The same transactional event + */ + @Override + public TransactionalEvent convert(TransactionalEvent m) { + return m; + } +} diff --git a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java b/vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java index a9a0c83..ac53d96 100644 --- a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java +++ b/vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java @@ -9,6 +9,7 @@ import io.vertx.core.json.JsonObject; import java.util.List; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,16 +47,17 @@ * // Service is now listening for catchup requests on the EventBus * } */ -public class EventBusCatchupService { +public class EventBusCatchupService implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupService.class); - private static final String FETCH_EVENTS_ADDRESS = "catchup.fetchEvents"; - private static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.getLatestMessageId"; + private static final String FETCH_EVENTS_ADDRESS = "catchup.fetch_events."; + private static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.get_latest."; private final CatchupServerInterface catchupServer; private final EventBus eventBus; - private MessageConsumer fetchEventsConsumer; - private MessageConsumer getLatestMessageIdConsumer; + private List> fetchEventsConsumers; + private List> getLatestMessageIdConsumers; + private Set topics; /** * Creates a new EventBusCatchupService. @@ -63,9 +65,10 @@ public class EventBusCatchupService { * @param catchupServer The underlying catchup server implementation * @param eventBus The Vert.x EventBus to use for messaging */ - public EventBusCatchupService(CatchupServerInterface catchupServer, EventBus eventBus) { + public EventBusCatchupService(CatchupServerInterface catchupServer, EventBus eventBus, Set topics) { this.catchupServer = catchupServer; this.eventBus = eventBus; + this.topics = topics; } /** @@ -76,15 +79,22 @@ public void start() { logger.atInfo().log("Starting EventBusCatchupService"); // Register consumer for fetchEvents requests - fetchEventsConsumer = eventBus.consumer(FETCH_EVENTS_ADDRESS, this::handleFetchEvents); + fetchEventsConsumers = topics.stream().map( topic -> { + logger.atInfo() + .addArgument(FETCH_EVENTS_ADDRESS+topic) + .log("EventBusCatchupService started, listening on address: {}"); + + return eventBus.consumer(FETCH_EVENTS_ADDRESS+topic, this::handleFetchEvents); + }).toList(); // Register consumer for getLatestMessageId requests - getLatestMessageIdConsumer = eventBus.consumer(GET_LATEST_MESSAGE_ID_ADDRESS, this::handleGetLatestMessageId); + getLatestMessageIdConsumers = topics.stream().map(topic -> { + logger.atInfo() + .addArgument(GET_LATEST_MESSAGE_ID_ADDRESS+topic) + .log("EventBusCatchupService started, listening on address: {}"); + return eventBus.consumer(GET_LATEST_MESSAGE_ID_ADDRESS+topic, this::handleGetLatestMessageId); + }).toList(); - logger.atInfo() - .addArgument(FETCH_EVENTS_ADDRESS) - .addArgument(GET_LATEST_MESSAGE_ID_ADDRESS) - .log("EventBusCatchupService started, listening on addresses: {} and {}"); } /** @@ -93,14 +103,18 @@ public void start() { public void stop() { logger.atInfo().log("Stopping EventBusCatchupService"); - if (fetchEventsConsumer != null) { - fetchEventsConsumer.unregister(); - fetchEventsConsumer = null; + if (fetchEventsConsumers != null) { + for(var c : fetchEventsConsumers){ + c.unregister(); + } + fetchEventsConsumers = null; } - if (getLatestMessageIdConsumer != null) { - getLatestMessageIdConsumer.unregister(); - getLatestMessageIdConsumer = null; + if (getLatestMessageIdConsumers != null) { + for (var c:getLatestMessageIdConsumers){ + c.unregister(); + } + getLatestMessageIdConsumers = null; } logger.atInfo().log("EventBusCatchupService stopped"); @@ -196,4 +210,9 @@ private void handleGetLatestMessageId(Message message) { message.fail(500, e.getMessage()); } } + + @Override + public void close() throws Exception { + stop(); + } } diff --git a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java index b0aaf70..8fe7437 100644 --- a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java +++ b/vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java @@ -61,8 +61,6 @@ */ public class EventBusMessageBroker extends EventMessageBroker { private static final Logger logger = LoggerFactory.getLogger(EventBusMessageBroker.class); - - private final Vertx vertx; private final EventBus eventBus; private final DataSource dataSource; private final Map> consumers = new ConcurrentHashMap<>(); @@ -70,17 +68,16 @@ public class EventBusMessageBroker extends EventMessageBroker { /** * Creates a new EventBusMessageBroker. * - * @param vertx The Vert.x instance to use + * @param eventBus The Vert.x EventBus instance to use * @param dataSource The DataSource for database persistence * @param executor The AsyncExecutor for handling asynchronous operations * @param ot OpenTelemetry instance for observability * @param name Name identifier for this broker instance */ - public EventBusMessageBroker(Vertx vertx, DataSource dataSource, AsyncExecutor executor, + public EventBusMessageBroker(EventBus eventBus, DataSource dataSource, AsyncExecutor executor, OpenTelemetry ot, String name) { super(executor, ot, name); - this.vertx = vertx; - this.eventBus = vertx.eventBus(); + this.eventBus = eventBus; this.dataSource = dataSource; // Register the Event codec for EventBus serialization @@ -91,6 +88,7 @@ public EventBusMessageBroker(Vertx vertx, DataSource dataSource, AsyncExecutor e .log("EventBusMessageBroker initialized: {}"); } + /** * Publishes an event using the dual-write pattern. * The event is first persisted to the database, then published to the EventBus. diff --git a/vertx/src/main/java/com/p14n/postevent/example/VertxConsumerExample.java b/vertx/src/main/java/com/p14n/postevent/example/VertxConsumerExample.java new file mode 100644 index 0000000..d7f090c --- /dev/null +++ b/vertx/src/main/java/com/p14n/postevent/example/VertxConsumerExample.java @@ -0,0 +1,42 @@ +package com.p14n.postevent.example; + +import com.p14n.postevent.VertxConsumerServer; +import com.p14n.postevent.VertxPersistentConsumer; +import com.p14n.postevent.adapter.EventBusMessageBroker; +import com.p14n.postevent.broker.DefaultExecutor; +import com.p14n.postevent.data.Event; +import com.p14n.postevent.db.DatabaseSetup; +import io.opentelemetry.api.OpenTelemetry; +import io.vertx.core.Vertx; + +import javax.sql.DataSource; +import java.io.IOException; +import java.util.Set; +import java.util.UUID; + +public class VertxConsumerExample { + + public static void start(DataSource ds) throws IOException, InterruptedException { + DefaultExecutor executor = new DefaultExecutor(2); + var ot = OpenTelemetry.noop(); + var vertx = Vertx.vertx(); + var topics = Set.of("order"); + var mb = new EventBusMessageBroker(vertx.eventBus(),ds,executor, ot, "consumer_server"); + var server = new VertxConsumerServer(ds,executor,ot); + server.start(vertx.eventBus(),mb,topics); + + var client = new VertxPersistentConsumer(ot,executor,20); + client.start(topics,ds,vertx.eventBus(),mb); + client.subscribe("order", message -> { + System.out.println("Got message"); + }); + + mb.publish("order", Event.create(UUID.randomUUID().toString(), + "test", + "test", + "text", + null, + "test", + "hello".getBytes(),null)); + } +} diff --git a/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java b/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java index 03ff122..736c86a 100644 --- a/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java +++ b/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java @@ -144,11 +144,11 @@ public void start() throws Exception { // Create message broker that publishes to DB + EventBus DefaultExecutor executor = new DefaultExecutor(2); messageBroker = new EventBusMessageBroker( - vertx, dataSource, executor, OpenTelemetry.noop(), "vertx-server"); + vertx.eventBus(), dataSource, executor, OpenTelemetry.noop(), "vertx-server"); // Create catchup service for EventBus requests CatchupServer catchupServer = new CatchupServer(dataSource); - catchupService = new EventBusCatchupService(catchupServer, vertx.eventBus()); + catchupService = new EventBusCatchupService(catchupServer, vertx.eventBus(),Set.of()); catchupService.start(); System.out.println("🌐 Vert.x EventBus server started"); From d70f5eebea47ffc14c6f9911975ae1e0f68e4518 Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Wed, 17 Sep 2025 23:11:32 +0100 Subject: [PATCH 3/9] Vertx example working --- vertx/README.md | 14 + .../p14n/postevent/VertxEventBusConsumer.java | 275 ------------------ .../example/VertxEventBusExample.java | 174 ----------- .../{ => vertx}/VertxConsumerServer.java | 9 +- .../{ => vertx}/VertxPersistentConsumer.java | 11 +- .../adapter/EventBusCatchupService.java | 115 ++++---- .../adapter/EventBusMessageBroker.java | 5 +- .../client/EventBusCatchupClient.java | 10 +- .../{ => vertx}/codec/EventCodec.java | 2 +- .../vertx}/example/VertxConsumerExample.java | 40 ++- 10 files changed, 130 insertions(+), 525 deletions(-) delete mode 100644 vertx/src/main/java/com/p14n/postevent/VertxEventBusConsumer.java delete mode 100644 vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java rename vertx/src/main/java/com/p14n/postevent/{ => vertx}/VertxConsumerServer.java (81%) rename vertx/src/main/java/com/p14n/postevent/{ => vertx}/VertxPersistentConsumer.java (94%) rename vertx/src/main/java/com/p14n/postevent/{ => vertx}/adapter/EventBusCatchupService.java (75%) rename vertx/src/main/java/com/p14n/postevent/{ => vertx}/adapter/EventBusMessageBroker.java (98%) rename vertx/src/main/java/com/p14n/postevent/{ => vertx}/client/EventBusCatchupClient.java (94%) rename vertx/src/main/java/com/p14n/postevent/{ => vertx}/codec/EventCodec.java (98%) rename vertx/src/{main/java/com/p14n/postevent => test/java/com/p14n/postevent/vertx}/example/VertxConsumerExample.java (52%) diff --git a/vertx/README.md b/vertx/README.md index e69de29..ceb5ec8 100644 --- a/vertx/README.md +++ b/vertx/README.md @@ -0,0 +1,14 @@ +# Vert.x EventBus Integration + +## Server implementation +EventBusMessageBroker - allows publishing to persistent topics, handles translation between vertx and postevent +VertxConsumerServer - sets up DDL for given topics and starts catchup for those topics + +## Client implementation +VertxPersistentConsumer - consumes events from vertx eventbus. Creates system event bus and catchup client, handles translation between vertx and postevent on the transactional consumer side. + +Todo +[] Implement autoclose on new classes +[] Adapt classes to use vertx threading model + + diff --git a/vertx/src/main/java/com/p14n/postevent/VertxEventBusConsumer.java b/vertx/src/main/java/com/p14n/postevent/VertxEventBusConsumer.java deleted file mode 100644 index 9e5720c..0000000 --- a/vertx/src/main/java/com/p14n/postevent/VertxEventBusConsumer.java +++ /dev/null @@ -1,275 +0,0 @@ -package com.p14n.postevent; - -import com.p14n.postevent.adapter.EventBusMessageBroker; -import com.p14n.postevent.broker.MessageSubscriber; -import com.p14n.postevent.broker.SystemEventBroker; -import com.p14n.postevent.catchup.CatchupService; -import com.p14n.postevent.client.EventBusCatchupClient; -import com.p14n.postevent.codec.EventCodec; -import com.p14n.postevent.data.ConfigData; -import com.p14n.postevent.data.Event; -import io.opentelemetry.api.OpenTelemetry; -import io.vertx.core.Vertx; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.eventbus.MessageConsumer; - -import javax.sql.DataSource; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Main consumer class that provides event consumption capabilities using Vert.x - * EventBus. - * This consumer combines real-time event delivery via EventBus with catchup - * capabilities - * for guaranteed event processing. - * - *

- * The consumer provides: - *

    - *
  • Real-time event consumption via Vert.x EventBus
  • - *
  • Automatic catchup for missed events using CatchupService
  • - *
  • Persistent event storage integration
  • - *
  • OpenTelemetry observability
  • - *
- *

- * - *

- * Architecture: - * - *

- * Events → Database → EventBus → VertxEventBusConsumer → Subscriber
- *                        ↑
- *                   CatchupService (for missed events)
- * 
- *

- * - *

- * Example usage: - *

- * - *
{@code
- * // Create configuration
- * ConfigData config = new ConfigData("myapp", Set.of("orders"), ...);
- * 
- * // Create consumer
- * VertxEventBusConsumer consumer = new VertxEventBusConsumer(config, OpenTelemetry.noop(), 30);
- * 
- * // Subscribe to events
- * consumer.subscribe("orders", event -> {
- *     System.out.println("Received: " + event);
- * });
- * 
- * // Start consuming
- * consumer.start(Set.of("orders"), dataSource);
- * }
- */ -public class VertxEventBusConsumer implements AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(VertxEventBusConsumer.class); - - private final Vertx vertx; - private final EventBus eventBus; - private final ConfigData config; - private final OpenTelemetry openTelemetry; - private final int catchupIntervalSeconds; - private final Map> consumers = new ConcurrentHashMap<>(); - - private CatchupService catchupService; - private EventBusMessageBroker messageBroker; - private boolean started = false; - - /** - * Creates a new VertxEventBusConsumer. - * - * @param config Configuration data for the consumer - * @param openTelemetry OpenTelemetry instance for observability - * @param catchupIntervalSeconds Interval in seconds for catchup operations - */ - public VertxEventBusConsumer(ConfigData config, OpenTelemetry openTelemetry, int catchupIntervalSeconds) { - this.config = config; - this.openTelemetry = openTelemetry; - this.catchupIntervalSeconds = catchupIntervalSeconds; - this.vertx = Vertx.vertx(); - this.eventBus = vertx.eventBus(); - - // Register Event codec for EventBus serialization - eventBus.registerDefaultCodec(Event.class, new EventCodec()); - - logger.atInfo() - .addArgument(config.affinity()) - .addArgument(catchupIntervalSeconds) - .log("VertxEventBusConsumer created for {} with catchup interval {}s"); - } - - /** - * Starts the consumer with the specified topics and data source. - * This initializes the catchup service and prepares for event consumption. - * - * @param topics The set of topics to consume from - * @param dataSource The DataSource for database operations - * @throws Exception If startup fails - */ - public void start(Set topics, DataSource dataSource) throws Exception { - if (started) { - throw new IllegalStateException("Consumer is already started"); - } - - logger.atInfo() - .addArgument(topics) - .log("Starting VertxEventBusConsumer for topics: {}"); - - try { - // Create EventBus-based catchup client - EventBusCatchupClient catchupClient = new EventBusCatchupClient(eventBus); - - // Create system event broker for catchup coordination - SystemEventBroker systemEventBroker = new SystemEventBroker(openTelemetry); - - // Initialize catchup service - catchupService = new CatchupService(dataSource, catchupClient, systemEventBroker); - - started = true; - - logger.atInfo() - .addArgument(topics) - .log("VertxEventBusConsumer started successfully for topics: {}"); - - } catch (Exception e) { - logger.atError() - .setCause(e) - .log("Failed to start VertxEventBusConsumer"); - throw e; - } - } - - /** - * Subscribes to events on a specific topic via the EventBus. - * Creates an EventBus consumer that listens for events on the topic. - * - * @param topic The topic to subscribe to - * @param subscriber The subscriber that will receive events - */ - public void subscribe(String topic, MessageSubscriber subscriber) { - if (!started) { - throw new IllegalStateException("Consumer must be started before subscribing"); - } - - logger.atInfo() - .addArgument(topic) - .log("Subscribing to topic: {}"); - - String eventBusAddress = "events." + topic; - - MessageConsumer consumer = eventBus.consumer(eventBusAddress); - consumer.handler(message -> { - Event event = message.body(); - logger.atDebug() - .addArgument(topic) - .addArgument(event.id()) - .log("Received event on topic {} with id {}"); - - try { - subscriber.onMessage(event); - } catch (Exception e) { - logger.atError() - .addArgument(topic) - .addArgument(event.id()) - .setCause(e) - .log("Error processing event on topic {} with id {}"); - } - }); - - // Store consumer for cleanup - consumers.put(topic, consumer); - - logger.atInfo() - .addArgument(topic) - .addArgument(eventBusAddress) - .log("Successfully subscribed to topic {} at address {}"); - } - - /** - * Unsubscribes from a specific topic. - * - * @param topic The topic to unsubscribe from - */ - public void unsubscribe(String topic) { - MessageConsumer consumer = consumers.remove(topic); - if (consumer != null) { - consumer.unregister(); - logger.atInfo() - .addArgument(topic) - .log("Unsubscribed from topic: {}"); - } - } - - /** - * Gets the Vert.x instance used by this consumer. - * This can be useful for advanced integrations. - * - * @return The Vert.x instance - */ - public Vertx getVertx() { - return vertx; - } - - /** - * Gets the EventBus instance used by this consumer. - * This can be useful for advanced integrations. - * - * @return The EventBus instance - */ - public EventBus getEventBus() { - return eventBus; - } - - /** - * Checks if the consumer is currently started. - * - * @return true if started, false otherwise - */ - public boolean isStarted() { - return started; - } - - /** - * Closes the consumer and cleans up all resources. - * This stops the catchup service, unregisters all consumers, and closes Vert.x. - */ - @Override - public void close() { - logger.atInfo().log("Closing VertxEventBusConsumer"); - - // Unregister all EventBus consumers - consumers.values().forEach(MessageConsumer::unregister); - consumers.clear(); - - // Stop catchup service - // Note: CatchupService doesn't need explicit closing - // It's managed by the SystemEventBroker - - // Close message broker if created - if (messageBroker != null) { - try { - messageBroker.close(); - } catch (Exception e) { - logger.atWarn() - .setCause(e) - .log("Error closing message broker"); - } - } - - // Close Vert.x - if (vertx != null) { - vertx.close(); - } - - started = false; - - logger.atInfo().log("VertxEventBusConsumer closed"); - } -} diff --git a/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java b/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java deleted file mode 100644 index 736c86a..0000000 --- a/vertx/src/main/java/com/p14n/postevent/example/VertxEventBusExample.java +++ /dev/null @@ -1,174 +0,0 @@ -package com.p14n.postevent.example; - -import com.p14n.postevent.VertxEventBusConsumer; -import com.p14n.postevent.adapter.EventBusMessageBroker; -import com.p14n.postevent.adapter.EventBusCatchupService; -import com.p14n.postevent.broker.DefaultExecutor; -import com.p14n.postevent.catchup.CatchupServer; -import com.p14n.postevent.data.ConfigData; -import com.p14n.postevent.data.Event; -import com.p14n.postevent.db.DatabaseSetup; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; -import io.opentelemetry.api.OpenTelemetry; -import io.vertx.core.Vertx; - -import javax.sql.DataSource; -import java.time.Instant; -import java.util.Set; - -/** - * Example demonstrating how to use the Vert.x EventBus module for event - * processing. - * - * This example shows: - * 1. Setting up a server with EventBus message broker and catchup service - * 2. Creating a consumer that receives events via EventBus - * 3. Publishing events that are persisted and distributed via EventBus - * 4. Automatic catchup for guaranteed event delivery - */ -public class VertxEventBusExample { - - public static void main(String[] args) throws Exception { - // Setup database - DataSource dataSource = createDataSource(); - - // Configuration - ConfigData config = new ConfigData( - "vertx-example", - Set.of("orders", "payments"), - "localhost", - 5432, - "postevent", - "postgres", - "password"); - - // Start server components - VertxEventBusServer server = new VertxEventBusServer(config, dataSource); - server.start(); - - // Start consumer - VertxEventBusConsumer consumer = new VertxEventBusConsumer(config, OpenTelemetry.noop(), 30); - consumer.start(Set.of("orders", "payments"), dataSource); - - // Subscribe to events - consumer.subscribe("orders", event -> { - System.out.println("📦 Order Event: " + event.id() + " - " + event.type()); - }); - - consumer.subscribe("payments", event -> { - System.out.println("💳 Payment Event: " + event.id() + " - " + event.type()); - }); - - // Publish some test events - publishTestEvents(server.getMessageBroker()); - - // Keep running for demonstration - System.out.println("🚀 Vert.x EventBus example running..."); - System.out.println("📡 Events are being published and consumed via EventBus"); - System.out.println("🔄 Catchup service ensures no events are missed"); - System.out.println("Press Ctrl+C to stop"); - - // Add shutdown hook - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("\n🛑 Shutting down..."); - try { - consumer.close(); - server.stop(); - } catch (Exception e) { - e.printStackTrace(); - } - })); - - // Keep main thread alive - Thread.currentThread().join(); - } - - private static void publishTestEvents(EventBusMessageBroker broker) { - // Publish order events - for (int i = 1; i <= 5; i++) { - Event orderEvent = Event.create( - "order-" + i, - "order-service", - "order.created", - "application/json", - null, - "orders", - ("Order " + i + " data").getBytes(), - null); - broker.publish("orders", orderEvent); - } - - // Publish payment events - for (int i = 1; i <= 3; i++) { - Event paymentEvent = Event.create( - "payment-" + i, - "payment-service", - "payment.processed", - "application/json", - null, - "payments", - ("Payment " + i + " data").getBytes(), - null); - broker.publish("payments", paymentEvent); - } - } - - private static DataSource createDataSource() { - HikariConfig config = new HikariConfig(); - config.setJdbcUrl("jdbc:postgresql://localhost:5432/postevent"); - config.setUsername("postgres"); - config.setPassword("password"); - config.setMaximumPoolSize(10); - return new HikariDataSource(config); - } - - /** - * Server component that sets up EventBus message broker and catchup service. - */ - public static class VertxEventBusServer { - private final ConfigData config; - private final DataSource dataSource; - private final Vertx vertx; - - private EventBusMessageBroker messageBroker; - private EventBusCatchupService catchupService; - - public VertxEventBusServer(ConfigData config, DataSource dataSource) { - this.config = config; - this.dataSource = dataSource; - this.vertx = Vertx.vertx(); - } - - public void start() throws Exception { - // Create message broker that publishes to DB + EventBus - DefaultExecutor executor = new DefaultExecutor(2); - messageBroker = new EventBusMessageBroker( - vertx.eventBus(), dataSource, executor, OpenTelemetry.noop(), "vertx-server"); - - // Create catchup service for EventBus requests - CatchupServer catchupServer = new CatchupServer(dataSource); - catchupService = new EventBusCatchupService(catchupServer, vertx.eventBus(),Set.of()); - catchupService.start(); - - System.out.println("🌐 Vert.x EventBus server started"); - } - - public void stop() throws Exception { - if (catchupService != null) { - catchupService.stop(); - } - if (messageBroker != null) { - messageBroker.close(); - } - if (vertx != null) { - vertx.close(); - } - System.out.println("🛑 Vert.x EventBus server stopped"); - } - - public EventBusMessageBroker getMessageBroker() { - return messageBroker; - } - } -} diff --git a/vertx/src/main/java/com/p14n/postevent/VertxConsumerServer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java similarity index 81% rename from vertx/src/main/java/com/p14n/postevent/VertxConsumerServer.java rename to vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java index bd20d61..5448155 100644 --- a/vertx/src/main/java/com/p14n/postevent/VertxConsumerServer.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java @@ -1,7 +1,7 @@ -package com.p14n.postevent; +package com.p14n.postevent.vertx; -import com.p14n.postevent.adapter.EventBusCatchupService; -import com.p14n.postevent.adapter.EventBusMessageBroker; +import com.p14n.postevent.vertx.adapter.EventBusCatchupService; +import com.p14n.postevent.vertx.adapter.EventBusMessageBroker; import com.p14n.postevent.broker.AsyncExecutor; import com.p14n.postevent.catchup.CatchupServer; import com.p14n.postevent.db.DatabaseSetup; @@ -39,6 +39,9 @@ public void start(EventBus eb, EventBusMessageBroker mb, Set topics) thr var catchupService = new EventBusCatchupService(catchupServer,eb,topics); closeables = List.of(catchupService, mb, asyncExecutor); + System.out.println("🌐 Vert.x EventBus server started"); + //System.out.println("🛑 Vert.x EventBus server stopped"); + } } diff --git a/vertx/src/main/java/com/p14n/postevent/VertxPersistentConsumer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java similarity index 94% rename from vertx/src/main/java/com/p14n/postevent/VertxPersistentConsumer.java rename to vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java index 29e909d..4bd92e3 100644 --- a/vertx/src/main/java/com/p14n/postevent/VertxPersistentConsumer.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java @@ -1,11 +1,13 @@ -package com.p14n.postevent; +package com.p14n.postevent.vertx; -import com.p14n.postevent.adapter.EventBusMessageBroker; +import com.p14n.postevent.Publisher; +import com.p14n.postevent.db.DatabaseSetup; +import com.p14n.postevent.vertx.adapter.EventBusMessageBroker; import com.p14n.postevent.broker.*; import com.p14n.postevent.catchup.CatchupService; import com.p14n.postevent.catchup.PersistentBroker; import com.p14n.postevent.catchup.UnprocessedSubmitter; -import com.p14n.postevent.client.EventBusCatchupClient; +import com.p14n.postevent.vertx.client.EventBusCatchupClient; import com.p14n.postevent.data.UnprocessedEventFinder; import io.opentelemetry.api.OpenTelemetry; import io.vertx.core.eventbus.EventBus; @@ -42,6 +44,8 @@ public void start(Set topics, DataSource ds,EventBus eb, EventBusMessage logger.atError().log("Consumer client already started"); throw new IllegalStateException("Already started"); } + var db = new DatabaseSetup(ds); + db.setupClient(); try { seb = new SystemEventBroker(asyncExecutor, ot); @@ -73,6 +77,7 @@ public void start(Set topics, DataSource ds,EventBus eb, EventBusMessage logger.atInfo().log("Consumer client started successfully"); + } catch (Exception e) { logger.atError() .setCause(e) diff --git a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java similarity index 75% rename from vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java rename to vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java index ac53d96..39eac9d 100644 --- a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusCatchupService.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java @@ -1,4 +1,4 @@ -package com.p14n.postevent.adapter; +package com.p14n.postevent.vertx.adapter; import com.p14n.postevent.catchup.CatchupServerInterface; import com.p14n.postevent.data.Event; @@ -15,7 +15,8 @@ import org.slf4j.LoggerFactory; /** - * Service that exposes CatchupServerInterface methods via Vert.x EventBus messaging. + * Service that exposes CatchupServerInterface methods via Vert.x EventBus + * messaging. * This allows remote clients to request catchup operations through the EventBus * using a request-reply pattern. * @@ -29,7 +30,8 @@ * EventBus addresses: *
    *
  • {@code catchup.fetchEvents} - Fetch events within a range
  • - *
  • {@code catchup.getLatestMessageId} - Get the latest message ID for a topic
  • + *
  • {@code catchup.getLatestMessageId} - Get the latest message ID for a + * topic
  • *
*

* @@ -49,16 +51,16 @@ */ public class EventBusCatchupService implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupService.class); - - private static final String FETCH_EVENTS_ADDRESS = "catchup.fetch_events."; - private static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.get_latest."; - + + public static final String FETCH_EVENTS_ADDRESS = "catchup.fetch_events."; + public static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.get_latest."; + private final CatchupServerInterface catchupServer; private final EventBus eventBus; private List> fetchEventsConsumers; private List> getLatestMessageIdConsumers; private Set topics; - + /** * Creates a new EventBusCatchupService. * @@ -70,60 +72,62 @@ public EventBusCatchupService(CatchupServerInterface catchupServer, EventBus eve this.eventBus = eventBus; this.topics = topics; } - + /** * Starts the service by registering EventBus consumers for catchup operations. - * This method sets up listeners for both fetchEvents and getLatestMessageId requests. + * This method sets up listeners for both fetchEvents and getLatestMessageId + * requests. */ public void start() { logger.atInfo().log("Starting EventBusCatchupService"); - + // Register consumer for fetchEvents requests - fetchEventsConsumers = topics.stream().map( topic -> { + fetchEventsConsumers = topics.stream().map(topic -> { logger.atInfo() - .addArgument(FETCH_EVENTS_ADDRESS+topic) + .addArgument(FETCH_EVENTS_ADDRESS + topic) .log("EventBusCatchupService started, listening on address: {}"); - return eventBus.consumer(FETCH_EVENTS_ADDRESS+topic, this::handleFetchEvents); + return eventBus.consumer(FETCH_EVENTS_ADDRESS + topic, this::handleFetchEvents); }).toList(); - + // Register consumer for getLatestMessageId requests getLatestMessageIdConsumers = topics.stream().map(topic -> { logger.atInfo() - .addArgument(GET_LATEST_MESSAGE_ID_ADDRESS+topic) + .addArgument(GET_LATEST_MESSAGE_ID_ADDRESS + topic) .log("EventBusCatchupService started, listening on address: {}"); - return eventBus.consumer(GET_LATEST_MESSAGE_ID_ADDRESS+topic, this::handleGetLatestMessageId); + return eventBus.consumer(GET_LATEST_MESSAGE_ID_ADDRESS + topic, this::handleGetLatestMessageId); }).toList(); - + } - + /** * Stops the service by unregistering EventBus consumers. */ public void stop() { logger.atInfo().log("Stopping EventBusCatchupService"); - + if (fetchEventsConsumers != null) { - for(var c : fetchEventsConsumers){ + for (var c : fetchEventsConsumers) { c.unregister(); } fetchEventsConsumers = null; } - + if (getLatestMessageIdConsumers != null) { - for (var c:getLatestMessageIdConsumers){ + for (var c : getLatestMessageIdConsumers) { c.unregister(); } getLatestMessageIdConsumers = null; } - + logger.atInfo().log("EventBusCatchupService stopped"); } - + /** * Handles fetchEvents requests from the EventBus. * * Expected request format: + * *
{@code
      * {
      *   "fromId": 100,
@@ -137,43 +141,44 @@ public void stop() {
      */
     private void handleFetchEvents(Message message) {
         JsonObject request = message.body();
-        
+
         try {
             long fromId = request.getLong("fromId");
             long toId = request.getLong("toId");
             int limit = request.getInteger("limit");
             String topic = request.getString("topic");
-            
+
             logger.atDebug()
-                .addArgument(fromId)
-                .addArgument(toId)
-                .addArgument(limit)
-                .addArgument(topic)
-                .log("Handling fetchEvents request: fromId={}, toId={}, limit={}, topic={}");
-            
+                    .addArgument(fromId)
+                    .addArgument(toId)
+                    .addArgument(limit)
+                    .addArgument(topic)
+                    .log("Handling fetchEvents request: fromId={}, toId={}, limit={}, topic={}");
+
             List events = catchupServer.fetchEvents(fromId, toId, limit, topic);
-            
+
             // Serialize events to JSON and reply
             String eventsJson = Json.encode(events);
             message.reply(eventsJson);
-            
+
             logger.atDebug()
-                .addArgument(events.size())
-                .addArgument(topic)
-                .log("Successfully fetched {} events for topic {}", events.size(), topic);
-            
+                    .addArgument(events.size())
+                    .addArgument(topic)
+                    .log("Successfully fetched {} events for topic {}", events.size(), topic);
+
         } catch (Exception e) {
             logger.atError()
-                .setCause(e)
-                .log("Error handling fetchEvents request");
+                    .setCause(e)
+                    .log("Error handling fetchEvents request");
             message.fail(500, e.getMessage());
         }
     }
-    
+
     /**
      * Handles getLatestMessageId requests from the EventBus.
      * 
      * Expected request format:
+     * 
      * 
{@code
      * {
      *   "topic": "orders"
@@ -184,29 +189,29 @@ private void handleFetchEvents(Message message) {
      */
     private void handleGetLatestMessageId(Message message) {
         JsonObject request = message.body();
-        
+
         try {
             String topic = request.getString("topic");
-            
+
             logger.atDebug()
-                .addArgument(topic)
-                .log("Handling getLatestMessageId request for topic: {}");
-            
+                    .addArgument(topic)
+                    .log("Handling getLatestMessageId request for topic: {}");
+
             long latestId = catchupServer.getLatestMessageId(topic);
-            
+
             // Create response with latest ID
             JsonObject response = new JsonObject().put("latestId", latestId);
             message.reply(response);
-            
+
             logger.atDebug()
-                .addArgument(latestId)
-                .addArgument(topic)
-                .log("Successfully retrieved latest message ID {} for topic {}", latestId, topic);
-            
+                    .addArgument(latestId)
+                    .addArgument(topic)
+                    .log("Successfully retrieved latest message ID {} for topic {}", latestId, topic);
+
         } catch (Exception e) {
             logger.atError()
-                .setCause(e)
-                .log("Error handling getLatestMessageId request");
+                    .setCause(e)
+                    .log("Error handling getLatestMessageId request");
             message.fail(500, e.getMessage());
         }
     }
diff --git a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
similarity index 98%
rename from vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java
rename to vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
index 8fe7437..780f70c 100644
--- a/vertx/src/main/java/com/p14n/postevent/adapter/EventBusMessageBroker.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
@@ -1,13 +1,12 @@
-package com.p14n.postevent.adapter;
+package com.p14n.postevent.vertx.adapter;
 
 import com.p14n.postevent.Publisher;
 import com.p14n.postevent.broker.AsyncExecutor;
 import com.p14n.postevent.broker.EventMessageBroker;
 import com.p14n.postevent.broker.MessageSubscriber;
-import com.p14n.postevent.codec.EventCodec;
+import com.p14n.postevent.vertx.codec.EventCodec;
 import com.p14n.postevent.data.Event;
 import io.opentelemetry.api.OpenTelemetry;
-import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.EventBus;
 import io.vertx.core.eventbus.MessageConsumer;
 
diff --git a/vertx/src/main/java/com/p14n/postevent/client/EventBusCatchupClient.java b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java
similarity index 94%
rename from vertx/src/main/java/com/p14n/postevent/client/EventBusCatchupClient.java
rename to vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java
index 8e1839e..bcc96d7 100644
--- a/vertx/src/main/java/com/p14n/postevent/client/EventBusCatchupClient.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java
@@ -1,4 +1,4 @@
-package com.p14n.postevent.client;
+package com.p14n.postevent.vertx.client;
 
 import com.p14n.postevent.catchup.CatchupServerInterface;
 import com.p14n.postevent.data.Event;
@@ -12,6 +12,8 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static com.p14n.postevent.vertx.adapter.EventBusCatchupService.FETCH_EVENTS_ADDRESS;
+import static com.p14n.postevent.vertx.adapter.EventBusCatchupService.GET_LATEST_MESSAGE_ID_ADDRESS;
 
 /**
  * Client implementation of CatchupServerInterface that sends requests
@@ -47,8 +49,6 @@
 public class EventBusCatchupClient implements CatchupServerInterface {
     private static final Logger logger = LoggerFactory.getLogger(EventBusCatchupClient.class);
 
-    private static final String FETCH_EVENTS_ADDRESS = "catchup.fetchEvents";
-    private static final String GET_LATEST_MESSAGE_ID_ADDRESS = "catchup.getLatestMessageId";
     private static final long DEFAULT_TIMEOUT_SECONDS = 30;
 
     private final EventBus eventBus;
@@ -103,7 +103,7 @@ public List fetchEvents(long fromId, long toId, int limit, String topic)
         try {
             CompletableFuture future = new CompletableFuture<>();
 
-            eventBus.request(FETCH_EVENTS_ADDRESS, request, reply -> {
+            eventBus.request(FETCH_EVENTS_ADDRESS+topic, request, reply -> {
                 if (reply.succeeded()) {
                     String eventsJson = (String) reply.result().body();
                     future.complete(eventsJson);
@@ -151,7 +151,7 @@ public long getLatestMessageId(String topic) {
         try {
             CompletableFuture future = new CompletableFuture<>();
 
-            eventBus.request(GET_LATEST_MESSAGE_ID_ADDRESS, request, reply -> {
+            eventBus.request(GET_LATEST_MESSAGE_ID_ADDRESS+topic, request, reply -> {
                 if (reply.succeeded()) {
                     JsonObject response = (JsonObject) reply.result().body();
                     future.complete(response);
diff --git a/vertx/src/main/java/com/p14n/postevent/codec/EventCodec.java b/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java
similarity index 98%
rename from vertx/src/main/java/com/p14n/postevent/codec/EventCodec.java
rename to vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java
index 2e7af58..4945481 100644
--- a/vertx/src/main/java/com/p14n/postevent/codec/EventCodec.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java
@@ -1,4 +1,4 @@
-package com.p14n.postevent.codec;
+package com.p14n.postevent.vertx.codec;
 
 import com.p14n.postevent.data.Event;
 import io.vertx.core.buffer.Buffer;
diff --git a/vertx/src/main/java/com/p14n/postevent/example/VertxConsumerExample.java b/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java
similarity index 52%
rename from vertx/src/main/java/com/p14n/postevent/example/VertxConsumerExample.java
rename to vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java
index d7f090c..bdfa5dc 100644
--- a/vertx/src/main/java/com/p14n/postevent/example/VertxConsumerExample.java
+++ b/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java
@@ -1,22 +1,27 @@
-package com.p14n.postevent.example;
+package com.p14n.postevent.vertx.example;
 
-import com.p14n.postevent.VertxConsumerServer;
-import com.p14n.postevent.VertxPersistentConsumer;
-import com.p14n.postevent.adapter.EventBusMessageBroker;
+import com.p14n.postevent.vertx.VertxConsumerServer;
+import com.p14n.postevent.vertx.VertxPersistentConsumer;
+import com.p14n.postevent.vertx.adapter.EventBusMessageBroker;
 import com.p14n.postevent.broker.DefaultExecutor;
 import com.p14n.postevent.data.Event;
-import com.p14n.postevent.db.DatabaseSetup;
 import io.opentelemetry.api.OpenTelemetry;
 import io.vertx.core.Vertx;
 
 import javax.sql.DataSource;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
 
 public class VertxConsumerExample {
 
     public static void start(DataSource ds) throws IOException, InterruptedException {
+
         DefaultExecutor executor = new DefaultExecutor(2);
         var ot = OpenTelemetry.noop();
         var vertx = Vertx.vertx();
@@ -25,10 +30,22 @@ public static void start(DataSource ds) throws IOException, InterruptedException
         var server = new VertxConsumerServer(ds,executor,ot);
         server.start(vertx.eventBus(),mb,topics);
 
+        var latch = new CountDownLatch(2);
+
         var client = new VertxPersistentConsumer(ot,executor,20);
         client.start(topics,ds,vertx.eventBus(),mb);
+
+        mb.publish("order", Event.create(UUID.randomUUID().toString(),
+                "test",
+                "test",
+                "text",
+                null,
+                "test",
+                "hello".getBytes(), Instant.now(),1L ,"order",null));
+
         client.subscribe("order", message -> {
             System.out.println("Got message");
+            latch.countDown();
         });
 
         mb.publish("order", Event.create(UUID.randomUUID().toString(),
@@ -37,6 +54,17 @@ public static void start(DataSource ds) throws IOException, InterruptedException
                 "text",
                 null,
                 "test",
-                "hello".getBytes(),null));
+                "hello".getBytes(), Instant.now(),2L ,"order",null));
+
+        latch.await(10, TimeUnit.SECONDS);
+    }
+
+    public static void main(String[] args){
+        try(var pg = EmbeddedPostgres.start()){
+            start(pg.getPostgresDatabase());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
     }
 }

From 295cc6298094aa62bc1f2d9be2fc5cd0c7145260 Mon Sep 17 00:00:00 2001
From: Dean Chapman 
Date: Thu, 18 Sep 2025 21:46:52 +0100
Subject: [PATCH 4/9] Implement Autoclosable on new classes

---
 vertx/README.md                               |  4 +-
 .../postevent/vertx/VertxConsumerServer.java  | 16 ++++-
 .../vertx/VertxPersistentConsumer.java        |  5 --
 .../vertx/example/VertxConsumerExample.java   | 67 ++++++++++---------
 4 files changed, 52 insertions(+), 40 deletions(-)

diff --git a/vertx/README.md b/vertx/README.md
index ceb5ec8..dfab57a 100644
--- a/vertx/README.md
+++ b/vertx/README.md
@@ -8,7 +8,7 @@ VertxConsumerServer - sets up DDL for given topics and starts catchup for those
 VertxPersistentConsumer - consumes events from vertx eventbus.  Creates system event bus and catchup client, handles translation between vertx and postevent on the transactional consumer side.
 
 Todo
-[] Implement autoclose on new classes
-[] Adapt classes to use vertx threading model
+ - [x] Implement autoclose on new classes
+ - [ ] Adapt classes to use vertx threading model
 
 
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
index 5448155..c9d254b 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
@@ -15,7 +15,7 @@
 import java.util.List;
 import java.util.Set;
 
-public class VertxConsumerServer {
+public class VertxConsumerServer implements AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(VertxConsumerServer.class);
 
     private DataSource ds;
@@ -40,8 +40,20 @@ public void start(EventBus eb, EventBusMessageBroker mb, Set topics) thr
 
         closeables = List.of(catchupService, mb, asyncExecutor);
         System.out.println("🌐 Vert.x EventBus server started");
-        //System.out.println("🛑 Vert.x EventBus server stopped");
 
     }
 
+    @Override
+    public void close() {
+        if(closeables != null){
+            for(var c : closeables){
+                try {
+                    c.close();
+                } catch (Exception e){
+
+                }
+            }
+        }
+        System.out.println("🛑 Vert.x EventBus server stopped");
+    }
 }
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java
index 4bd92e3..7130cef 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java
@@ -51,11 +51,6 @@ public void start(Set topics, DataSource ds,EventBus eb, EventBusMessage
             seb = new SystemEventBroker(asyncExecutor, ot);
             tb = new TransactionalBroker(ds, asyncExecutor, ot, seb);
             var pb = new PersistentBroker<>(tb, ds, seb);
-            //var client = new MessageBrokerGrpcClient(asyncExecutor, ot, channel); // needs fixed threads
-            //var catchupClient = new CatchupGrpcClient(channel);
-
-            //Create a client that can listen to system events, register with seb
-            //register the pb with the eb for all topics
             var catchupClient = new EventBusCatchupClient(eb);
 
             for (var topic : topics) {
diff --git a/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java b/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java
index bdfa5dc..830754b 100644
--- a/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java
+++ b/vertx/src/test/java/com/p14n/postevent/vertx/example/VertxConsumerExample.java
@@ -26,37 +26,42 @@ public static void start(DataSource ds) throws IOException, InterruptedException
         var ot = OpenTelemetry.noop();
         var vertx = Vertx.vertx();
         var topics = Set.of("order");
-        var mb = new EventBusMessageBroker(vertx.eventBus(),ds,executor, ot, "consumer_server");
-        var server = new VertxConsumerServer(ds,executor,ot);
-        server.start(vertx.eventBus(),mb,topics);
-
-        var latch = new CountDownLatch(2);
-
-        var client = new VertxPersistentConsumer(ot,executor,20);
-        client.start(topics,ds,vertx.eventBus(),mb);
-
-        mb.publish("order", Event.create(UUID.randomUUID().toString(),
-                "test",
-                "test",
-                "text",
-                null,
-                "test",
-                "hello".getBytes(), Instant.now(),1L ,"order",null));
-
-        client.subscribe("order", message -> {
-            System.out.println("Got message");
-            latch.countDown();
-        });
-
-        mb.publish("order", Event.create(UUID.randomUUID().toString(),
-                "test",
-                "test",
-                "text",
-                null,
-                "test",
-                "hello".getBytes(), Instant.now(),2L ,"order",null));
-
-        latch.await(10, TimeUnit.SECONDS);
+
+        try(var mb = new EventBusMessageBroker(vertx.eventBus(),ds,executor, ot, "consumer_server");
+            var server = new VertxConsumerServer(ds,executor,ot);
+            var client = new VertxPersistentConsumer(ot,executor,20)){
+
+            server.start(vertx.eventBus(),mb,topics);
+
+            var latch = new CountDownLatch(2);
+
+            client.start(topics,ds,vertx.eventBus(),mb);
+
+            mb.publish("order", Event.create(UUID.randomUUID().toString(),
+                    "test",
+                    "test",
+                    "text",
+                    null,
+                    "test",
+                    "hello".getBytes(), Instant.now(),1L ,"order",null));
+
+            client.subscribe("order", message -> {
+                System.out.println("Got message");
+                latch.countDown();
+            });
+
+            mb.publish("order", Event.create(UUID.randomUUID().toString(),
+                    "test",
+                    "test",
+                    "text",
+                    null,
+                    "test",
+                    "hello".getBytes(), Instant.now(),2L ,"order",null));
+
+            latch.await(10, TimeUnit.SECONDS);
+
+        }
+        vertx.close();
     }
 
     public static void main(String[] args){

From 51ac0398c3bbc68a86da664cad461489d2aa8578 Mon Sep 17 00:00:00 2001
From: Dean Chapman 
Date: Thu, 18 Sep 2025 22:14:20 +0100
Subject: [PATCH 5/9] Don't block the event loop

---
 .../postevent/vertx/VertxConsumerServer.java  |  2 +-
 .../vertx/adapter/EventBusCatchupService.java | 37 ++++++++++----
 .../vertx/adapter/EventBusMessageBroker.java  | 50 ++++++++++++++-----
 .../vertx/client/EventBusCatchupClient.java   |  4 +-
 4 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
index c9d254b..7740aac 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxConsumerServer.java
@@ -36,7 +36,7 @@ public void start(EventBus eb, EventBusMessageBroker mb, Set topics) thr
         var db = new DatabaseSetup(ds);
         db.setupServer(topics);
         var catchupServer = new CatchupServer(ds);
-        var catchupService = new EventBusCatchupService(catchupServer,eb,topics);
+        var catchupService = new EventBusCatchupService(catchupServer,eb,topics,this.asyncExecutor);
 
         closeables = List.of(catchupService, mb, asyncExecutor);
         System.out.println("🌐 Vert.x EventBus server started");
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java
index 39eac9d..44a25ab 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java
@@ -1,5 +1,6 @@
 package com.p14n.postevent.vertx.adapter;
 
+import com.p14n.postevent.broker.AsyncExecutor;
 import com.p14n.postevent.catchup.CatchupServerInterface;
 import com.p14n.postevent.data.Event;
 import io.vertx.core.eventbus.EventBus;
@@ -60,6 +61,7 @@ public class EventBusCatchupService implements AutoCloseable {
     private List> fetchEventsConsumers;
     private List> getLatestMessageIdConsumers;
     private Set topics;
+    private AsyncExecutor executor;
 
     /**
      * Creates a new EventBusCatchupService.
@@ -67,10 +69,14 @@ public class EventBusCatchupService implements AutoCloseable {
      * @param catchupServer The underlying catchup server implementation
      * @param eventBus      The Vert.x EventBus to use for messaging
      */
-    public EventBusCatchupService(CatchupServerInterface catchupServer, EventBus eventBus, Set topics) {
+    public EventBusCatchupService(CatchupServerInterface catchupServer,
+                                  EventBus eventBus,
+                                  Set topics,
+                                  AsyncExecutor executor) {
         this.catchupServer = catchupServer;
         this.eventBus = eventBus;
         this.topics = topics;
+        this.executor = executor;
     }
 
     /**
@@ -155,16 +161,29 @@ private void handleFetchEvents(Message message) {
                     .addArgument(topic)
                     .log("Handling fetchEvents request: fromId={}, toId={}, limit={}, topic={}");
 
-            List events = catchupServer.fetchEvents(fromId, toId, limit, topic);
+            executor.submit(() -> {
 
-            // Serialize events to JSON and reply
-            String eventsJson = Json.encode(events);
-            message.reply(eventsJson);
+                try{
+                    List events = catchupServer.fetchEvents(fromId, toId, limit, topic);
 
-            logger.atDebug()
-                    .addArgument(events.size())
-                    .addArgument(topic)
-                    .log("Successfully fetched {} events for topic {}", events.size(), topic);
+                    // Serialize events to JSON and reply
+                    String eventsJson = Json.encode(events);
+                    message.reply(eventsJson);
+
+                    logger.atDebug()
+                            .addArgument(events.size())
+                            .addArgument(topic)
+                            .log("Successfully fetched {} events for topic {}", events.size(), topic);
+
+                } catch (Exception e){
+                    logger.atError()
+                            .setCause(e)
+                            .log("Error handling fetchEvents request");
+                    message.fail(500, e.getMessage());
+
+                }
+                return null;
+            });
 
         } catch (Exception e) {
             logger.atError()
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
index 780f70c..5e585b3 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java
@@ -63,6 +63,7 @@ public class EventBusMessageBroker extends EventMessageBroker {
     private final EventBus eventBus;
     private final DataSource dataSource;
     private final Map> consumers = new ConcurrentHashMap<>();
+    private final AsyncExecutor executor;
 
     /**
      * Creates a new EventBusMessageBroker.
@@ -78,7 +79,7 @@ public EventBusMessageBroker(EventBus eventBus, DataSource dataSource, AsyncExec
         super(executor, ot, name);
         this.eventBus = eventBus;
         this.dataSource = dataSource;
-
+        this.executor = executor;
         // Register the Event codec for EventBus serialization
         eventBus.registerDefaultCodec(Event.class, new EventCodec());
 
@@ -87,7 +88,6 @@ public EventBusMessageBroker(EventBus eventBus, DataSource dataSource, AsyncExec
                 .log("EventBusMessageBroker initialized: {}");
     }
 
-
     /**
      * Publishes an event using the dual-write pattern.
      * The event is first persisted to the database, then published to the EventBus.
@@ -103,17 +103,32 @@ public void publish(String topic, Event event) {
                 .log("Publishing event to topic {} with id {}");
 
         try {
-            // First, persist to database using existing Publisher
-            Publisher.publish(event, dataSource, topic);
 
-            // Then, publish to EventBus for real-time distribution
-            String eventBusAddress = "events." + topic;
-            eventBus.publish(eventBusAddress, event);
+            executor.submit(() -> {
+                try {
+                    Publisher.publish(event, dataSource, topic);
 
-            logger.atDebug()
-                    .addArgument(topic)
-                    .addArgument(event.id())
-                    .log("Successfully published event to topic {} with id {}");
+                    // Then, publish to EventBus for real-time distribution
+                    String eventBusAddress = "events." + topic;
+                    eventBus.publish(eventBusAddress, event);
+
+                    logger.atDebug()
+                            .addArgument(topic)
+                            .addArgument(event.id())
+                            .log("Successfully published event to topic {} with id {}");
+
+                } catch (Exception e) {
+                    logger.atError()
+                            .addArgument(topic)
+                            .addArgument(event.id())
+                            .setCause(e)
+                            .log("Failed to publish event to topic {} with id {}");
+
+                }
+                return null;
+            });
+
+            // First, persist to database using existing Publisher
 
         } catch (Exception e) {
             logger.atError()
@@ -148,7 +163,18 @@ public void subscribeToEventBus(String topic, MessageSubscriber subscribe
                     .log("Received event on topic {} with id {}");
 
             try {
-                subscriber.onMessage(event);
+                executor.submit(() -> {
+                    try {
+                        subscriber.onMessage(event);
+                    } catch (Exception e) {
+                        logger.atError()
+                                .addArgument(topic)
+                                .addArgument(event.id())
+                                .setCause(e)
+                                .log("Error processing event on topic {} with id {}");
+                    }
+                    return null;
+                });
             } catch (Exception e) {
                 logger.atError()
                         .addArgument(topic)
diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java
index bcc96d7..1670a53 100644
--- a/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java
+++ b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java
@@ -103,7 +103,7 @@ public List fetchEvents(long fromId, long toId, int limit, String topic)
         try {
             CompletableFuture future = new CompletableFuture<>();
 
-            eventBus.request(FETCH_EVENTS_ADDRESS+topic, request, reply -> {
+            eventBus.request(FETCH_EVENTS_ADDRESS + topic, request, reply -> {
                 if (reply.succeeded()) {
                     String eventsJson = (String) reply.result().body();
                     future.complete(eventsJson);
@@ -151,7 +151,7 @@ public long getLatestMessageId(String topic) {
         try {
             CompletableFuture future = new CompletableFuture<>();
 
-            eventBus.request(GET_LATEST_MESSAGE_ID_ADDRESS+topic, request, reply -> {
+            eventBus.request(GET_LATEST_MESSAGE_ID_ADDRESS + topic, request, reply -> {
                 if (reply.succeeded()) {
                     JsonObject response = (JsonObject) reply.result().body();
                     future.complete(response);

From 9883ccd5d0e9a6fc59e1316b077a33e9186de97b Mon Sep 17 00:00:00 2001
From: Dean Chapman 
Date: Fri, 19 Sep 2025 08:13:37 +0100
Subject: [PATCH 6/9] Refactor dependencies

---
 app/src/main/java/com/p14n/postevent/App.java |  4 +--
 core/build.gradle                             |  4 ---
 .../postevent/broker/DefaultExecutor.java     | 23 +++++++++++----
 .../com/p14n/postevent/db/DatabaseSetup.java  | 28 ++++++-------------
 debezium/build.gradle                         |  3 ++
 .../postevent/LocalPersistentConsumer.java    |  3 +-
 .../java/com/p14n/postevent/db/PoolSetup.java | 23 +++++++++++++++
 .../com/p14n/postevent/ConsumerServer.java    |  4 +--
 vertx/build.gradle                            |  3 --
 9 files changed, 58 insertions(+), 37 deletions(-)
 create mode 100644 debezium/src/main/java/com/p14n/postevent/db/PoolSetup.java

diff --git a/app/src/main/java/com/p14n/postevent/App.java b/app/src/main/java/com/p14n/postevent/App.java
index 268ed92..3e5166d 100644
--- a/app/src/main/java/com/p14n/postevent/App.java
+++ b/app/src/main/java/com/p14n/postevent/App.java
@@ -21,7 +21,7 @@
 import com.p14n.postevent.data.ConfigData;
 import com.p14n.postevent.data.Event;
 import com.p14n.postevent.db.DatabaseSetup;
-
+import com.p14n.postevent.db.PoolSetup;
 import com.p14n.postevent.telemetry.OpenTelemetryFunctions;
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.trace.Tracer;
@@ -119,7 +119,7 @@ private static void run(String affinity, String[] write, String[] read, String d
         RemotePersistentConsumer cc = null;
 
         var ot = Opentelemetry.create("postevent");
-        var ds = JdbcTelemetry.create(ot).wrap(DatabaseSetup.createPool(cfg));
+        var ds = JdbcTelemetry.create(ot).wrap(PoolSetup.createPool(cfg));
 
         try {
             if (write.length > 0) {
diff --git a/core/build.gradle b/core/build.gradle
index 8c0d204..a02acbe 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -17,12 +17,8 @@ repositories {
 
 dependencies {
     // Core database and connection pooling
-    implementation 'com.zaxxer:HikariCP:6.2.1'
     implementation 'org.slf4j:slf4j-api:2.0.9'
 
-    // Guava for utilities
-    implementation 'com.google.guava:guava:32.0.0-jre'
-
     // OpenTelemetry core dependencies
     implementation 'io.opentelemetry:opentelemetry-api:1.32.0'
     
diff --git a/core/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java b/core/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java
index 3d954c3..f2bef1d 100644
--- a/core/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java
+++ b/core/src/main/java/com/p14n/postevent/broker/DefaultExecutor.java
@@ -3,8 +3,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static java.lang.String.format;
 
 /**
  * Default implementation of {@link AsyncExecutor} that provides configurable
@@ -49,6 +50,19 @@ public DefaultExecutor(int scheduledSize, int fixedSize) {
                 this.es = createFixedExecutorService(fixedSize);
         }
 
+        protected ThreadFactory createNamedFactory(String nameFormat,ThreadFactory backingFactory){
+                AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;
+                return runnable -> {
+                        Thread thread = backingFactory.newThread(runnable);
+                        if (nameFormat != null) {
+                                thread.setName(format(nameFormat, count.getAndIncrement()));
+                        }
+                        return thread;
+                };
+        }
+        protected ThreadFactory createNamedFactory(String nameFormat) {
+                return createNamedFactory(nameFormat,Executors.defaultThreadFactory());
+        }
         /**
          * Creates a fixed-size thread pool with named threads.
          *
@@ -57,7 +71,7 @@ public DefaultExecutor(int scheduledSize, int fixedSize) {
          */
         protected ExecutorService createFixedExecutorService(int size) {
                 return Executors.newFixedThreadPool(size,
-                                new ThreadFactoryBuilder().setNameFormat("post-event-fixed-%d").build());
+                        createNamedFactory("post-event-fixed-%d"));
         }
 
         /**
@@ -67,8 +81,7 @@ protected ExecutorService createFixedExecutorService(int size) {
          */
         protected ExecutorService createVirtualExecutorService() {
                 return Executors.newThreadPerTaskExecutor(
-                                new ThreadFactoryBuilder().setThreadFactory(Thread.ofVirtual().factory())
-                                                .setNameFormat("post-event-virtual-%d").build());
+                        createNamedFactory("post-event-virtual-%d",Thread.ofVirtual().factory()));
         }
 
         /**
@@ -79,7 +92,7 @@ protected ExecutorService createVirtualExecutorService() {
          */
         protected ScheduledExecutorService createScheduledExecutorService(int size) {
                 return Executors.newScheduledThreadPool(size,
-                                new ThreadFactoryBuilder().setNameFormat("post-event-scheduled-%d").build());
+                                createNamedFactory("post-event-scheduled-%d"));
         }
 
         @Override
diff --git a/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java b/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java
index 1ed10ff..362428b 100644
--- a/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java
+++ b/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java
@@ -1,7 +1,6 @@
 package com.p14n.postevent.db;
 
 import com.p14n.postevent.data.PostEventConfig;
-import com.zaxxer.hikari.HikariDataSource;
 
 import javax.sql.DataSource;
 
@@ -35,16 +34,14 @@
  * Example usage:
  * 

* - *
{@code
+ * 
+ * {@code
  * PostEventConfig config = // initialize configuration
  * DatabaseSetup setup = new DatabaseSetup(config);
  * 
  * // Setup all required tables for given topics
  * setup.setupAll(Set.of("orders", "inventory"));
  * 
- * // Create connection pool
- * DataSource pool = DatabaseSetup.createPool(config);
- * }
*/ public class DatabaseSetup { private static final Logger logger = LoggerFactory.getLogger(DatabaseSetup.class); @@ -78,6 +75,7 @@ public DatabaseSetup(String jdbcUrl, String username, String password) { this.password = password; this.ds = null; } + public DatabaseSetup(DataSource ds) { this.jdbcUrl = null; this.username = null; @@ -99,15 +97,18 @@ public DatabaseSetup setupAll(Set topics) { setupDebezium(); return this; } + public DatabaseSetup setupDebezium() { clearOldSlots(); return this; } + public DatabaseSetup setupServer(Set topics) { createSchemaIfNotExists(); topics.stream().forEach(this::createTableIfNotExists); return this; } + public DatabaseSetup setupClient() { createSchemaIfNotExists(); createMessagesTableIfNotExists(); @@ -309,22 +310,9 @@ topic_name VARCHAR(255) PRIMARY KEY, * @throws SQLException if connection fails */ private Connection getConnection() throws SQLException { - if(ds!=null) return ds.getConnection(); + if (ds != null) + return ds.getConnection(); return DriverManager.getConnection(jdbcUrl, username, password); } - /** - * Creates and configures a connection pool using HikariCP. - * - * @param cfg Configuration containing database connection details - * @return Configured DataSource - */ - public static DataSource createPool(PostEventConfig cfg) { - HikariDataSource ds = new HikariDataSource(); - ds.setJdbcUrl(cfg.jdbcUrl()); - ds.setUsername(cfg.dbUser()); - ds.setPassword(cfg.dbPassword()); - return ds; - } - } diff --git a/debezium/build.gradle b/debezium/build.gradle index 64f6442..c6af107 100644 --- a/debezium/build.gradle +++ b/debezium/build.gradle @@ -42,6 +42,9 @@ dependencies { constraints { implementation 'com.google.guava:guava:32.0.0-jre' } + + // Database connection pooling + implementation 'com.zaxxer:HikariCP:5.0.1' // Test dependencies testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0') diff --git a/debezium/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java b/debezium/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java index c394d2c..a3e89bd 100644 --- a/debezium/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java +++ b/debezium/src/main/java/com/p14n/postevent/LocalPersistentConsumer.java @@ -8,6 +8,7 @@ import com.p14n.postevent.data.PostEventConfig; import com.p14n.postevent.data.UnprocessedEventFinder; import com.p14n.postevent.db.DatabaseSetup; +import com.p14n.postevent.db.PoolSetup; import io.opentelemetry.api.OpenTelemetry; @@ -104,7 +105,7 @@ public LocalPersistentConsumer(DataSource ds, PostEventConfig cfg, OpenTelemetry * @param batchSize Maximum number of events to process in a batch */ public LocalPersistentConsumer(PostEventConfig cfg, OpenTelemetry ot) { - this(DatabaseSetup.createPool(cfg), cfg, new DefaultExecutor(2, 10), ot, 10); + this(PoolSetup.createPool(cfg), cfg, new DefaultExecutor(2, 10), ot, 10); } /** diff --git a/debezium/src/main/java/com/p14n/postevent/db/PoolSetup.java b/debezium/src/main/java/com/p14n/postevent/db/PoolSetup.java new file mode 100644 index 0000000..3157de1 --- /dev/null +++ b/debezium/src/main/java/com/p14n/postevent/db/PoolSetup.java @@ -0,0 +1,23 @@ +package com.p14n.postevent.db; + +import com.p14n.postevent.data.PostEventConfig; +import com.zaxxer.hikari.HikariDataSource; + +import javax.sql.DataSource; + +public class PoolSetup { + /** + * Creates and configures a connection pool using HikariCP. + * + * @param cfg Configuration containing database connection details + * @return Configured DataSource + */ + public static DataSource createPool(PostEventConfig cfg) { + HikariDataSource ds = new HikariDataSource(); + ds.setJdbcUrl(cfg.jdbcUrl()); + ds.setUsername(cfg.dbUser()); + ds.setPassword(cfg.dbPassword()); + return ds; + } + +} diff --git a/grpc/src/main/java/com/p14n/postevent/ConsumerServer.java b/grpc/src/main/java/com/p14n/postevent/ConsumerServer.java index 672a099..c3d7fb4 100644 --- a/grpc/src/main/java/com/p14n/postevent/ConsumerServer.java +++ b/grpc/src/main/java/com/p14n/postevent/ConsumerServer.java @@ -13,7 +13,7 @@ import com.p14n.postevent.catchup.CatchupServer; import com.p14n.postevent.catchup.remote.CatchupGrpcServer; import com.p14n.postevent.data.ConfigData; -import com.p14n.postevent.db.DatabaseSetup; +import com.p14n.postevent.db.PoolSetup; import io.grpc.Server; import io.grpc.ServerBuilder; @@ -72,7 +72,7 @@ public class ConsumerServer implements AutoCloseable { * @param ot The OpenTelemetry instance for monitoring and tracing */ public ConsumerServer(ConfigData cfg, OpenTelemetry ot) { - this(DatabaseSetup.createPool(cfg), cfg, new DefaultExecutor(2), ot); + this(PoolSetup.createPool(cfg), cfg, new DefaultExecutor(2), ot); } /** diff --git a/vertx/build.gradle b/vertx/build.gradle index f513ea6..c8d3bef 100644 --- a/vertx/build.gradle +++ b/vertx/build.gradle @@ -25,9 +25,6 @@ dependencies { // OpenTelemetry (from core module) implementation 'io.opentelemetry:opentelemetry-api:1.32.0' - // Database connection pooling (for examples) - implementation 'com.zaxxer:HikariCP:5.0.1' - // Logging implementation 'org.slf4j:slf4j-api:2.0.9' From 1075f3125f08a2922318058a930a977450d95290 Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Fri, 19 Sep 2025 08:23:04 +0100 Subject: [PATCH 7/9] Upgrade to vertx 5 --- vertx/build.gradle | 2 +- .../p14n/postevent/vertx/client/EventBusCatchupClient.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/vertx/build.gradle b/vertx/build.gradle index c8d3bef..8c0917a 100644 --- a/vertx/build.gradle +++ b/vertx/build.gradle @@ -20,7 +20,7 @@ dependencies { implementation project(':core') // Vert.x dependencies - implementation 'io.vertx:vertx-core:4.5.1' + implementation 'io.vertx:vertx-core:5.0.4' // OpenTelemetry (from core module) implementation 'io.opentelemetry:opentelemetry-api:1.32.0' diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java index 1670a53..cd3c289 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java @@ -103,7 +103,7 @@ public List fetchEvents(long fromId, long toId, int limit, String topic) try { CompletableFuture future = new CompletableFuture<>(); - eventBus.request(FETCH_EVENTS_ADDRESS + topic, request, reply -> { + eventBus.request(FETCH_EVENTS_ADDRESS + topic, request).andThen(reply -> { if (reply.succeeded()) { String eventsJson = (String) reply.result().body(); future.complete(eventsJson); @@ -151,7 +151,7 @@ public long getLatestMessageId(String topic) { try { CompletableFuture future = new CompletableFuture<>(); - eventBus.request(GET_LATEST_MESSAGE_ID_ADDRESS + topic, request, reply -> { + eventBus.request(GET_LATEST_MESSAGE_ID_ADDRESS + topic, request).andThen(reply -> { if (reply.succeeded()) { JsonObject response = (JsonObject) reply.result().body(); future.complete(response); From c97c9201f949e1fa400c7f37b7e39e1a699a18fc Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Tue, 23 Sep 2025 19:58:39 +0100 Subject: [PATCH 8/9] Update core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> --- core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java b/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java index 362428b..c7bdc30 100644 --- a/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java +++ b/core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java @@ -77,6 +77,9 @@ public DatabaseSetup(String jdbcUrl, String username, String password) { } public DatabaseSetup(DataSource ds) { + if (ds == null) { + throw new IllegalArgumentException("DataSource must not be null"); + } this.jdbcUrl = null; this.username = null; this.password = null; From ed6189a77575f38d29031666498872f7f0740a2f Mon Sep 17 00:00:00 2001 From: Dean Chapman Date: Tue, 23 Sep 2025 20:35:21 +0100 Subject: [PATCH 9/9] Review feedback --- .../vertx/VertxPersistentConsumer.java | 2 +- .../vertx/adapter/EventBusCatchupService.java | 41 +++--- .../vertx/adapter/EventBusMessageBroker.java | 24 +++- .../vertx/client/EventBusCatchupClient.java | 132 ++++++------------ .../postevent/vertx/codec/EventCodec.java | 6 +- 5 files changed, 86 insertions(+), 119 deletions(-) diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java index 7130cef..c228766 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/VertxPersistentConsumer.java @@ -92,7 +92,7 @@ public void close() { for (AutoCloseable c : closeables) { try { - c.close(); + if(c != null) c.close(); } catch (Exception e) { logger.atWarn() .setCause(e) diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java index 44a25ab..aa06861 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusCatchupService.java @@ -85,24 +85,29 @@ public EventBusCatchupService(CatchupServerInterface catchupServer, * requests. */ public void start() { - logger.atInfo().log("Starting EventBusCatchupService"); - - // Register consumer for fetchEvents requests - fetchEventsConsumers = topics.stream().map(topic -> { - logger.atInfo() - .addArgument(FETCH_EVENTS_ADDRESS + topic) - .log("EventBusCatchupService started, listening on address: {}"); - - return eventBus.consumer(FETCH_EVENTS_ADDRESS + topic, this::handleFetchEvents); - }).toList(); - - // Register consumer for getLatestMessageId requests - getLatestMessageIdConsumers = topics.stream().map(topic -> { - logger.atInfo() - .addArgument(GET_LATEST_MESSAGE_ID_ADDRESS + topic) - .log("EventBusCatchupService started, listening on address: {}"); - return eventBus.consumer(GET_LATEST_MESSAGE_ID_ADDRESS + topic, this::handleGetLatestMessageId); - }).toList(); + if(fetchEventsConsumers == null) { + + logger.atInfo().log("Starting EventBusCatchupService"); + + // Register consumer for fetchEvents requests + fetchEventsConsumers = topics.stream().map(topic -> { + logger.atInfo() + .addArgument(FETCH_EVENTS_ADDRESS + topic) + .log("EventBusCatchupService started, listening on address: {}"); + + return eventBus.consumer(FETCH_EVENTS_ADDRESS + topic, this::handleFetchEvents); + }).toList(); + + // Register consumer for getLatestMessageId requests + getLatestMessageIdConsumers = topics.stream().map(topic -> { + logger.atInfo() + .addArgument(GET_LATEST_MESSAGE_ID_ADDRESS + topic) + .log("EventBusCatchupService started, listening on address: {}"); + return eventBus.consumer(GET_LATEST_MESSAGE_ID_ADDRESS + topic, this::handleGetLatestMessageId); + }).toList(); + } else { + logger.atInfo().log("EventBusCatchupService already started"); + } } diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java index 5e585b3..f96e068 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/adapter/EventBusMessageBroker.java @@ -11,6 +11,8 @@ import io.vertx.core.eventbus.MessageConsumer; import javax.sql.DataSource; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -62,7 +64,7 @@ public class EventBusMessageBroker extends EventMessageBroker { private static final Logger logger = LoggerFactory.getLogger(EventBusMessageBroker.class); private final EventBus eventBus; private final DataSource dataSource; - private final Map> consumers = new ConcurrentHashMap<>(); + private final Map>> consumers = new ConcurrentHashMap<>(); private final AsyncExecutor executor; /** @@ -185,7 +187,13 @@ public void subscribeToEventBus(String topic, MessageSubscriber subscribe }); // Store consumer for potential cleanup - consumers.put(topic, consumer); + consumers.compute(topic, (k,l) -> { + if(l == null){ + l = new ArrayList<>(); + } + l.add(consumer); + return l; + }); logger.atInfo() .addArgument(topic) @@ -199,9 +207,11 @@ public void subscribeToEventBus(String topic, MessageSubscriber subscribe * @param topic The topic to unsubscribe from */ public void unsubscribe(String topic) { - MessageConsumer consumer = consumers.remove(topic); - if (consumer != null) { - consumer.unregister(); + List> consumerList = consumers.remove(topic); + if (consumerList != null) { + for(var consumer: consumerList){ + consumer.unregister(); + } logger.atInfo() .addArgument(topic) .log("Unsubscribed from topic: {}"); @@ -216,7 +226,9 @@ public void close() { logger.atInfo().log("Closing EventBusMessageBroker"); // Unregister all consumers - consumers.values().forEach(MessageConsumer::unregister); + consumers.values().forEach( l -> { + l.forEach(MessageConsumer::unregister); + }); consumers.clear(); super.close(); diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java index cd3c289..8b1b1b7 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/client/EventBusCatchupClient.java @@ -1,5 +1,6 @@ package com.p14n.postevent.vertx.client; +import com.fasterxml.jackson.core.type.TypeReference; import com.p14n.postevent.catchup.CatchupServerInterface; import com.p14n.postevent.data.Event; import io.vertx.core.eventbus.EventBus; @@ -9,6 +10,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,109 +76,55 @@ public EventBusCatchupClient(EventBus eventBus, long timeoutSeconds) { this.timeoutSeconds = timeoutSeconds; } - /** - * Fetches events within the specified ID range for a topic. - * Sends a request to the EventBusCatchupService and waits for the response. - * - * @param fromId The starting event ID (inclusive) - * @param toId The ending event ID (inclusive) - * @param limit Maximum number of events to return - * @param topic The topic to fetch events from - * @return List of events within the specified range - * @throws Exception If the request fails or times out - */ - @Override - public List fetchEvents(long fromId, long toId, int limit, String topic) { - logger.atDebug() - .addArgument(fromId) - .addArgument(toId) - .addArgument(limit) - .addArgument(topic) - .log("Fetching events: fromId={}, toId={}, limit={}, topic={}"); - - JsonObject request = new JsonObject() - .put("fromId", fromId) - .put("toId", toId) - .put("limit", limit) - .put("topic", topic); + private R requestAndDecode( + String address, + JsonObject payload, + Function decoder + ) { try { - CompletableFuture future = new CompletableFuture<>(); - - eventBus.request(FETCH_EVENTS_ADDRESS + topic, request).andThen(reply -> { - if (reply.succeeded()) { - String eventsJson = (String) reply.result().body(); - future.complete(eventsJson); + CompletableFuture fut = new CompletableFuture<>(); + eventBus.request(address, payload).andThen( ar -> { + if (ar.succeeded()) { + fut.complete(ar.result().body()); } else { - future.completeExceptionally(new RuntimeException( - "Failed to fetch events: " + reply.cause().getMessage(), reply.cause())); + fut.completeExceptionally( + new RuntimeException("Bus request failed: " + ar.cause().getMessage(), ar.cause()) + ); } }); - - String eventsJson = future.get(timeoutSeconds, TimeUnit.SECONDS); - List events = Json.decodeValue(eventsJson, List.class); - - logger.atDebug() - .addArgument(events.size()) - .addArgument(topic) - .log("Successfully fetched {} events for topic {}", events.size(), topic); - - return events; - + Object body = fut.get(timeoutSeconds, TimeUnit.SECONDS); + return decoder.apply(body); } catch (Exception e) { - logger.atError() - .addArgument(topic) - .setCause(e) - .log("Error fetching events for topic {}", topic); - throw new RuntimeException("Failed to fetch events for topic " + topic, e); + throw new RuntimeException("Request to " + address + " failed", e); } } - /** - * Gets the latest message ID for a specific topic. - * Sends a request to the EventBusCatchupService and waits for the response. - * - * @param topic The topic to get the latest message ID for - * @return The latest message ID for the topic - * @throws Exception If the request fails or times out - */ @Override - public long getLatestMessageId(String topic) { - logger.atDebug() - .addArgument(topic) - .log("Getting latest message ID for topic: {}"); - - JsonObject request = new JsonObject().put("topic", topic); - - try { - CompletableFuture future = new CompletableFuture<>(); - - eventBus.request(GET_LATEST_MESSAGE_ID_ADDRESS + topic, request).andThen(reply -> { - if (reply.succeeded()) { - JsonObject response = (JsonObject) reply.result().body(); - future.complete(response); - } else { - future.completeExceptionally(new RuntimeException( - "Failed to get latest message ID: " + reply.cause().getMessage(), reply.cause())); - } - }); - - JsonObject response = future.get(timeoutSeconds, TimeUnit.SECONDS); - long latestId = response.getLong("latestId"); - - logger.atDebug() - .addArgument(latestId) - .addArgument(topic) - .log("Successfully retrieved latest message ID {} for topic {}", latestId, topic); + public List fetchEvents(long fromId, long toId, int limit, String topic) { + JsonObject req = new JsonObject() + .put("fromId", fromId) + .put("toId", toId) + .put("limit", limit) + .put("topic", topic); - return latestId; + // decode the reply-body string into List + return requestAndDecode( + FETCH_EVENTS_ADDRESS + topic, + req, + body -> Json.decodeValue((String) body, List.class ) + ); + } - } catch (Exception e) { - logger.atError() - .addArgument(topic) - .setCause(e) - .log("Error getting latest message ID for topic {}", topic); - throw new RuntimeException("Failed to get latest message ID for topic " + topic, e); - } + @Override + public long getLatestMessageId(String topic) { + JsonObject req = new JsonObject().put("topic", topic); + + // extract "latestId" from the returned JsonObject + return requestAndDecode( + GET_LATEST_MESSAGE_ID_ADDRESS + topic, + req, + body -> ((JsonObject) body).getLong("latestId") + ); } } diff --git a/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java b/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java index 4945481..10271d9 100644 --- a/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java +++ b/vertx/src/main/java/com/p14n/postevent/vertx/codec/EventCodec.java @@ -5,6 +5,8 @@ import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.json.Json; +import java.nio.charset.StandardCharsets; + /** * MessageCodec for serializing Event objects on the Vert.x EventBus. * This codec handles the conversion between Event objects and their wire format @@ -32,7 +34,7 @@ public class EventCodec implements MessageCodec { @Override public void encodeToWire(Buffer buffer, Event event) { String json = Json.encode(event); - byte[] jsonBytes = json.getBytes(); + byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8); // Write length prefix followed by JSON bytes buffer.appendInt(jsonBytes.length); @@ -54,7 +56,7 @@ public Event decodeFromWire(int pos, Buffer buffer) { // Read JSON bytes and convert to string byte[] jsonBytes = buffer.getBytes(pos + 4, pos + 4 + length); - String json = new String(jsonBytes); + String json = new String(jsonBytes,StandardCharsets.UTF_8); // Deserialize from JSON return Json.decodeValue(json, Event.class);