Skip to content

fix: improve CoinJoinManager message processor#292

Open
HashEngineering wants to merge 2 commits intomasterfrom
fix/coinjoin-manager-msg-processor
Open

fix: improve CoinJoinManager message processor#292
HashEngineering wants to merge 2 commits intomasterfrom
fix/coinjoin-manager-msg-processor

Conversation

@HashEngineering
Copy link
Collaborator

@HashEngineering HashEngineering commented Feb 18, 2026

Issue being fixed or feature implemented

What was done?

How Has This Been Tested?

Breaking Changes

Checklist:

  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have added or updated relevant unit/integration/functional/e2e tests
  • I have made corresponding changes to the documentation

For repository code-owners and collaborators only

  • I have assigned this pull request to a milestone

Summary by CodeRabbit

Release Notes

  • Bug Fixes
    • Improved message processor initialization with delayed loading pattern, optimizing resource allocation.
    • Enhanced system stability with robust null-safety checks during shutdown and proper executor lifecycle management.
    • Strengthened message handling with state validation to gracefully handle edge cases and prevent execution rejections.

@HashEngineering HashEngineering self-assigned this Feb 18, 2026
@coderabbitai
Copy link

coderabbitai bot commented Feb 18, 2026

📝 Walkthrough

Walkthrough

This pull request refactors the message processor executor lifecycle in CoinJoinManager by introducing lazy initialization, adding null-safety guards in shutdown operations, checking executor state before submitting work, and handling rejected execution exceptions.

Changes

Cohort / File(s) Summary
Message Processor Lifecycle Management
core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java
Added MESSAGE_PROCESSOR constant for thread factory naming. Deferred executor initialization from field declaration to start() method. Implemented null-safety checks in stop() and close(). Added isMessageProcessorRunning() helper method. Updated preMessageReceivedEventListener to verify executor state before task submission and catch RejectedExecutionException.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

Poem

🐰 A message processor springs to life,
Not born too soon, but just in time,
Lazy hops with grace and care,
Null-safe shutdown everywhere,
Rejected tasks handled with a bound—hop! 🌱

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately reflects the main change: improving the CoinJoinManager message processor by introducing lazy initialization, null-safety guards, and better lifecycle management.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/coinjoin-manager-msg-processor

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java (1)

192-199: ⚠️ Potential issue | 🟡 Minor

start() leaks the executor if called more than once.

There is no guard checking whether messageProcessingExecutor is already initialized. A second call overwrites the field, leaving the first pool running and untracked.

🛡️ Proposed guard
     public void start() {
         log.info("CoinJoinManager starting...");
         Context.propagate(context);
         schedule = scheduledExecutorService.scheduleWithFixedDelay(
                 maintenanceRunnable, 1, 1, TimeUnit.SECONDS);
+        if (messageProcessingExecutor == null) {
             messageProcessingExecutor = Executors.newFixedThreadPool(5,
                     new ContextPropagatingThreadFactory(MESSAGE_PROCESSOR));
+        }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java` around
lines 192 - 199, The start() method can overwrite and leak the existing
messageProcessingExecutor (and re-schedule maintenanceRunnable) if called twice;
update start() (referencing start(), messageProcessingExecutor, schedule,
scheduledExecutorService, maintenanceRunnable, ContextPropagatingThreadFactory
and MESSAGE_PROCESSOR) to guard against repeated starts by checking whether
messageProcessingExecutor is non-null and not shutdown (or whether schedule is
non-null and not cancelled) and either throw an IllegalStateException or return
early; alternatively, if you intend to restart, first shut down the existing
messageProcessingExecutor and cancel the existing schedule (call
shutdown()/shutdownNow() and cancel on schedule) before creating new ones so the
old executors are not leaked.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java`:
- Line 102: Declare the field messageProcessingExecutor as volatile to ensure
cross-thread visibility: update the declaration of messageProcessingExecutor to
be volatile so writes in stop() and close() are visible to the network I/O
thread that reads it in preMessageReceivedEventListener; no other
synchronization changes required.
- Around line 522-530: There’s a TOCTOU between isMessageProcessorRunning() and
using messageProcessingExecutor that can lead to an NPE if stop() nulls the
field; fix it by reading messageProcessingExecutor once into a local variable
(e.g., var exec = messageProcessingExecutor), use exec for the running check and
for exec.execute(() -> processMessage(peer, m)), and wrap exec.execute in the
existing try/catch for RejectedExecutionException while also guarding with if
(exec != null) to avoid NullPointerException; update references to
messageProcessingExecutor, isMessageProcessorRunning(), stop(), processMessage,
and the RejectedExecutionException handling accordingly.
- Around line 295-298: close() currently mutates messageProcessingExecutor
without acquiring lock and doesn't await termination, causing a race with
stop(); modify close() to acquire the same lock used in stop() before
reading/writing messageProcessingExecutor, perform the same shutdown sequence
(shutdown, awaitTermination with timeout/interrupt handling) and set
messageProcessingExecutor = null inside that locked section, mirroring stop()'s
behavior so both methods synchronize on lock and wait for executor termination
consistently.

---

Outside diff comments:
In `@core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java`:
- Around line 192-199: The start() method can overwrite and leak the existing
messageProcessingExecutor (and re-schedule maintenanceRunnable) if called twice;
update start() (referencing start(), messageProcessingExecutor, schedule,
scheduledExecutorService, maintenanceRunnable, ContextPropagatingThreadFactory
and MESSAGE_PROCESSOR) to guard against repeated starts by checking whether
messageProcessingExecutor is non-null and not shutdown (or whether schedule is
non-null and not cancelled) and either throw an IllegalStateException or return
early; alternatively, if you intend to restart, first shut down the existing
messageProcessingExecutor and cancel the existing schedule (call
shutdown()/shutdownNow() and cancel on schedule) before creating new ones so the
old executors are not leaked.

private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService messageProcessingExecutor = Executors.newFixedThreadPool(5,
new ContextPropagatingThreadFactory("CoinJoin-MessageProcessor"));
private ExecutorService messageProcessingExecutor = null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

messageProcessingExecutor must be declared volatile for cross-thread visibility.

stop() and close() write this field on one thread; preMessageReceivedEventListener reads it on a network I/O thread with no lock. Without volatile, the JMM provides no happens-before guarantee, so the listener thread can observe a stale non-null value after stop() sets it to null.

🔒 Proposed fix
-    private ExecutorService messageProcessingExecutor = null;
+    private volatile ExecutorService messageProcessingExecutor = null;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private ExecutorService messageProcessingExecutor = null;
private volatile ExecutorService messageProcessingExecutor = null;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java` at line
102, Declare the field messageProcessingExecutor as volatile to ensure
cross-thread visibility: update the declaration of messageProcessingExecutor to
be volatile so writes in stop() and close() are visible to the network I/O
thread that reads it in preMessageReceivedEventListener; no other
synchronization changes required.

Comment on lines +295 to 298
if (messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown()) {
messageProcessingExecutor.shutdown();
messageProcessingExecutor = null;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

close() modifies messageProcessingExecutor without holding lock, inconsistent with stop().

stop() guards all writes to messageProcessingExecutor under the lock, but close() does not. If both are called concurrently (e.g. close() on a lifecycle teardown thread while stop() is already in progress), there is an unguarded data race on the field write at line 297. Additionally, close() does not await termination before returning, unlike stop().

💡 Suggested fix
         // Ensure executor is shut down
-        if (messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown()) {
-            messageProcessingExecutor.shutdown();
-            messageProcessingExecutor = null;
-        }
+        ExecutorService execToStop = null;
+        lock.lock();
+        try {
+            if (messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown()) {
+                execToStop = messageProcessingExecutor;
+                messageProcessingExecutor = null;
+            }
+        } finally {
+            lock.unlock();
+        }
+        if (execToStop != null) {
+            execToStop.shutdown();
+        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java` around
lines 295 - 298, close() currently mutates messageProcessingExecutor without
acquiring lock and doesn't await termination, causing a race with stop(); modify
close() to acquire the same lock used in stop() before reading/writing
messageProcessingExecutor, perform the same shutdown sequence (shutdown,
awaitTermination with timeout/interrupt handling) and set
messageProcessingExecutor = null inside that locked section, mirroring stop()'s
behavior so both methods synchronize on lock and wait for executor termination
consistently.

Comment on lines +522 to 530
if (isMessageProcessorRunning()) {
try {
messageProcessingExecutor.execute(() -> {
processMessage(peer, m);
});
} catch (RejectedExecutionException e) {
// swallow because this is being stopped
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

TOCTOU race between isMessageProcessorRunning() check and messageProcessingExecutor.execute() can cause an uncaught NullPointerException.

isMessageProcessorRunning() reads the field (check), then the listener reads the field again to call .execute(). Between those two reads, stop() can write null to the field. The RejectedExecutionException catch does not cover NPE, so this will crash the network I/O thread. Fix: capture the reference once in a local variable and use it throughout.

🐛 Proposed fix
-            if (isMessageProcessorRunning()) {
-                try {
-                    messageProcessingExecutor.execute(() -> {
-                        processMessage(peer, m);
-                    });
-                } catch (RejectedExecutionException e) {
-                    // swallow because this is being stopped
-                }
-            }
+            ExecutorService exec = messageProcessingExecutor;
+            if (exec != null && !exec.isShutdown() && !exec.isTerminated()) {
+                try {
+                    exec.execute(() -> processMessage(peer, m));
+                } catch (RejectedExecutionException e) {
+                    // swallow because this is being stopped
+                }
+            }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java` around
lines 522 - 530, There’s a TOCTOU between isMessageProcessorRunning() and using
messageProcessingExecutor that can lead to an NPE if stop() nulls the field; fix
it by reading messageProcessingExecutor once into a local variable (e.g., var
exec = messageProcessingExecutor), use exec for the running check and for
exec.execute(() -> processMessage(peer, m)), and wrap exec.execute in the
existing try/catch for RejectedExecutionException while also guarding with if
(exec != null) to avoid NullPointerException; update references to
messageProcessingExecutor, isMessageProcessorRunning(), stop(), processMessage,
and the RejectedExecutionException handling accordingly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Comments