Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/src/main/java/com/p14n/postevent/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.p14n.postevent.data.ConfigData;
import com.p14n.postevent.data.Event;
import com.p14n.postevent.db.DatabaseSetup;

import com.p14n.postevent.db.PoolSetup;
import com.p14n.postevent.telemetry.OpenTelemetryFunctions;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
Expand Down Expand Up @@ -119,7 +119,7 @@ private static void run(String affinity, String[] write, String[] read, String d
RemotePersistentConsumer cc = null;

var ot = Opentelemetry.create("postevent");
var ds = JdbcTelemetry.create(ot).wrap(DatabaseSetup.createPool(cfg));
var ds = JdbcTelemetry.create(ot).wrap(PoolSetup.createPool(cfg));

try {
if (write.length > 0) {
Expand Down
178 changes: 178 additions & 0 deletions build.gradle.ref
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import com.vanniktech.maven.publish.SonatypeHost
import com.vanniktech.maven.publish.JavaLibrary
import com.vanniktech.maven.publish.JavadocJar

plugins {
id 'java'
id 'com.adarshr.test-logger' version '4.0.0'
id 'com.google.protobuf' version '0.9.2'
id "com.vanniktech.maven.publish" version "0.31.0"
}

compileJava {
sourceCompatibility = 21
targetCompatibility = 21
}

group = 'com.p14n'
version = '1.0.1-SNAPSHOT'

repositories {
mavenCentral()
}

dependencies {
implementation 'io.debezium:debezium-api:3.0.1.Final'
implementation ('io.debezium:debezium-embedded:3.0.1.Final') {
exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet'
exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2'
exclude group: 'org.eclipse.jetty'
}
implementation 'io.debezium:debezium-connector-postgres:3.0.1.Final'
implementation 'io.debezium:debezium-storage-jdbc:3.0.1.Final'
implementation 'org.slf4j:slf4j-api:2.0.9'
implementation 'com.zaxxer:HikariCP:6.2.1'

constraints {
implementation 'com.google.guava:guava:32.0.0-jre'
}

// gRPC dependencies
implementation 'io.grpc:grpc-netty-shaded:1.53.0'
implementation 'io.grpc:grpc-protobuf:1.53.0'
implementation 'io.grpc:grpc-stub:1.53.0'
implementation 'io.grpc:grpc-api:1.53.0'

implementation 'javax.annotation:javax.annotation-api:1.3.2'

// For code generation
implementation 'com.google.protobuf:protobuf-java:3.21.7'

testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0')
testImplementation 'io.zonky.test:embedded-postgres:2.0.7'
testImplementation 'net.jqwik:jqwik:1.8.2'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.0'
testRuntimeOnly 'ch.qos.logback:logback-classic:1.4.11'
testImplementation 'org.mockito:mockito-core:3.12.4'

// OpenTelemetry core dependencies
implementation 'io.opentelemetry:opentelemetry-api:1.32.0'

// Instrumentation for GRPC
implementation 'io.opentelemetry.instrumentation:opentelemetry-grpc-1.6:1.32.0-alpha'

}

testlogger {
theme 'standard'
showExceptions true
showStackTraces true
showFullStackTraces false
showCauses true
slowThreshold 2000
showSummary true
showSimpleNames false
showPassed true
showSkipped true
showFailed true
showStandardStreams false
showPassedStandardStreams true
showSkippedStandardStreams true
showFailedStandardStreams true
}

test {
useJUnitPlatform {
includeEngines 'jqwik', 'junit-jupiter'
}
maxHeapSize = "1G"
minHeapSize = "512M"
maxParallelForks = 1
failFast = true
testLogging.showStandardStreams = true
}

task fastTest( type: Test ) {
useJUnitPlatform {
includeEngines 'junit-jupiter'
exclude '**/dst/**'

}
}

jar {
manifest {
}
}

// Configure Protobuf plugin
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.21.7'
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.53.0'
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}

sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}

tasks.withType(Jar) {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}

javadoc {
exclude "**/grpc/**"
source = sourceSets.main.allJava
}

mavenPublishing {

configure(new JavaLibrary(new JavadocJar.Javadoc(), true))

publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL, true)

signAllPublications()

coordinates("com.p14n", "postevent", version)

pom {
name = "Postevent"
description = 'A reliable event publishing and consumption system using PostgreSQL and gRPC'
inceptionYear = "2025"
url = "https://github.com/p14n/postevent/"
licenses {
license {
name = 'MIT License'
url = 'https://opensource.org/licenses/MIT'
}
}
developers {
developer {
id = 'p14n'
name = 'Dean Chapman'
email = 'dean@p14n.com'
}
}
scm {
connection = 'scm:git:git://github.com/p14n/postevent.git'
developerConnection = 'scm:git:ssh://github.com:p14n/postevent.git'
url = 'https://github.com/p14n/postevent'
}
}
}
4 changes: 0 additions & 4 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ repositories {

dependencies {
// Core database and connection pooling
implementation 'com.zaxxer:HikariCP:6.2.1'
implementation 'org.slf4j:slf4j-api:2.0.9'

// Guava for utilities
implementation 'com.google.guava:guava:32.0.0-jre'

// OpenTelemetry core dependencies
implementation 'io.opentelemetry:opentelemetry-api:1.32.0'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import static java.lang.String.format;

/**
* Default implementation of {@link AsyncExecutor} that provides configurable
Expand Down Expand Up @@ -49,6 +50,19 @@ public DefaultExecutor(int scheduledSize, int fixedSize) {
this.es = createFixedExecutorService(fixedSize);
}

protected ThreadFactory createNamedFactory(String nameFormat,ThreadFactory backingFactory){
AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;
return runnable -> {
Thread thread = backingFactory.newThread(runnable);
if (nameFormat != null) {
thread.setName(format(nameFormat, count.getAndIncrement()));
}
return thread;
};
}
protected ThreadFactory createNamedFactory(String nameFormat) {
return createNamedFactory(nameFormat,Executors.defaultThreadFactory());
}
/**
* Creates a fixed-size thread pool with named threads.
*
Expand All @@ -57,7 +71,7 @@ public DefaultExecutor(int scheduledSize, int fixedSize) {
*/
protected ExecutorService createFixedExecutorService(int size) {
return Executors.newFixedThreadPool(size,
new ThreadFactoryBuilder().setNameFormat("post-event-fixed-%d").build());
createNamedFactory("post-event-fixed-%d"));
}

/**
Expand All @@ -67,8 +81,7 @@ protected ExecutorService createFixedExecutorService(int size) {
*/
protected ExecutorService createVirtualExecutorService() {
return Executors.newThreadPerTaskExecutor(
new ThreadFactoryBuilder().setThreadFactory(Thread.ofVirtual().factory())
.setNameFormat("post-event-virtual-%d").build());
createNamedFactory("post-event-virtual-%d",Thread.ofVirtual().factory()));
}

/**
Expand All @@ -79,7 +92,7 @@ protected ExecutorService createVirtualExecutorService() {
*/
protected ScheduledExecutorService createScheduledExecutorService(int size) {
return Executors.newScheduledThreadPool(size,
new ThreadFactoryBuilder().setNameFormat("post-event-scheduled-%d").build());
createNamedFactory("post-event-scheduled-%d"));
}

@Override
Expand Down
56 changes: 35 additions & 21 deletions core/src/main/java/com/p14n/postevent/db/DatabaseSetup.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.p14n.postevent.db;

import com.p14n.postevent.data.PostEventConfig;
import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;

Expand Down Expand Up @@ -35,16 +34,14 @@
* Example usage:
* </p>
*
* <pre>{@code
* <pre>
* {@code
* PostEventConfig config = // initialize configuration
* DatabaseSetup setup = new DatabaseSetup(config);
*
* // Setup all required tables for given topics
* setup.setupAll(Set.of("orders", "inventory"));
*
* // Create connection pool
* DataSource pool = DatabaseSetup.createPool(config);
* }</pre>
*/
public class DatabaseSetup {
private static final Logger logger = LoggerFactory.getLogger(DatabaseSetup.class);
Expand All @@ -53,6 +50,8 @@ public class DatabaseSetup {
private final String username;
private final String password;

private final DataSource ds;

/**
* Creates a new DatabaseSetup instance using configuration from
* PostEventConfig.
Expand All @@ -74,6 +73,17 @@ public DatabaseSetup(String jdbcUrl, String username, String password) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
this.ds = null;
}

public DatabaseSetup(DataSource ds) {
if (ds == null) {
throw new IllegalArgumentException("DataSource must not be null");
}
this.jdbcUrl = null;
this.username = null;
this.password = null;
this.ds = ds;
}

/**
Expand All @@ -85,11 +95,27 @@ public DatabaseSetup(String jdbcUrl, String username, String password) {
* @throws RuntimeException if database operations fail
*/
public DatabaseSetup setupAll(Set<String> topics) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring to centralize schema creation and unify constructors to eliminate duplication and null-checks.

Here are two small refactorings that will remove the duplication and the “null-check” complexity without changing any public API or behavior:

  1. Collapse all schema-creation into the single entry point (setupAll) so that setupClient/setupServer no longer need to each call createSchemaIfNotExists().
public DatabaseSetup setupAll(Set<String> topics) {
    createSchemaIfNotExists();          // only here
    setupClient();
    setupServer(topics);
    setupDebezium();
    return this;
}

public DatabaseSetup setupClient() {
    createMessagesTableIfNotExists();
    createContiguousHwmTableIfNotExists();
    return this;
}

public DatabaseSetup setupServer(Set<String> topics) {
    topics.forEach(this::createTableIfNotExists);
    return this;
}

public DatabaseSetup setupDebezium() {
    clearOldSlots();
    return this;
}
  1. Unify your constructors so you always hold exactly one DataSource (and drop the JDBC fields + null‐check in getConnection()):
// primary ctor
public DatabaseSetup(DataSource ds) {
  this.ds = ds;
}

// convenience ctors chain to primary
public DatabaseSetup(PostEventConfig cfg) {
  this(createPool(cfg));
}

public DatabaseSetup(String jdbcUrl, String username, String password) {
  this(createPool(new PostEventConfig(jdbcUrl, username, password)));
}

private Connection getConnection() throws SQLException {
  return ds.getConnection();
}

This way you:

  • Call createSchemaIfNotExists() exactly once.
  • Eliminate the if(ds!=null)… branch.
  • Keep all four public API methods intact.

setupClient();
setupServer(topics);
setupDebezium();
return this;
}

public DatabaseSetup setupDebezium() {
clearOldSlots();
return this;
}

public DatabaseSetup setupServer(Set<String> topics) {
createSchemaIfNotExists();
topics.stream().forEach(this::createTableIfNotExists);
return this;
}

public DatabaseSetup setupClient() {
createSchemaIfNotExists();
createMessagesTableIfNotExists();
createContiguousHwmTableIfNotExists();
topics.stream().forEach(this::createTableIfNotExists);
clearOldSlots();
return this;
}

Expand Down Expand Up @@ -287,21 +313,9 @@ topic_name VARCHAR(255) PRIMARY KEY,
* @throws SQLException if connection fails
*/
private Connection getConnection() throws SQLException {
if (ds != null)
return ds.getConnection();
return DriverManager.getConnection(jdbcUrl, username, password);
}

/**
* Creates and configures a connection pool using HikariCP.
*
* @param cfg Configuration containing database connection details
* @return Configured DataSource
*/
public static DataSource createPool(PostEventConfig cfg) {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(cfg.jdbcUrl());
ds.setUsername(cfg.dbUser());
ds.setPassword(cfg.dbPassword());
return ds;
}

}
3 changes: 3 additions & 0 deletions debezium/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ dependencies {
constraints {
implementation 'com.google.guava:guava:32.0.0-jre'
}

// Database connection pooling
implementation 'com.zaxxer:HikariCP:5.0.1'

// Test dependencies
testImplementation platform('io.zonky.test.postgres:embedded-postgres-binaries-bom:16.2.0')
Expand Down
Loading
Loading