Formal API contracts, data model, behavioral rules, configuration, and observability for the outbox-java framework.
For project goals and acceptance criteria, see OBJECTIVE.md. For tutorials and code examples, see TUTORIAL.md.
- Architecture Overview
- Modules
- Core Abstractions
- Data Model
- Event Envelope
- Type-Safe Event and Aggregate Types
- Public API
- JDBC Outbox Store
- OutboxDispatcher
- OutboxPoller
- Registries
- Retry Policy
- Backpressure and Downgrade
- Configuration
- Observability
- Thread Safety
- Event Purge
- Dead Event Management
- Ordered Delivery
- Outbox Composite Builder
| Component | Responsibility |
|---|---|
| Outbox | Composite builder (singleNode/multiNode/ordered) that wires dispatcher, poller, and writer into a single AutoCloseable |
| OutboxWriter | API used by business code inside a transaction context |
| TxContext | Abstraction for transaction lifecycle hooks (afterCommit/afterRollback) |
| OutboxStore | Insert/update/query via java.sql.Connection |
| OutboxDispatcher | Hot/cold queues + worker pool; executes listeners; updates status |
| ListenerRegistry | Maps event types to event listeners |
| OutboxPoller | Low-frequency fallback DB scan; forwards events to handler |
| OutboxPurgeScheduler | Scheduled purge of terminal events (DONE/DEAD) older than retention |
| InFlightTracker | In-memory deduplication |
+--------------------------------------------------------------+
| Transaction Scope |
| |
| Business Code --> OutboxWriter.write() |
| | |
| +--> OutboxStore.insertNew() --> [DB] |
| | |
| +--> TxContext.afterCommit(hook) |
+-------------------------+-------------------------+-----------+
| |
afterCommit hook poll pending
| |
v v
HOT PATH COLD PATH
| |
v v
WriterHook.afterCommit() OutboxPoller.poll()
Dispatcher.enqueueHot() pollPending()/claimPending()
| Handler.handle()
| Dispatcher.enqueueCold()
| |
+-----------+-------------+
|
v
OutboxDispatcher.process()
-> ListenerRegistry.listenerFor()
-> EventListener.onEvent()
-> markDone/Deferred/Retry/Dead()
OutboxDispatcher MUST prioritize:
- Hot Queue: afterCommit enqueue from business thread (priority)
- Cold Queue: poller/handler fallback
outbox-core: Core interfaces, hooks, dispatcher, poller, and registries. Zero external dependencies.
Packages:
- `io.outbox` - Main API: Outbox (composite builder), OutboxWriter (interface), DefaultOutboxWriter, EventEnvelope, EventType, AggregateType, EventListener, BoundEventListener, DispatchResult (sealed interface: Done, RetryAfter, Dead), RetryAfterException, EventException, RecoverableException, UnrecoverableException, PayloadParseException, WriterHook
- `io.outbox.spi` - Extension point interfaces: TxContext, ConnectionProvider, OutboxStore, EventPurger, MetricsExporter, JsonCodec (SPI interface), JsonCodecHolder (ServiceLoader discovery)
- `io.outbox.model` - Domain objects: OutboxEvent, EventStatus
- `io.outbox.dispatch` - OutboxDispatcher, retry policy, inflight tracking
- `io.outbox.poller` - OutboxPoller, OutboxPollerHandler
- `io.outbox.registry` - Listener registry
- `io.outbox.purge` - OutboxPurgeScheduler (scheduled purge of terminal events)
- `io.outbox.dead` - DeadEventManager (connection-managed facade for dead event queries)
- `io.outbox.util` - DaemonThreadFactory
outbox-jdbc: JDBC outbox store hierarchy, event purger hierarchy, and manual transaction helpers.
Packages:
- `io.outbox.jdbc` — Shared utilities: JdbcTemplate, OutboxStoreException, DataSourceConnectionProvider
- `io.outbox.jdbc.store` — OutboxStore hierarchy (ServiceLoader-registered)
- `io.outbox.jdbc.purge` — EventPurger hierarchy
- `io.outbox.jdbc.tx` — Transaction management
Classes by package:
io.outbox.jdbc.store
- `AbstractJdbcOutboxStore` - Base outbox store with shared SQL, row mapper, and H2-compatible default claim
- `H2OutboxStore` - H2 (inherits default subquery-based claim)
- `MySqlOutboxStore` - MySQL/TiDB (UPDATE...ORDER BY...LIMIT claim)
- `PostgresOutboxStore` - PostgreSQL (FOR UPDATE SKIP LOCKED + RETURNING claim)
- `JdbcOutboxStores` - Static utility with ServiceLoader registry (META-INF/services/io.outbox.jdbc.store.AbstractJdbcOutboxStore) and `detect(DataSource)` auto-detection
io.outbox.jdbc.purge
- `AbstractJdbcEventPurger` - Base event purger with subquery-based DELETE (H2/PostgreSQL default)
- `H2EventPurger` - H2 (inherits default)
- `MySqlEventPurger` - MySQL/TiDB (DELETE...ORDER BY...LIMIT)
- `PostgresEventPurger` - PostgreSQL (inherits default)
io.outbox.jdbc.tx
- `ThreadLocalTxContext` - ThreadLocal-based TxContext for manual transactions
- `JdbcTransactionManager` - Helper for manual JDBC transactions
io.outbox.jdbc (root)
- `JdbcTemplate` - Lightweight JDBC helper (update, query, updateReturning)
- `OutboxStoreException` - JDBC-layer exception
- `DataSourceConnectionProvider` - ConnectionProvider from a DataSource
outbox-spring-adapter: Optional Spring integration.
Classes:
- `SpringTxContext` - Implements TxContext using Spring's TransactionSynchronizationManager
outbox-micrometer: Micrometer metrics bridge for Prometheus, Grafana, Datadog, and other monitoring backends.
Classes:
- `MicrometerMetricsExporter` - Implements `MetricsExporter` using a Micrometer `MeterRegistry`
Spring Boot auto-configuration for the outbox framework. Eliminates manual bean wiring — users add the dependency,
annotate listeners, and configure via application.properties.
Classes:
- `OutboxAutoConfiguration` - Main auto-configuration: wires `OutboxStore`, `ConnectionProvider`, `TxContext`, `ListenerRegistry`, `Outbox`, and `OutboxWriter` from a `DataSource` and `OutboxProperties`
- `OutboxMicrometerAutoConfiguration` - Conditional auto-configuration for Micrometer metrics (enabled by default when Micrometer is on the classpath)
- `OutboxProperties` - `@ConfigurationProperties(prefix = "outbox")` with nested classes for dispatcher, retry, poller, claim-locking, purge, and metrics settings
- `OutboxListener` - Type-level annotation for declaring event listeners with string-based or type-safe class-based event/aggregate type specification
- `OutboxListenerRegistrar` - `SmartInitializingSingleton` that scans `@OutboxListener` beans and `BoundEventListener` beans, registering them in the `DefaultListenerRegistry`
- `JacksonJsonCodec` - Jackson-based `JsonCodec` using the application's `ObjectMapper`; auto-configured and set as default via `JsonCodec.setDefault()`
Conditions:

- `@ConditionalOnClass(Outbox.class)` — only activates when outbox-core is on the classpath
- `@ConditionalOnBean(DataSource.class)` — requires a DataSource
- All beans are `@ConditionalOnMissingBean` — users can override any component
Operating modes (via the outbox.mode property):

- `SINGLE_NODE` (default) — hot path + poller fallback
- `MULTI_NODE` — hot path + claim-based locking (requires `outbox.claim-locking.enabled=true`)
- `ORDERED` — poller-only, single worker, no retry
- `WRITER_ONLY` — CDC mode, no dispatcher/poller; optional age-based purge
outbox-gson: Lightweight Gson-based JsonCodec implementation. Add to the classpath for automatic SPI discovery.
Classes:
- `GsonJsonCodec` - Implements `JsonCodec` using Gson; registered via `META-INF/services/io.outbox.spi.JsonCodec`
Standalone H2 demonstration (no Spring).
Spring Boot REST API demonstration.
Multi-datasource demo (two H2 databases).
public interface TxContext {
boolean isTransactionActive();
Connection currentConnection();
void afterCommit(Runnable callback);
void afterRollback(Runnable callback);
}

Rules:

- `currentConnection()` MUST return the same connection used by business operations.
- `afterCommit()` callback MUST run only if the transaction commits successfully.
- `afterCommit()`/`afterRollback()` registration requires transaction synchronization to be active.
- Core MUST fail-fast if `write()` is called when `isTransactionActive() == false`.
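The hook contract above can be sketched with a ThreadLocal, in the spirit of outbox-jdbc's `ThreadLocalTxContext`. This is an illustrative sketch: the `begin`/`commit`/`rollback` lifecycle methods and the internal structure are assumptions, not the shipped implementation.

```java
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;

// Hypothetical ThreadLocal-based TxContext satisfying the rules above.
public class SketchTxContext {
    private static final class TxState {
        Connection connection;
        final List<Runnable> afterCommit = new ArrayList<>();
        final List<Runnable> afterRollback = new ArrayList<>();
    }

    private final ThreadLocal<TxState> current = new ThreadLocal<>();

    public boolean isTransactionActive() { return current.get() != null; }

    public Connection currentConnection() {
        // Same connection the business code uses inside this transaction.
        return requireActive().connection;
    }

    public void afterCommit(Runnable callback)   { requireActive().afterCommit.add(callback); }
    public void afterRollback(Runnable callback) { requireActive().afterRollback.add(callback); }

    // Called by the owning transaction manager (hypothetical lifecycle methods).
    public void begin(Connection conn) {
        TxState tx = new TxState();
        tx.connection = conn;
        current.set(tx);
    }

    public void commit() {
        TxState tx = requireActive();
        current.remove();                      // synchronization ends with the tx
        tx.afterCommit.forEach(Runnable::run); // hooks run only on successful commit
    }

    public void rollback() {
        TxState tx = requireActive();
        current.remove();
        tx.afterRollback.forEach(Runnable::run);
    }

    private TxState requireActive() {
        TxState tx = current.get();
        if (tx == null) throw new IllegalStateException("transaction synchronization not active");
        return tx;
    }
}
```

Note that the ThreadLocal is cleared before the callbacks run, so a callback observing `isTransactionActive()` correctly sees no active transaction.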
public interface ConnectionProvider {
Connection getConnection() throws SQLException;
}

Used by OutboxDispatcher and OutboxPoller for short-lived connections outside the business transaction.
| Implementation | Module | Description |
|---|---|---|
| `ThreadLocalTxContext` | outbox-jdbc | Manual JDBC transaction management |
| `SpringTxContext` | outbox-spring-adapter | Spring `@Transactional` integration |
public interface JsonCodec {
static JsonCodec getDefault() { ... }
static void setDefault(JsonCodec codec) { ... }
String toJson(Object obj);
<T> T fromJson(String json, Class<T> type);
default Map<String, String> parseStringMap(String json) { ... }
}

- `getDefault()` returns the default instance — resolved via programmatic override (`setDefault()`) or `ServiceLoader` discovery from `META-INF/services/io.outbox.spi.JsonCodec`. Throws `IllegalStateException` if no implementation is found.
- `setDefault(codec)` programmatically overrides SPI discovery (typically called by Spring Boot auto-configuration).
- `toJson(obj)` serializes an object to JSON.
- `fromJson(json, type)` deserializes a JSON string into the given type.
- `parseStringMap(json)` returns an empty map for null, blank, or `"null"` input; delegates to `fromJson` otherwise.
- Add `outbox-gson` to the classpath for an automatic Gson-based codec, or call `JsonCodec.setDefault()` with a custom implementation (e.g. `JacksonJsonCodec` in Spring Boot).
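The `parseStringMap` default described above amounts to a null/blank guard in front of `fromJson`. A standalone sketch, where the `fromJson` function parameter is a stand-in for the codec's own method (an assumption for testability, not the real signature):

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Sketch of the documented parseStringMap default behavior.
public class ParseStringMapSketch {
    public static Map<String, String> parseStringMap(
            String json, Function<String, Map<String, String>> fromJson) {
        // Empty map for null, blank, or the literal string "null".
        if (json == null || json.isBlank() || "null".equals(json.trim())) {
            return Collections.emptyMap();
        }
        return fromJson.apply(json); // otherwise delegate to the codec
    }
}
```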
CREATE TABLE outbox_event (
event_id VARCHAR(36) PRIMARY KEY,
event_type VARCHAR(128) NOT NULL,
aggregate_type VARCHAR(64),
aggregate_id VARCHAR(128),
tenant_id VARCHAR(64),
payload JSON NOT NULL,
headers JSON,
status TINYINT NOT NULL,
attempts INT NOT NULL DEFAULT 0,
available_at DATETIME(6) NOT NULL,
created_at DATETIME(6) NOT NULL,
done_at DATETIME(6),
last_error TEXT,
locked_by VARCHAR(128),
locked_at DATETIME(6)
);
CREATE INDEX idx_status_available ON outbox_event(status, available_at, created_at);

| Value | Name | Description |
|---|---|---|
| 0 | NEW | Freshly inserted, awaiting processing |
| 1 | DONE | Successfully processed |
| 2 | RETRY | Failed, scheduled for retry |
| 3 | DEAD | Exceeded max attempts |
| Field | Type | Required | Default |
|---|---|---|---|
| eventId | String | No | ULID (monotonic) |
| eventType | String | Yes | - |
| occurredAt | Instant | No | Instant.now() |
| aggregateType | String | No | AggregateType.GLOBAL.name() ("__GLOBAL__") |
| aggregateId | String | No | null |
| tenantId | String | No | null |
| headers | Map<String,String> | No | empty map |
| payloadJson | String | Yes* | - |
| availableAt | Instant | No | null (immediate — uses occurredAt) |
| deliverAfter | Duration | No | null (builder-only, resolves to availableAt) |
*payloadJson is required. Either availableAt or deliverAfter may be set, not both; deliverAfter must be positive.
- Maximum payload size: 1MB (1,048,576 bytes)
- EventEnvelope is immutable (defensive copies for headers)
- `payload(Class<T> type)` deserializes payloadJson via `JsonCodec.getDefault().fromJson()`
- Header map MUST NOT contain null keys.
- `availableAt` and `deliverAfter` are mutually exclusive; setting both throws `IllegalArgumentException`
- `deliverAfter` must be positive (> 0); zero or negative throws `IllegalArgumentException`
- `availableAt` must not be before `occurredAt`; violation throws `IllegalArgumentException`
- `availableAt(null)` and `deliverAfter(null)` throw `NullPointerException` (fail-fast at setter)
- When `deliverAfter` is set, `availableAt` is resolved to `occurredAt + deliverAfter`
- `isDelayed()` returns true when `availableAt != null && availableAt.isAfter(occurredAt)`
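The delay-resolution rules can be expressed in plain java.time terms. This is an illustrative helper (not framework code) showing how `deliverAfter` resolves and when `isDelayed()` is true:

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative helpers mirroring the envelope's documented delay rules.
public class DelayRules {
    // When deliverAfter is set, availableAt resolves to occurredAt + deliverAfter.
    public static Instant resolveAvailableAt(Instant occurredAt, Duration deliverAfter) {
        if (deliverAfter.isZero() || deliverAfter.isNegative()) {
            throw new IllegalArgumentException("deliverAfter must be positive");
        }
        return occurredAt.plus(deliverAfter);
    }

    // isDelayed(): availableAt is set and strictly after occurredAt.
    public static boolean isDelayed(Instant occurredAt, Instant availableAt) {
        return availableAt != null && availableAt.isAfter(occurredAt);
    }
}
```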
// With type-safe EventType
EventEnvelope envelope = EventEnvelope.builder(UserEvents.USER_CREATED)
.aggregateType(Aggregates.USER)
.aggregateId("123")
.payloadJson("{\"name\":\"John\"}")
.build();
// With string
EventEnvelope envelope = EventEnvelope.builder("UserCreated")
.payloadJson("{}")
.build();
// Shorthand
EventEnvelope envelope = EventEnvelope.ofJson("UserCreated", "{}");
// Delayed delivery — absolute time
EventEnvelope delayed = EventEnvelope.builder("ReminderEmail")
.availableAt(Instant.now().plus(Duration.ofHours(24)))
.payloadJson("{\"userId\":\"123\"}")
.build();
// Delayed delivery — relative duration
EventEnvelope delayed2 = EventEnvelope.builder("RetryWebhook")
.deliverAfter(Duration.ofMinutes(30))
.payloadJson("{\"url\":\"https://...\"}")
.build();

The tenantId field provides pass-through metadata for multi-tenant applications:
EventEnvelope envelope = EventEnvelope.builder("OrderCreated")
.tenantId("tenant-123")
.aggregateId("order-456")
.payloadJson("{...}")
.build();

Framework behavior:

- `tenantId` is stored in the `outbox_event` table
- `tenantId` is included when polling and dispatching events
- Listeners receive `tenantId` via `event.tenantId()`
Framework does NOT provide:
- Tenant-based filtering during polling
- Tenant isolation or partitioning
- Per-tenant configuration
Application responsibility:
- Set `tenantId` when publishing events
- Use `tenantId` in listeners to route events or apply tenant-specific logic
- Implement tenant isolation at the database level if required (e.g., row-level security, separate schemas)
public interface EventType {
String name();
}

public enum UserEvents implements EventType {
USER_CREATED,
USER_UPDATED,
USER_DELETED;
}
// Usage
EventEnvelope.builder(UserEvents.USER_CREATED)
    .payloadJson("{}")
    .build();

EventType type = StringEventType.of("DynamicEvent");
EventEnvelope.builder(type)
    .payloadJson("{}")
    .build();

public interface AggregateType {
    AggregateType GLOBAL = ...; // name() returns "__GLOBAL__"
    String name();
}

AggregateType.GLOBAL is the default aggregate type used when none is explicitly set on an EventEnvelope.
// Enum-based
public enum Aggregates implements AggregateType {
USER, ORDER, PRODUCT
}
EventEnvelope.builder(eventType)
    .aggregateType(Aggregates.USER)
    .aggregateId("user-123")
    .build();
// Dynamic
EventEnvelope.builder(eventType)
    .aggregateType(StringAggregateType.of("CustomAggregate"))
    .aggregateId("id-456")
    .build();

public interface OutboxWriter {
String write(EventEnvelope event); // returns null if suppressed
String write(String eventType, String payloadJson); // returns null if suppressed
String write(EventType eventType, String payloadJson); // returns null if suppressed
List<String> writeAll(List<EventEnvelope> events); // returns empty list if suppressed
}
public final class DefaultOutboxWriter implements OutboxWriter {
public DefaultOutboxWriter(TxContext txContext, OutboxStore outboxStore);
public DefaultOutboxWriter(TxContext txContext, OutboxStore outboxStore, WriterHook writerHook);
}

Semantics:
- MUST require an active transaction via TxContext
- `write()` delegates to `writeAll()` (single-element list)
- `writeAll()` calls `WriterHook.beforeWrite()`, which may transform or suppress the list
- If `beforeWrite` returns null or empty, no events are inserted (suppressed write)
- MUST insert outbox rows (NEW) using `TxContext.currentConnection()` within the current transaction
- MUST register a single `afterCommit`/`afterRollback` callback per `writeAll` batch
- If the hook throws in `afterWrite`/`afterCommit`/`afterRollback`, it MUST NOT propagate (log and continue)
- If no hook is provided (or `WriterHook.NOOP`), no post-commit action is executed (poller/CDC is responsible)
public interface WriterHook {
default List<EventEnvelope> beforeWrite(List<EventEnvelope> events) {
return events;
}
default void afterWrite(List<EventEnvelope> events) {
}
default void afterCommit(List<EventEnvelope> events) {
}
default void afterRollback(List<EventEnvelope> events) {
}
WriterHook NOOP = new WriterHook() {
};
}

Lifecycle: beforeWrite (transform/suppress) → insert → afterWrite → tx commit/rollback → afterCommit/afterRollback.

- `beforeWrite` may return a modified list; returning null or empty suppresses the write
- `afterWrite`/`afterCommit`/`afterRollback` exceptions are swallowed and logged
- `DispatcherWriterHook` implements `afterCommit` to enqueue each event into the dispatcher's hot queue. Delayed events (`isDelayed()`) are skipped — they remain in the DB for the poller to deliver at `availableAt`.
public interface OutboxStore {
void insertNew(Connection conn, EventEnvelope event);
int markDone(Connection conn, String eventId);
int markRetry(Connection conn, String eventId, Instant nextAt, String error);
int markDead(Connection conn, String eventId, String error);
// Handler-controlled deferred retry (default falls back to markRetry)
default int markDeferred(Connection conn, String eventId, Instant nextAt);
List<OutboxEvent> pollPending(Connection conn, Instant now, Duration skipRecent, int limit);
// Claim-based locking (default falls back to pollPending)
default List<OutboxEvent> claimPending(
Connection conn, String ownerId, Instant now,
Instant lockExpiry, Duration skipRecent, int limit);
}

Insert New:
INSERT INTO outbox_event (event_id, event_type, aggregate_type, aggregate_id,
tenant_id, payload, headers, status, attempts, available_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, ?, ?)

Mark Done (idempotent):
UPDATE outbox_event
SET status = 1, done_at = ?, locked_by = NULL, locked_at = NULL
WHERE event_id = ? AND status <> 1

Mark Retry:
UPDATE outbox_event
SET status = 2, attempts = attempts + 1, available_at = ?, last_error = ?,
locked_by = NULL, locked_at = NULL
WHERE event_id = ? AND status <> 1

Mark Dead:
UPDATE outbox_event
SET status = 3, last_error = ?, locked_by = NULL, locked_at = NULL
WHERE event_id = ? AND status <> 1

Mark Deferred (handler-controlled retry without incrementing attempts):
UPDATE outbox_event
SET status = 0, available_at = ?, locked_by = NULL, locked_at = NULL
WHERE event_id = ? AND status <> 1

Default implementation falls back to markRetry (which increments attempts). JDBC implementations override with
the above SQL to preserve the attempt count.
Poll Pending:
SELECT event_id, event_type, aggregate_type, aggregate_id, tenant_id,
payload, headers, attempts, created_at
FROM outbox_event
WHERE status IN (0, 2)
AND available_at <= ?
AND created_at <= ?
ORDER BY created_at
LIMIT ?

- MUST use PreparedStatement with bound parameters
- MUST NOT close transaction-bound connection (caller manages lifecycle)
- Error messages truncated to 4000 characters
OutboxDispatcher dispatcher = OutboxDispatcher.builder()
.connectionProvider(connectionProvider) // required
.outboxStore(outboxStore) // required
.listenerRegistry(listenerRegistry) // required
.inFlightTracker(tracker) // default: DefaultInFlightTracker
.retryPolicy(policy) // default: ExponentialBackoffRetryPolicy(200, 60_000)
.maxAttempts(10) // default: 10
.workerCount(4) // default: 4
.hotQueueCapacity(1000) // default: 1000
.coldQueueCapacity(1000) // default: 1000
.metrics(metricsExporter) // default: MetricsExporter.NOOP
.interceptor(interceptor) // optional, repeatable
.interceptors(List.of(i1, i2)) // optional, bulk add
.drainTimeoutMs(5000) // default: 5000
.build();

boolean enqueueHot(QueuedEvent event)  // Returns false if queue full or shutting down
boolean enqueueCold(QueuedEvent event) // Returns false if queue full or shutting down
int coldQueueRemainingCapacity() // Number of slots available in cold queue
void close()                           // Graceful shutdown with drain

For each queued event:
- Dedupe: If eventId already inflight, drop
- Interceptors: Run `beforeDispatch` in registration order
- Route: Find the single listener via `listenerRegistry.listenerFor(aggregateType, eventType)`
- Unroutable: If no listener, throw `UnroutableEventException` -> mark DEAD immediately (no retry)
- Execute: Call `listener.onEvent(event)` → returns `DispatchResult`
- After: Run `afterDispatch` in reverse order (null error on success, exception on failure)
- Result handling:
  - `Done`: markDone; remove from inflight
  - `RetryAfter(delay)`: markDeferred with `now + delay` (no attempt increment); remove from inflight
  - `Dead`: markDead immediately with optional reason (no retry)
- Failure handling:
  - `RetryAfterException`: markRetry with handler delay (counts against maxAttempts)
  - Other exception: markRetry with `RetryPolicy` backoff, or markDead after maxAttempts
  - `UnrecoverableException`: markDead immediately (no retry, no attempt increment)
Workers execute listeners synchronously on the worker thread:
Worker Thread:
loop:
event = pollFairly() // 2:1 hot:cold weighted round-robin
interceptors.beforeDispatch(event)
listener = registry.listenerFor(aggregateType, eventType)
result = listener.onEvent(event) // blocking, returns DispatchResult
interceptors.afterDispatch(event, null)
if result is RetryAfter -> markDeferred(event, now + delay)
else -> markDone(event)
Cross-cutting hooks for audit, logging, and metrics:
public interface EventInterceptor {
default void beforeDispatch(EventEnvelope event) throws Exception {
}
default void afterDispatch(EventEnvelope event, Exception error) {
}
}

- `beforeDispatch` runs in registration order; an exception short-circuits to RETRY/DEAD
- `afterDispatch` runs in reverse order; exceptions are logged but swallowed
- Factory methods: `EventInterceptor.before(hook)`, `EventInterceptor.after(hook)`
Workers use 2:1 weighted round-robin: hot queue gets 2/3 of poll attempts, cold queue 1/3. This prevents cold queue starvation under sustained hot load.
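A minimal sketch of the 2:1 weighting over two local queues — illustrative only; the dispatcher's actual `pollFairly()` (blocking on `ArrayBlockingQueue`s) will differ in detail:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch: two of every three poll attempts prefer the hot queue,
// falling back to the other queue when the preferred one is empty.
public class WeightedPoll {
    private final Queue<String> hot = new ArrayDeque<>();
    private final Queue<String> cold = new ArrayDeque<>();
    private int tick = 0;

    public void offerHot(String e)  { hot.add(e); }
    public void offerCold(String e) { cold.add(e); }

    public String pollFairly() {
        boolean preferHot = (tick++ % 3) != 2; // ticks 0,1 -> hot; tick 2 -> cold
        Queue<String> first = preferHot ? hot : cold;
        Queue<String> second = preferHot ? cold : hot;
        String e = first.poll();
        return e != null ? e : second.poll(); // fallback prevents idle workers
    }
}
```

Because the non-preferred queue serves as a fallback, neither queue starves even when the other is empty.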
close() stops accepting new events, signals workers to drain remaining events,
and waits up to drainTimeoutMs before forcing shutdown.
This is intentional for natural backpressure:

- `workerCount` = maximum concurrent events being processed
- Slow listeners -> workers stay busy -> cannot poll more events
- Queues fill up -> `enqueueHot()` returns false -> graceful degradation
- No risk of overwhelming downstream systems (MQ, databases, APIs)
Tuning: Adjust workerCount to control max parallelism. Higher values increase throughput but may overwhelm
downstream services.
public class QueuedEvent {
EventEnvelope envelope;
Source source; // HOT or COLD
int attempts;
}

Prevents concurrent processing of the same event.
public interface InFlightTracker {
boolean tryAcquire(String eventId); // Returns false if already in-flight
void release(String eventId); // Remove from tracking
}

DefaultInFlightTracker:

new DefaultInFlightTracker()           // No TTL
new DefaultInFlightTracker(long ttlMs) // With TTL for stale entry recovery

- Uses ConcurrentHashMap for thread-safe tracking
- TTL allows recovery from stuck entries (e.g., worker crash)
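The TTL-based recovery can be sketched with a ConcurrentHashMap of acquisition timestamps. The injectable clock parameter below is a testing convenience and an assumption, not part of DefaultInFlightTracker's documented API:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Sketch of a TTL-aware in-flight tracker (illustrative, not the shipped class).
public class TtlInFlightTracker {
    private final ConcurrentHashMap<String, Long> inflight = new ConcurrentHashMap<>();
    private final long ttlMs;         // <= 0 disables TTL-based recovery
    private final LongSupplier clock; // injectable clock (testing convenience)

    public TtlInFlightTracker(long ttlMs) { this(ttlMs, System::currentTimeMillis); }

    public TtlInFlightTracker(long ttlMs, LongSupplier clock) {
        this.ttlMs = ttlMs;
        this.clock = clock;
    }

    // Returns false if the event is already in-flight and not expired.
    public boolean tryAcquire(String eventId) {
        long now = clock.getAsLong();
        Long prev = inflight.putIfAbsent(eventId, now);
        if (prev == null) return true;
        // Stale entry (e.g., worker crash): atomically take it over once expired.
        if (ttlMs > 0 && now - prev > ttlMs) {
            return inflight.replace(eventId, prev, now); // CAS-style takeover
        }
        return false;
    }

    public void release(String eventId) { inflight.remove(eventId); }
}
```

The `replace(key, oldValue, newValue)` call keeps the takeover atomic: if two workers race on an expired entry, only one wins.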
OutboxPoller poller = OutboxPoller.builder()
.connectionProvider(connectionProvider) // required
.outboxStore(outboxStore) // required
.handler(handler) // required
.skipRecent(Duration.ofSeconds(1)) // default: Duration.ZERO
.batchSize(50) // default: 50
.intervalMs(5000) // default: 5000
.metrics(metricsExporter) // default: MetricsExporter.NOOP
.claimLocking("poller-1", Duration.ofMinutes(5)) // optional: enables multi-node claim locking
.build();

void start() // Start scheduled polling
void poll() // Execute single poll cycle
void close() // Stop polling

- Runs on a scheduled interval (default 5000ms)
- Checks handler capacity before polling; skips the cycle if full
- Skips events created within the `skipRecent` duration (default `Duration.ZERO`)
- Queries status IN (0, 2) with available_at <= now
- Converts OutboxEvent to EventEnvelope
- Delegates to the handler for processing (subject to backpressure)
- On decode failure: marks the event DEAD
When claimLocking is configured, the poller uses claim-based locking:
- Claim: Sets `locked_by` and `locked_at` on pending events atomically
- Expiry: Locks older than the configured timeout are considered expired and can be reclaimed
- Release: `markDone`/`markRetry`/`markDead` clear `locked_by` and `locked_at`
- Database-specific: PostgreSQL uses `FOR UPDATE SKIP LOCKED` + `RETURNING`; MySQL uses `UPDATE...ORDER BY...LIMIT`; H2 uses a subquery-based two-phase claim
@FunctionalInterface
public interface OutboxPollerHandler {
boolean handle(EventEnvelope event, int attempts);
default int availableCapacity() {
return Integer.MAX_VALUE;
}
}

- Handler invoked for each decoded event
- Returning false stops the current poll cycle (backpressure)
For high-QPS workloads, CDC can replace the in-process poller and hot-path hook:
- Construct `DefaultOutboxWriter` without a hook (or with `WriterHook.NOOP`)
- Do not start `OutboxPoller`
- CDC consumer publishes downstream; status updates are optional in CDC-only mode
- If you do not mark DONE, treat the table as append-only and apply retention (e.g., partitioning + TTL)
- Dedupe by `event_id`
/**
* Listener that reacts to outbox events.
*
* Each (aggregateType, eventType) pair maps to exactly one listener.
* For cross-cutting concerns (audit, logging), use EventInterceptor.
*/
@FunctionalInterface
public interface EventListener {
DispatchResult onEvent(EventEnvelope envelope) throws Exception;
}

public interface ListenerRegistry {
EventListener listenerFor(String aggregateType, String eventType);
}

Returns the single listener for the given (aggregateType, eventType), or null if none registered.
// Register with GLOBAL aggregate type (convenience)
registry.register("UserCreated", event -> { ... });

// Register with specific aggregate type
registry.register("Order", "OrderPlaced", event -> { ... });

// Type-safe registration
registry.register(Aggregates.USER, UserEvents.USER_CREATED, event -> { ... });

- Duplicate registration for the same `(aggregateType, eventType)` throws `IllegalStateException`
- Convenience `register(eventType, listener)` uses `AggregateType.GLOBAL`
Abstract class that pre-binds aggregate type and event type at construction:
public abstract class BoundEventListener implements EventListener {
public BoundEventListener(String aggregateType, String eventType);
public BoundEventListener(AggregateType aggregateType, EventType eventType);
public String getAggregateType();
public String getEventType();
}

Register directly via registry.register(boundListener). In Spring Boot, declare as a @Component bean — the
OutboxListenerRegistrar auto-discovers BoundEventListener beans.
- Lookup listener via the `aggregateType + ":" + eventType` key
- If found, execute the single listener
- If not found, throw `UnroutableEventException` -> event marked DEAD immediately (no retry)
- For cross-cutting behavior (audit/logging), use `EventInterceptor` on the dispatcher builder
public interface RetryPolicy {
long computeDelayMs(int attempts);
}

public ExponentialBackoffRetryPolicy(long baseDelayMs, long maxDelayMs)

Formula:
delay = min(maxDelay, baseDelay * 2^(attempts-1)) * jitter
jitter = random(0.5, 1.5)
| Parameter | Default |
|---|---|
| baseDelayMs | 200 |
| maxDelayMs | 60000 |
| maxAttempts | 10 |
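The formula above, sketched in plain Java. This is an illustrative sketch: the shipped ExponentialBackoffRetryPolicy may handle overflow, jitter bounds, or rounding differently.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of: delay = min(maxDelay, baseDelay * 2^(attempts-1)) * jitter,
// with jitter drawn from [0.5, 1.5).
public class BackoffSketch {
    public static long computeDelayMs(long baseDelayMs, long maxDelayMs, int attempts) {
        long exponential = baseDelayMs;
        // Double until capped; the cap check also prevents long overflow.
        for (int i = 1; i < attempts && exponential < maxDelayMs; i++) {
            exponential *= 2;
        }
        exponential = Math.min(exponential, maxDelayMs);
        double jitter = ThreadLocalRandom.current().nextDouble(0.5, 1.5);
        return (long) (exponential * jitter);
    }
}
```

With the defaults (base 200ms, max 60s), attempt 1 yields 100-300ms and the backoff saturates around attempt 9, after which every delay is a jittered 60s.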
In addition to the framework's RetryPolicy, handlers can control retry timing directly:
DispatchResult (sealed interface):
public sealed interface DispatchResult permits Done, RetryAfter, Dead {
static Done done();
static RetryAfter retryAfter(Duration delay);
static Dead dead();
static Dead dead(String reason);
record Done() implements DispatchResult {}
record RetryAfter(Duration delay) implements DispatchResult {}
record Dead(String reason) implements DispatchResult {}
}

- `Done`: event processed successfully → `markDone`
- `RetryAfter(delay)`: event not yet complete → `markDeferred` (resets to NEW with `available_at = now + delay`). Does not increment `attempts`.
- `Dead` (or `Dead(reason)`): event should not be retried → `markDead` immediately. Optional reason stored in the error column.
RetryAfterException:
public class RetryAfterException extends RecoverableException {
public RetryAfterException(Duration retryAfter);
public RetryAfterException(Duration retryAfter, String message);
public RetryAfterException(Duration retryAfter, Throwable cause);
public RetryAfterException(Duration retryAfter, String message, Throwable cause);
public Duration retryAfter();
- Unlike `DispatchResult.RetryAfter`, throwing this exception does count against `maxAttempts`
- The dispatcher uses the exception's `retryAfter()` duration instead of `RetryPolicy.computeDelayMs()`
- If `maxAttempts` is exhausted, the event goes DEAD regardless
Comparison:
| Mechanism | Counts against maxAttempts | Delay source | Use case |
|---|---|---|---|
| `DispatchResult.RetryAfter` | No | Handler-specified | Polling, waiting for preconditions |
| `DispatchResult.Dead` | N/A (immediately DEAD) | N/A | Unrecoverable payload, business rejection |
| `RetryAfterException` | Yes | Handler-specified | Transient failure with known retry delay |
| `UnrecoverableException` | N/A (immediately DEAD) | N/A | Deterministic failure (bad payload, etc.) |
| Other exception | Yes | RetryPolicy | Unexpected failure |
The framework implements backpressure at multiple levels to prevent overwhelming downstream systems.
+---------------------------------------------------------------------------+
| BACKPRESSURE FLOW |
+---------------------------------------------------------------------------+
| |
| [Slow Listener] |
| | |
| v |
| [Workers Blocked] --> Only N events processed concurrently |
| | (N = workerCount) |
| v |
| [Queues Fill Up] --> Bounded capacity prevents memory growth |
| | |
| v |
| [enqueueHot() returns false] |
| | |
| v |
| [Event stays in DB] --> OutboxPoller/CDC picks up when capacity frees |
| |
+---------------------------------------------------------------------------+
- Hot and cold queues MUST be bounded (`ArrayBlockingQueue`)
- Unbounded queues are forbidden
- Default capacity: 1000 each
Workers execute listeners synchronously (blocking). This provides natural rate limiting:
| Scenario | Effect |
|---|---|
| Fast listeners | Workers quickly return to polling, high throughput |
| Slow listeners | Workers blocked, queues fill, automatic throttling |
| Downstream outage | All workers blocked, queues full, events safe in DB |
Key insight: The database acts as a durable buffer when in-memory queues are full.
- `write()` MUST NOT throw
- `DispatcherWriterHook` logs a WARNING and increments a metric when the hot queue drops an event
- Delayed events (`isDelayed()`) are skipped entirely — not enqueued to the hot queue. The poller delivers them when `available_at` arrives. The `incrementHotSkippedDelayed()` metric is emitted.
- The event remains in the DB with status NEW
- OutboxPoller or CDC picks it up when workers have capacity
- OutboxPoller stops enqueueing for current cycle
- Events remain in DB, retry on next poll cycle
- No data loss
The recommended way to configure the outbox is via the Outbox composite builder (Outbox.singleNode(),
Outbox.multiNode(), Outbox.ordered(), Outbox.writerOnly()), which wires all components with correct defaults. For
advanced use cases, OutboxDispatcher.Builder and OutboxPoller.Builder are available directly.
| Parameter | Default |
|---|---|
| workerCount | 4 |
| hotQueueCapacity | 1000 |
| coldQueueCapacity | 1000 |
| maxAttempts | 10 |
| retryPolicy | ExponentialBackoffRetryPolicy(200, 60_000) |
| drainTimeoutMs | 5000 |
| metrics | MetricsExporter.NOOP |
try (Outbox outbox = Outbox.singleNode()
        .connectionProvider(connectionProvider)
        .txContext(txContext)
        .outboxStore(outboxStore)
        .listenerRegistry(registry)
        .workerCount(8)
        .hotQueueCapacity(2000)
        .build()) {
    OutboxWriter writer = outbox.writer();
    // use writer inside transactions...
}

OutboxDispatcher dispatcher = OutboxDispatcher.builder()
.connectionProvider(connectionProvider)
.outboxStore(outboxStore)
.listenerRegistry(registry)
.workerCount(8)
.hotQueueCapacity(2000)
.build();

public interface MetricsExporter {
void incrementHotEnqueued();
void incrementHotDropped();
void incrementColdEnqueued();
void incrementDispatchSuccess();
void incrementDispatchFailure();
void incrementDispatchDead();
default void incrementDispatchDeferred(); // handler returned RetryAfter
default void incrementHotSkippedDelayed(); // delayed event skipped on hot path
void recordQueueDepths(int hotDepth, int coldDepth);
void recordOldestLagMs(long lagMs);
MetricsExporter NOOP = new Noop();
}

The outbox-micrometer module provides MicrometerMetricsExporter, a ready-to-use implementation that registers
counters and gauges with a Micrometer MeterRegistry.
Constructors:
new MicrometerMetricsExporter(MeterRegistry registry) // default prefix: "outbox"
new MicrometerMetricsExporter(MeterRegistry registry, String namePrefix) // custom prefix

Counters (monotonically increasing):
| Metric Name | Description |
|---|---|
| `{prefix}.enqueue.hot` | Events enqueued via hot path |
| `{prefix}.enqueue.hot.dropped` | Events dropped (hot queue full) |
| `{prefix}.enqueue.cold` | Events enqueued via cold (poller) path |
| `{prefix}.dispatch.success` | Events dispatched successfully |
| `{prefix}.dispatch.failure` | Events failed (will retry) |
| `{prefix}.dispatch.dead` | Events moved to DEAD |
| `{prefix}.dispatch.deferred` | Events deferred by handler (RetryAfter) |
| `{prefix}.enqueue.hot.skipped.delayed` | Delayed events skipped on hot path |
Gauges (current value):
| Metric Name | Description |
|---|---|
| `{prefix}.queue.hot.depth` | Current hot queue depth |
| `{prefix}.queue.cold.depth` | Current cold queue depth |
| `{prefix}.lag.oldest.ms` | Lag of oldest pending event in milliseconds |
The default prefix is outbox. For multi-instance setups, use a custom prefix (e.g., "orders.outbox") to avoid metric
collisions.
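For teams not using Micrometer, a custom exporter is little code. The sketch below defines a local subset of the interface (method names mirror the spec above) and backs it with AtomicLong counters; the AtomicLong-based implementation is an illustrative assumption, not the framework's own:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative local subset of the documented MetricsExporter interface.
interface HotPathMetrics {
    void incrementHotEnqueued();
    void incrementHotDropped();
    void recordQueueDepths(int hotDepth, int coldDepth);
}

// Hypothetical exporter: thread-safe counters plus last-seen queue depths.
public class CountingExporter implements HotPathMetrics {
    private final AtomicLong hotEnqueued = new AtomicLong();
    private final AtomicLong hotDropped = new AtomicLong();
    private volatile int lastHotDepth;
    private volatile int lastColdDepth;

    @Override public void incrementHotEnqueued() { hotEnqueued.incrementAndGet(); }
    @Override public void incrementHotDropped()  { hotDropped.incrementAndGet(); }
    @Override public void recordQueueDepths(int hotDepth, int coldDepth) {
        lastHotDepth = hotDepth;   // gauges overwrite, counters accumulate
        lastColdDepth = coldDepth;
    }

    public long hotEnqueued() { return hotEnqueued.get(); }
    public long hotDropped()  { return hotDropped.get(); }

    public static void main(String[] args) {
        CountingExporter m = new CountingExporter();
        m.incrementHotEnqueued();
        m.incrementHotEnqueued();
        m.incrementHotDropped();
        m.recordQueueDepths(5, 0);
        System.out.println("enqueued=" + m.hotEnqueued() + " dropped=" + m.hotDropped());
    }
}
```

The counter/gauge split matters: dispatch counts only ever increase, while queue depths are point-in-time samples, which is why `recordQueueDepths` stores rather than adds.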
| Level | Event |
|---|---|
| WARNING | Hot queue drop (DispatcherWriterHook) |
| SEVERE | DEAD transition |
| SEVERE | OutboxDispatcher/poller loop errors |
| SEVERE | Decode failures (malformed headers) |
Hot queue drop warnings are emitted by DispatcherWriterHook. If no hook is installed (CDC-only), no warning or metric
is produced.
- Listeners that publish to MQ MUST include eventId in message header/body
- Downstream systems must dedupe by eventId
- Framework provides at-least-once delivery
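The downstream dedupe obligation can be sketched without any MQ client. The in-memory Set tracker below is a hypothetical stand-in (a production consumer would persist seen ids or use a TTL cache), showing why at-least-once delivery plus eventId dedupe yields effectively-once processing:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical downstream consumer that dedupes by eventId. The framework
// delivers at-least-once, so the same eventId may arrive more than once;
// Set.add() returns false on a duplicate, so the work runs exactly once.
public class DedupingConsumer {
    private final Set<String> seenEventIds = ConcurrentHashMap.newKeySet();
    private int processed = 0;

    /** Returns true if the event was processed, false if it was a duplicate. */
    public boolean onMessage(String eventId, String payload) {
        if (!seenEventIds.add(eventId)) {
            return false; // already handled this eventId
        }
        processed++; // real business work would go here
        return true;
    }

    public int processedCount() { return processed; }

    public static void main(String[] args) {
        DedupingConsumer c = new DedupingConsumer();
        for (String id : List.of("e1", "e2", "e1")) { // e1 redelivered
            c.onMessage(id, "{}");
        }
        System.out.println("processed=" + c.processedCount()); // prints processed=2
    }
}
```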
| Component | Strategy |
|---|---|
| OutboxDispatcher | Worker pool (ExecutorService), bounded BlockingQueues |
| Registries | ConcurrentHashMap |
| InFlightTracker | ConcurrentHashMap with CAS operations |
| OutboxPoller | Single-thread ScheduledExecutorService |
| OutboxPurgeScheduler | Single-thread ScheduledExecutorService |
| ThreadLocalTxContext | ThreadLocal storage |
The outbox table is a transient buffer, not a permanent event store. Terminal events (DONE and DEAD) should be purged after a
retention period to prevent table bloat and maintain poller query performance. If clients need to archive events for
audit, they should do so in their EventListener.
public interface EventPurger {
    int purge(Connection conn, Instant before, int limit);
}

- Deletes terminal events (DONE + DEAD) where done_at < before (falls back to created_at < before when done_at is null)
- Takes an explicit Connection (caller controls the transaction), matching the OutboxStore pattern
- Returns the count of rows deleted
- limit caps the batch size per call to bound lock duration
| Class | Database | Strategy |
|---|---|---|
| AbstractJdbcEventPurger | Base | Subquery-based DELETE (default) |
| H2EventPurger | H2 | Inherits default |
| MySqlEventPurger | MySQL/TiDB | DELETE...ORDER BY...LIMIT |
| PostgresEventPurger | PostgreSQL | Inherits default |
Default purge SQL (H2, PostgreSQL):
DELETE FROM outbox_event WHERE event_id IN (
SELECT event_id FROM outbox_event
WHERE status IN (1, 3)
AND (done_at < ? OR (done_at IS NULL AND created_at < ?))
ORDER BY created_at, event_id LIMIT ?
)

MySQL purge SQL:
DELETE FROM outbox_event
WHERE status IN (1, 3)
AND (done_at < ? OR (done_at IS NULL AND created_at < ?))
ORDER BY created_at, event_id LIMIT ?

All purger classes support a custom table name via constructor (validated with the same regex as
AbstractJdbcOutboxStore).
Scheduled component modeled after OutboxPoller: builder pattern, AutoCloseable, daemon threads, synchronized
lifecycle.
OutboxPurgeScheduler scheduler = OutboxPurgeScheduler.builder()
.connectionProvider(connectionProvider) // required
.purger(purger) // required
.retention(Duration.ofDays(7)) // default: 7 days
.batchSize(500) // default: 500
.intervalSeconds(3600) // default: 3600 (1 hour)
.build();

| Parameter | Type | Default | Required |
|---|---|---|---|
| connectionProvider | ConnectionProvider | - | yes |
| purger | EventPurger | - | yes |
| retention | Duration | 7 days | no |
| batchSize | int | 500 | no |
| intervalSeconds | long | 3600 | no |
void start()   // Start scheduled purge loop
void runOnce() // Execute single purge cycle (loops batches until count < batchSize)
void close()   // Stop purge and shut down scheduler thread

- Each purge cycle calculates the cutoff as Instant.now().minus(retention)
- Loops batches: each batch gets its own auto-committed connection
- Stops when a batch deletes fewer than batchSize rows (backlog drained)
- Logs the total purged count at INFO level
- Errors are caught and logged at SEVERE (does not propagate)
- Calling start() after close() MUST throw IllegalStateException
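The runOnce() batch-drain behavior can be sketched against an in-memory stand-in. The list-based purge method below is illustrative only (it plays the role of EventPurger over timestamps instead of a database), but the loop structure matches the documented rule: keep purging until a batch comes up short of batchSize:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the runOnce() drain loop with an in-memory EventPurger stand-in.
public class PurgeLoopSketch {
    // Stand-in for EventPurger.purge(): delete up to `limit` events older than `before`.
    static int purge(List<Instant> events, Instant before, int limit) {
        int deleted = 0;
        Iterator<Instant> it = events.iterator();
        while (it.hasNext() && deleted < limit) {
            if (it.next().isBefore(before)) { it.remove(); deleted++; }
        }
        return deleted;
    }

    static int runOnce(List<Instant> events, Duration retention, int batchSize) {
        Instant cutoff = Instant.now().minus(retention); // cutoff computed once per cycle
        int total = 0;
        int n;
        do {
            n = purge(events, cutoff, batchSize); // each batch: its own connection in the real scheduler
            total += n;
        } while (n == batchSize); // a short batch means the backlog is drained
        return total;
    }

    public static void main(String[] args) {
        List<Instant> events = new ArrayList<>();
        Instant old = Instant.now().minus(Duration.ofDays(10));
        for (int i = 0; i < 7; i++) events.add(old); // past the 7-day retention
        events.add(Instant.now());                   // recent event, kept
        System.out.println("purged=" + runOnce(events, Duration.ofDays(7), 3)); // prints purged=7
    }
}
```

Bounding each batch with `limit` keeps individual DELETE statements (and their locks) small, at the cost of one extra near-empty batch when the backlog is an exact multiple of batchSize.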
Events that exceed maxAttempts or have no registered listener are marked DEAD. The framework provides tooling to
query, count, and replay dead events without writing raw SQL.
The OutboxStore interface includes default methods for dead event operations:
default List<OutboxEvent> queryDead(Connection conn, String eventType, String aggregateType, int limit);
default int replayDead(Connection conn, String eventId);
default int countDead(Connection conn, String eventType);

- queryDead: returns dead events matching optional filters (null for all), ordered oldest first
- replayDead: resets a single DEAD event to NEW status (returns the number of rows updated)
- countDead: counts dead events, optionally filtered by event type
DeadEventManager (io.outbox.dead) is a convenience facade that manages connection lifecycle internally using a
ConnectionProvider:
public final class DeadEventManager {
public DeadEventManager(ConnectionProvider connectionProvider, OutboxStore outboxStore);
public List<OutboxEvent> query(String eventType, String aggregateType, int limit);
public boolean replay(String eventId);
public int replayAll(String eventType, String aggregateType, int batchSize);
public int count(String eventType);
}

Methods:
| Method | Description |
|---|---|
| query(eventType, aggregateType, limit) | Query dead events with optional filters (null for all) |
| replay(eventId) | Replay a single dead event by resetting it to NEW; returns true if replayed |
| replayAll(eventType, aggregateType, batchSize) | Replay all matching dead events in batches; returns total replayed |
| count(eventType) | Count dead events, optionally filtered by event type (null for all) |
All DeadEventManager methods catch SQLException and log at SEVERE level:

- query() returns List.of() on failure
- replay() returns false on failure
- replayAll() returns the count replayed so far and stops on failure
- count() returns 0 on failure
For per-aggregate FIFO ordering:
| Setting | Value | Reason |
|---|---|---|
| WriterHook | NOOP (default) | Disable hot path to avoid dual-path reordering |
| OutboxPoller | Single node | Prevent cross-node claim interleaving |
| workerCount | 1 | Sequential dispatch preserves poll order |
The poller reads events in ORDER BY created_at order. The single dispatch
worker processes them sequentially, guaranteeing that events for the same
aggregate are delivered in insertion order.
The dual hot+cold path architecture makes ordering hard in general — events
for the same aggregate can arrive via different paths in unpredictable order.
Disabling the hot path (WriterHook.NOOP) ensures all events flow through
the poller, which reads them in DB insertion order.
If event A fails and is marked RETRY with exponential backoff, its available_at
is set to a future timestamp. Meanwhile, event B (same aggregate, inserted after A)
has available_at <= now and will be polled and delivered before A's retry becomes
eligible — breaking per-aggregate ordering.
Mitigation: Set maxAttempts(1) so failed events go directly to DEAD without
retry. Use DeadEventManager to inspect and replay them manually after fixing the
underlying issue.
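The failure mode can be shown with plain java.time arithmetic; the 30-second backoff below is an illustrative value, not a framework default:

```java
import java.time.Duration;
import java.time.Instant;

// Illustrates the hazard in this section: a failed event A gets a
// backoff-shifted available_at, while a later event B for the same aggregate
// is eligible immediately, so the poller hands B to the listener first.
public class RetryReordering {
    /** True when B (available now) becomes pollable before A's retry slot. */
    static boolean bDeliveredFirst(Instant now, Duration aBackoff) {
        Instant aAvailableAt = now.plus(aBackoff); // A marked RETRY with backoff
        Instant bAvailableAt = now;                // B never failed
        return bAvailableAt.isBefore(aAvailableAt);
    }

    public static void main(String[] args) {
        System.out.println("B delivered before A's retry: "
                + bDeliveredFirst(Instant.now(), Duration.ofSeconds(30)));
        // prints: B delivered before A's retry: true
    }
}
```

With maxAttempts(1) there is no retry window at all: A either succeeds in order or goes straight to DEAD, and B's delivery no longer races A's backoff.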
- Higher latency than hot-path mode (bounded by poll interval).
- Throughput limited by single-worker sequential processing (sufficient when DB poll is the bottleneck).
- Ordering is per-node; no cross-node ordering guarantee.
- Retries break ordering (see 19.3); use maxAttempts(1) for strict FIFO.
The Outbox class is the recommended entry point for wiring the framework. It provides four scenario-specific builders
that create an OutboxWriter and optionally an OutboxDispatcher, OutboxPoller, and OutboxPurgeScheduler as a
single AutoCloseable unit.
| Builder | Hot Path | Poller Mode | workerCount | maxAttempts | WriterHook |
|---|---|---|---|---|---|
| Outbox.singleNode() | Yes | pollPending | user-set (default 4) | user-set (default 10) | DispatcherWriterHook |
| Outbox.multiNode() | Yes | claimPending | user-set (default 4) | user-set (default 10) | DispatcherWriterHook |
| Outbox.ordered() | No | pollPending | 1 (forced) | 1 (forced) | NOOP (forced) |
| Outbox.writerOnly() | No | None | N/A | N/A | NOOP (forced) |
Outbox (final, AutoCloseable)
├── singleNode() → SingleNodeBuilder
├── multiNode() → MultiNodeBuilder
├── ordered() → OrderedBuilder
├── writerOnly() → WriterOnlyBuilder
│
└── AbstractBuilder<B> (sealed, permits 4 concrete builders)
Required: connectionProvider, txContext, outboxStore, listenerRegistry
Optional: metrics, interceptor(s), intervalMs, batchSize,
skipRecent, drainTimeoutMs
SingleNodeBuilder and MultiNodeBuilder add: workerCount, hotQueueCapacity, coldQueueCapacity, maxAttempts,
retryPolicy. MultiNodeBuilder additionally requires claimLocking(Duration) or claimLocking(String, Duration).
OrderedBuilder exposes no additional parameters.
WriterOnlyBuilder only requires txContext and outboxStore. Optionally accepts purger, purgeRetention,
purgeBatchSize, purgeIntervalSeconds for age-based cleanup; if purger is set, connectionProvider is also
required. Inherited dispatcher/poller settings are ignored.
Each build():

- Validates required fields (NullPointerException if missing). MultiNodeBuilder checks that claimLocking() was called (IllegalStateException if not).
- Builds OutboxDispatcher (workers start immediately). Skipped for WriterOnlyBuilder.
- Builds OutboxPoller, wrapped in try-catch: if it fails, the dispatcher is closed before rethrowing. Skipped for WriterOnlyBuilder.
- Starts the poller. Skipped for WriterOnlyBuilder.
- Creates OutboxWriter (with DispatcherWriterHook for single/multi-node, NOOP for ordered and writer-only).
- WriterOnlyBuilder optionally builds and starts OutboxPurgeScheduler if a purger is configured, wrapped in try-catch: if start() fails, the scheduler is closed before rethrowing.
- Returns Outbox.
Outbox.close() shuts down in order (null components are skipped):

1. purgeScheduler.close(): stop the purge schedule
2. poller.close(): stop feeding the cold queue
3. dispatcher.close(): drain remaining events within drainTimeoutMs
// Single-node (hot + poller)
try(Outbox outbox = Outbox.singleNode()
.connectionProvider(cp).txContext(tx).outboxStore(store).listenerRegistry(reg)
.workerCount(4)
.build()){
OutboxWriter writer = outbox.writer();
}
// Multi-node (hot + poller + claim locking)
try(Outbox outbox = Outbox.multiNode()
.connectionProvider(cp).txContext(tx).outboxStore(store).listenerRegistry(reg)
.claimLocking(Duration.ofMinutes(5))
.workerCount(8)
.build()){
OutboxWriter writer = outbox.writer();
}
// Ordered delivery (poller-only, single worker, no retry)
try(Outbox outbox = Outbox.ordered()
.connectionProvider(cp).txContext(tx).outboxStore(store).listenerRegistry(reg)
.intervalMs(1000)
.build()){
OutboxWriter writer = outbox.writer();
}
// Writer-only (CDC mode, no dispatcher/poller)
try(Outbox outbox = Outbox.writerOnly()
.txContext(tx).outboxStore(store)
.build()){
OutboxWriter writer = outbox.writer();
}
// Writer-only with age-based purge
try(Outbox outbox = Outbox.writerOnly()
.txContext(tx).outboxStore(store)
.connectionProvider(cp)
.purger(new H2AgeBasedPurger())
.purgeRetention(Duration.ofHours(24))
.purgeIntervalSeconds(1800)
.build()){
OutboxWriter writer = outbox.writer();
}