Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ WORKDIR /app
# Copy Maven wrapper and pom.xml first for better layer caching
COPY mvnw .
COPY mvnw.cmd .
COPY spotless ./spotless
COPY .mvn .mvn
COPY pom.xml .

Expand Down
1 change: 1 addition & 0 deletions Dockerfile.scale
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ COPY mvnw .
COPY mvnw.cmd .
COPY .mvn .mvn
COPY pom.xml .
COPY spotless ./spotless

RUN ./mvnw dependency:go-offline -B

Expand Down
1 change: 1 addition & 0 deletions Dockerfile.scale.standard
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ COPY mvnw .
COPY mvnw.cmd .
COPY .mvn .mvn
COPY pom.xml .
COPY spotless ./spotless

RUN ./mvnw dependency:go-offline -B

Expand Down
1 change: 1 addition & 0 deletions Dockerfile.standard
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ COPY mvnw .
COPY mvnw.cmd .
COPY .mvn .mvn
COPY pom.xml .
COPY spotless ./spotless

RUN ./mvnw dependency:go-offline -B

Expand Down
83 changes: 47 additions & 36 deletions README.md

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,9 @@
<version>3.0.0</version>
<configuration>
<java>
<googleJavaFormat>
<version>1.17.0</version>
<style>GOOGLE</style>
</googleJavaFormat>
<eclipse>
<file>${project.basedir}/spotless/java-formatter.xml</file>
</eclipse>
<removeUnusedImports/>
<trimTrailingWhitespace/>
<endWithNewline/>
Expand Down
51 changes: 21 additions & 30 deletions src/main/java/fish/payara/resource/HelloWorldResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,28 @@
@Path("hello")
public class HelloWorldResource {

@Inject
@ConfigProperty(name = "defaultName", defaultValue = "world")
private String defaultName;
@Inject
@ConfigProperty(name = "defaultName", defaultValue = "world")
private String defaultName;

@GET
@Operation(summary = "Get a personalized greeting")
@APIResponses(
value = {
@APIResponse(responseCode = "200", description = "Successful operation"),
@APIResponse(responseCode = "400", description = "Invalid input")
})
@Counted(name = "helloEndpointCount", description = "Count of calls to the hello endpoint")
@Timed(name = "helloEndpointTime", description = "Time taken to execute the hello endpoint")
@Timeout(3000)
@Retry(maxRetries = 3)
@Fallback(fallbackMethod = "fallbackMethod")
public Response hello(
@QueryParam("name")
@Parameter(
name = "name",
description = "Name to include in the greeting",
required = false,
example = "John")
String name) {
if ((name == null) || name.trim().isEmpty()) {
name = defaultName;
@GET
@Operation(summary = "Get a personalized greeting")
@APIResponses(value = {@APIResponse(responseCode = "200", description = "Successful operation"),
@APIResponse(responseCode = "400", description = "Invalid input")})
@Counted(name = "helloEndpointCount", description = "Count of calls to the hello endpoint")
@Timed(name = "helloEndpointTime", description = "Time taken to execute the hello endpoint")
@Timeout(3000)
@Retry(maxRetries = 3)
@Fallback(fallbackMethod = "fallbackMethod")
public Response hello(
@QueryParam("name") @Parameter(name = "name", description = "Name to include in the greeting", required = false, example = "John") String name) {
if ((name == null) || name.trim().isEmpty()) {
name = defaultName;
}
return Response.ok(name).build();
}
return Response.ok(name).build();
}

public Response fallbackMethod(@QueryParam("name") String name) {
return Response.ok("Fallback data").build();
}
public Response fallbackMethod(@QueryParam("name") String name) {
return Response.ok("Fallback data").build();
}
}
225 changes: 105 additions & 120 deletions src/main/java/fish/payara/trader/aeron/AeronSubscriberBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,152 +21,137 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
* Aeron Ingress Singleton Bean Launches an embedded MediaDriver and subscribes to market data
* stream. Uses SBE decoders for zero-copy message processing. Runs in a dedicated thread to
* continuously poll for messages. IMPORTANT: This must initialize BEFORE MarketDataPublisher
* Aeron Ingress Singleton Bean Launches an embedded MediaDriver and subscribes to market data stream. Uses SBE decoders for zero-copy message processing. Runs
* in a dedicated thread to continuously poll for messages. IMPORTANT: This must initialize BEFORE MarketDataPublisher
*/
@ApplicationScoped
public class AeronSubscriberBean {

private static final Logger LOGGER = Logger.getLogger(AeronSubscriberBean.class.getName());
private static final Logger LOGGER = Logger.getLogger(AeronSubscriberBean.class.getName());

private static final String CHANNEL = "aeron:ipc";
private static final int STREAM_ID = 1001;
private static final int FRAGMENT_LIMIT = 10;
private static final String CHANNEL = "aeron:ipc";
private static final int STREAM_ID = 1001;
private static final int FRAGMENT_LIMIT = 10;

private MediaDriver mediaDriver;
private Aeron aeron;
private Subscription subscription;
private volatile boolean running = false;
private Future<?> pollingFuture;
private MediaDriver mediaDriver;
private Aeron aeron;
private Subscription subscription;
private volatile boolean running = false;
private Future<?> pollingFuture;

@Inject private MarketDataFragmentHandler fragmentHandler;
@Inject
private MarketDataFragmentHandler fragmentHandler;

@Inject @VirtualThreadExecutor private ManagedExecutorService managedExecutorService;
@Inject
@VirtualThreadExecutor
private ManagedExecutorService managedExecutorService;

@Inject
@ConfigProperty(name = "TRADER_INGESTION_MODE", defaultValue = "AERON")
private String ingestionMode;
@Inject
@ConfigProperty(name = "TRADER_INGESTION_MODE", defaultValue = "AERON")
private String ingestionMode;

void contextInitialized(@Observes @Initialized(ApplicationScoped.class) Object event) {
managedExecutorService.submit(this::init);
}

public void init() {
if ("DIRECT".equalsIgnoreCase(ingestionMode)) {
LOGGER.info("Running in DIRECT mode - Skipping Aeron/MediaDriver initialization.");
return;
void contextInitialized(@Observes @Initialized(ApplicationScoped.class) Object event) {
managedExecutorService.submit(this::init);
}

LOGGER.info("Initializing Aeron Subscriber Bean...");

try {
LOGGER.info("Launching embedded MediaDriver...");
mediaDriver =
MediaDriver.launchEmbedded(
new MediaDriver.Context()
.threadingMode(ThreadingMode.SHARED)
.dirDeleteOnStart(true)
.dirDeleteOnShutdown(true));

LOGGER.info("MediaDriver launched at: " + mediaDriver.aeronDirectoryName());
LOGGER.info("Connecting Aeron client...");
aeron =
Aeron.connect(
new Aeron.Context()
.aeronDirectoryName(mediaDriver.aeronDirectoryName())
.errorHandler(this::onError)
.availableImageHandler(
image -> LOGGER.info("Available image: " + image.sourceIdentity()))
.unavailableImageHandler(
image -> LOGGER.info("Unavailable image: " + image.sourceIdentity())));
LOGGER.info("Adding subscription on channel: " + CHANNEL + ", stream: " + STREAM_ID);
subscription = aeron.addSubscription(CHANNEL, STREAM_ID);
startPolling();
LOGGER.info("Aeron Subscriber Bean initialized successfully");

} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed to initialize Aeron Subscriber Bean", e);
cleanup();
throw new RuntimeException("Failed to initialize Aeron", e);
public void init() {
if ("DIRECT".equalsIgnoreCase(ingestionMode)) {
LOGGER.info("Running in DIRECT mode - Skipping Aeron/MediaDriver initialization.");
return;
}

LOGGER.info("Initializing Aeron Subscriber Bean...");

try {
LOGGER.info("Launching embedded MediaDriver...");
mediaDriver = MediaDriver
.launchEmbedded(new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).dirDeleteOnStart(true).dirDeleteOnShutdown(true));

LOGGER.info("MediaDriver launched at: " + mediaDriver.aeronDirectoryName());
LOGGER.info("Connecting Aeron client...");
aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName())
.errorHandler(this::onError)
.availableImageHandler(image -> LOGGER.info("Available image: " + image.sourceIdentity()))
.unavailableImageHandler(image -> LOGGER.info("Unavailable image: " + image.sourceIdentity())));
LOGGER.info("Adding subscription on channel: " + CHANNEL + ", stream: " + STREAM_ID);
subscription = aeron.addSubscription(CHANNEL, STREAM_ID);
startPolling();
LOGGER.info("Aeron Subscriber Bean initialized successfully");

} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed to initialize Aeron Subscriber Bean", e);
cleanup();
throw new RuntimeException("Failed to initialize Aeron", e);
}
}
}

/** Start background task to continuously poll for messages */
private void startPolling() {
running = true;
pollingFuture =
managedExecutorService.submit(
() -> {
LOGGER.info("Aeron polling task started");

final IdleStrategy idleStrategy =
new BackoffIdleStrategy(
100,
10,
TimeUnit.MICROSECONDS.toNanos(1),
TimeUnit.MICROSECONDS.toNanos(100));

while (running && !Thread.currentThread().isInterrupted()) {

/** Start background task to continuously poll for messages */
private void startPolling() {
running = true;
pollingFuture = managedExecutorService.submit(() -> {
LOGGER.info("Aeron polling task started");

final IdleStrategy idleStrategy = new BackoffIdleStrategy(100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100));

while (running && !Thread.currentThread().isInterrupted()) {
try {
final int fragmentsRead = subscription.poll(fragmentHandler, FRAGMENT_LIMIT);
final int fragmentsRead = subscription.poll(fragmentHandler, FRAGMENT_LIMIT);

idleStrategy.idle(fragmentsRead);
idleStrategy.idle(fragmentsRead);

} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error polling subscription", e);
LOGGER.log(Level.SEVERE, "Error polling subscription", e);
}
}
}

LOGGER.info("Aeron polling task stopped");
});
}

LOGGER.info("Aeron polling task stopped");
});
}
/** Error handler for Aeron */
private void onError(Throwable throwable) {
LOGGER.log(Level.SEVERE, "Aeron error occurred", throwable);
}

/** Error handler for Aeron */
private void onError(Throwable throwable) {
LOGGER.log(Level.SEVERE, "Aeron error occurred", throwable);
}
@PreDestroy
public void shutdown() {
LOGGER.info("Shutting down Aeron Subscriber Bean...");
running = false;

@PreDestroy
public void shutdown() {
LOGGER.info("Shutting down Aeron Subscriber Bean...");
running = false;
if (pollingFuture != null) {
pollingFuture.cancel(true);
}

if (pollingFuture != null) {
pollingFuture.cancel(true);
cleanup();
LOGGER.info("Aeron Subscriber Bean shut down");
}

cleanup();
LOGGER.info("Aeron Subscriber Bean shut down");
}

/** Clean up all Aeron resources */
private void cleanup() {
CloseHelper.quietClose(subscription);
CloseHelper.quietClose(aeron);
CloseHelper.quietClose(mediaDriver);
}

/** Get subscription statistics */
public String getStatus() {
if (subscription != null) {
return String.format(
"Channel: %s, Stream: %d, Images: %d, Running: %b",
subscription.channel(), subscription.streamId(), subscription.imageCount(), running);
/** Clean up all Aeron resources */
private void cleanup() {
CloseHelper.quietClose(subscription);
CloseHelper.quietClose(aeron);
CloseHelper.quietClose(mediaDriver);
}
return "Not initialized";
}

/** Get the Aeron directory name for connecting publishers */
public String getAeronDirectoryName() {
if (mediaDriver != null) {
return mediaDriver.aeronDirectoryName();
/** Get subscription statistics */
public String getStatus() {
if (subscription != null) {
return String.format("Channel: %s, Stream: %d, Images: %d, Running: %b", subscription.channel(), subscription.streamId(), subscription.imageCount(),
running);
}
return "Not initialized";
}
return null;
}

/** Check if the MediaDriver is ready */
public boolean isReady() {
return mediaDriver != null && aeron != null && subscription != null && running;
}
/** Get the Aeron directory name for connecting publishers */
public String getAeronDirectoryName() {
if (mediaDriver != null) {
return mediaDriver.aeronDirectoryName();
}
return null;
}

/** Check if the MediaDriver is ready */
public boolean isReady() {
return mediaDriver != null && aeron != null && subscription != null && running;
}
}
Loading