From 78c570c405d0e635b02af5fa64aa16c64204d3fc Mon Sep 17 00:00:00 2001
From: Leumor <116955025+leumor@users.noreply.github.com>
Date: Thu, 26 Feb 2026 04:56:45 +0000
Subject: [PATCH 1/3] fix(node-fcp): Improve insert stall handling and
diagnostics
---
.gitignore | 3 +
.../crypta/client/async/ChosenBlock.java | 12 ++
.../crypta/client/async/ChosenBlockImpl.java | 9 ++
.../crypta/client/async/ClientRequester.java | 37 ++++++
.../client/async/SingleBlockInserter.java | 3 +-
.../client/async/SplitFileInserterSender.java | 3 +-
.../network/crypta/clients/fcp/ClientGet.java | 3 +
.../network/crypta/clients/fcp/ClientPut.java | 3 +
.../crypta/clients/fcp/ClientPutMessage.java | 22 +++-
.../clients/fcp/ClientPutMimeResolver.java | 19 +++
.../crypta/clients/fcp/ClientRequest.java | 26 ++++
.../java/network/crypta/node/BaseSender.java | 18 ++-
.../network/crypta/node/CHKInsertHandler.java | 69 +++++++---
.../network/crypta/node/CHKInsertSender.java | 15 ++-
.../crypta/node/NodeClientCoreTransfers.java | 121 +++++++++++++++++-
.../crypta/node/PeerNodeTransport.java | 78 ++++++++++-
.../network/crypta/node/PeerTransport.java | 26 ++++
.../network/crypta/node/RequestTracker.java | 40 ++++++
.../crypta/node/SendableGetRequestSender.java | 3 +-
.../crypta/node/SimpleSendableInsert.java | 3 +-
src/main/java/network/crypta/node/UIDTag.java | 102 ++++++++++++---
.../network/crypta/node/UIDTraceLogger.java | 5 +-
.../async/SplitFileInserterSenderTest.java | 3 +-
.../clients/fcp/ClientPutMessageTest.java | 32 +++++
.../fcp/ClientPutMimeResolverTest.java | 61 +++++++++
.../crypta/clients/fcp/ClientPutTest.java | 11 ++
.../crypta/node/CHKInsertHandlerTest.java | 26 +++-
.../node/NodeClientCoreTransfersTest.java | 14 +-
.../crypta/node/RequestTrackerTest.java | 16 +++
.../node/SendableGetRequestSenderTest.java | 7 +-
.../crypta/node/SimpleSendableInsertTest.java | 10 +-
.../crypta/node/UIDTagHardTimeoutTest.java | 25 +++-
32 files changed, 753 insertions(+), 72 deletions(-)
diff --git a/.gitignore b/.gitignore
index 7beb9649eb9..677cc857f23 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,6 +53,9 @@ out/jreleaser
build-logic/build/
build-logic/bin/
tmp*
+/changelogfull.md
+/changelogfull.txt
+
# WiX/jpackage debug artifacts (decompiled MSI and relink outputs)
/Crypta-*.wxs
/.vscode
diff --git a/src/main/java/network/crypta/client/async/ChosenBlock.java b/src/main/java/network/crypta/client/async/ChosenBlock.java
index 0fd966c7ab4..60f961453d8 100644
--- a/src/main/java/network/crypta/client/async/ChosenBlock.java
+++ b/src/main/java/network/crypta/client/async/ChosenBlock.java
@@ -243,6 +243,18 @@ private record ConstructionState(
*/
public abstract short getPriority();
+ /**
+ * Returns an optional external identifier used for diagnostics correlation.
+ *
+ *
The default implementation returns {@code null}. Implementations that can map a chosen block
+ * to a client-visible request identifier should override this method.
+ *
+ * @return external identifier string, or {@code null} if unavailable
+ */
+ public String getExternalRequestIdentifier() {
+ return null;
+ }
+
private boolean sendIsBlocking;
/**
diff --git a/src/main/java/network/crypta/client/async/ChosenBlockImpl.java b/src/main/java/network/crypta/client/async/ChosenBlockImpl.java
index 3c9465c1a1d..712e938ca08 100644
--- a/src/main/java/network/crypta/client/async/ChosenBlockImpl.java
+++ b/src/main/java/network/crypta/client/async/ChosenBlockImpl.java
@@ -243,6 +243,15 @@ public short getPriority() {
return request.getPriorityClass();
}
+ @Override
+ public String getExternalRequestIdentifier() {
+ ClientRequester clientRequester = request.getClientRequest();
+ if (clientRequester == null) {
+ return null;
+ }
+ return clientRequester.getExternalRequestIdentifier();
+ }
+
/**
* Obtains the sender capable of executing this block under the provided context. The returned
* sender defines whether sending blocks and performs the actual network/storage interaction.
diff --git a/src/main/java/network/crypta/client/async/ClientRequester.java b/src/main/java/network/crypta/client/async/ClientRequester.java
index 04d8fabd4c0..89b8f021416 100644
--- a/src/main/java/network/crypta/client/async/ClientRequester.java
+++ b/src/main/java/network/crypta/client/async/ClientRequester.java
@@ -89,6 +89,14 @@ public abstract void onTransition(
*/
protected transient RequestClient client;
+ /**
+ * Optional external identifier used to correlate scheduler activity with client-visible requests.
+ *
+ *
Typical values include an FCP request identifier prefix such as {@code fcp:...}. The value
+ * is intentionally transient so persistence format remains unchanged.
+ */
+ private transient volatile String externalRequestIdentifier;
+
/**
* Returns the current scheduling priority class for this request.
*
@@ -571,6 +579,35 @@ public RequestClient getClient() {
return client;
}
+ /**
+ * Assigns an external correlation identifier for diagnostics.
+ *
+ *
This does not affect routing or scheduling; it only enriches logs when low-level stalls are
+ * reported.
+ *
+ * @param externalRequestIdentifier external identifier string, or {@code null} to clear
+ */
+ public final void setExternalRequestIdentifier(String externalRequestIdentifier) {
+ this.externalRequestIdentifier = normalizeExternalRequestIdentifier(externalRequestIdentifier);
+ }
+
+ /**
+ * Returns the optional external correlation identifier used for diagnostics.
+ *
+ * @return external identifier, or {@code null} if none has been assigned
+ */
+ public final String getExternalRequestIdentifier() {
+ return externalRequestIdentifier;
+ }
+
+ private static String normalizeExternalRequestIdentifier(String externalRequestIdentifier) {
+ if (externalRequestIdentifier == null) {
+ return null;
+ }
+ String normalized = externalRequestIdentifier.trim();
+ return normalized.isEmpty() ? null : normalized;
+ }
+
/**
* Changes the scheduling priority class of this request and re-registers all sub-requests.
*
diff --git a/src/main/java/network/crypta/client/async/SingleBlockInserter.java b/src/main/java/network/crypta/client/async/SingleBlockInserter.java
index 9d369eb57fe..4b24a177328 100644
--- a/src/main/java/network/crypta/client/async/SingleBlockInserter.java
+++ b/src/main/java/network/crypta/client/async/SingleBlockInserter.java
@@ -758,7 +758,8 @@ private void putToStore(NodeClientCore core, ChosenBlock req, KeyBlock b, Client
req.forkOnCacheable,
Node.PREFER_INSERT_DEFAULT,
Node.IGNORE_LOW_BACKOFF_DEFAULT,
- req.realTimeFlag);
+ req.realTimeFlag,
+ req.getExternalRequestIdentifier());
}
private boolean handleCollision(
diff --git a/src/main/java/network/crypta/client/async/SplitFileInserterSender.java b/src/main/java/network/crypta/client/async/SplitFileInserterSender.java
index 8c0be0ff8a8..1b93f5c765f 100644
--- a/src/main/java/network/crypta/client/async/SplitFileInserterSender.java
+++ b/src/main/java/network/crypta/client/async/SplitFileInserterSender.java
@@ -259,7 +259,8 @@ public boolean send(
request.forkOnCacheable,
Node.PREFER_INSERT_DEFAULT,
Node.IGNORE_LOW_BACKOFF_DEFAULT,
- request.realTimeFlag);
+ request.realTimeFlag,
+ request.getExternalRequestIdentifier());
}
request.onInsertSuccess(key, context);
} catch (final LowLevelPutException e) {
diff --git a/src/main/java/network/crypta/clients/fcp/ClientGet.java b/src/main/java/network/crypta/clients/fcp/ClientGet.java
index f5281d530db..dfa704e7a4f 100644
--- a/src/main/java/network/crypta/clients/fcp/ClientGet.java
+++ b/src/main/java/network/crypta/clients/fcp/ClientGet.java
@@ -270,6 +270,7 @@ record ClientGetSetup(
this.extensionCheck = returnSetup.extension();
this.initialMetadata = setup.initialMetadata();
getter = makeGetter(setup.core(), returnSetup.bucket());
+ applyDiagnosticIdentifier(getter);
initHelpers();
}
@@ -290,6 +291,7 @@ record ClientGetSetup(
this.extensionCheck = returnSetup.extension();
this.initialMetadata = setup.initialMetadata();
getter = makeGetter(setup.core(), returnSetup.bucket());
+ applyDiagnosticIdentifier(getter);
initHelpers();
}
@@ -1183,6 +1185,7 @@ public void getClientDetail(DataOutputStream dos, ChecksumChecker checker) throw
restoredGetter = makeGetterForPersistence(makeBucket(false));
}
getter = restoredGetter;
+ applyDiagnosticIdentifier(getter);
initHelpers();
}
diff --git a/src/main/java/network/crypta/clients/fcp/ClientPut.java b/src/main/java/network/crypta/clients/fcp/ClientPut.java
index 8e68b7e4de6..be023287db3 100644
--- a/src/main/java/network/crypta/clients/fcp/ClientPut.java
+++ b/src/main/java/network/crypta/clients/fcp/ClientPut.java
@@ -191,6 +191,7 @@ public ClientPut(
options.overrideSplitfileCryptoKey(),
-1);
putter = ClientPutPutterFactory.create(putterRequest, putterOptions);
+ applyDiagnosticIdentifier(putter);
}
/**
@@ -291,6 +292,7 @@ public ClientPut(FCPConnectionHandler handler, ClientPutMessage message, FCPServ
message.overrideSplitfileCryptoKey,
message.metadataThreshold);
putter = ClientPutPutterFactory.create(putterRequest, putterOptions);
+ applyDiagnosticIdentifier(putter);
}
/**
@@ -839,6 +841,7 @@ RequestStatus getStatus() {
*/
@Override
public void innerResume(ClientContext context) throws ResumeFailedException {
+ applyDiagnosticIdentifier(putter);
if (data != null) data.onResume(context);
}
diff --git a/src/main/java/network/crypta/clients/fcp/ClientPutMessage.java b/src/main/java/network/crypta/clients/fcp/ClientPutMessage.java
index 3c702022483..cfc8111596f 100644
--- a/src/main/java/network/crypta/clients/fcp/ClientPutMessage.java
+++ b/src/main/java/network/crypta/clients/fcp/ClientPutMessage.java
@@ -144,7 +144,7 @@ public ClientPutMessage(SimpleFieldSet fs) throws MessageInvalidException {
fileHash = fs.get(ClientPutBase.FILE_HASH);
- UploadConfig uploadConfig = parseUploadSource(fs, identifier, global, filenameHint);
+ UploadConfig uploadConfig = parseUploadSource(fs, identifier, global, filenameHint, uri);
payloadLength = uploadConfig.length();
uploadFromType = uploadConfig.type();
origFilename = uploadConfig.originalFile();
@@ -299,7 +299,11 @@ private static short parsePriorityClass(String priorityString, String identifier
}
private UploadConfig parseUploadSource(
- SimpleFieldSet fs, String identifier, boolean global, String filenameHint)
+ SimpleFieldSet fs,
+ String identifier,
+ boolean global,
+ String filenameHint,
+ FreenetURI parsedUri)
throws MessageInvalidException {
String uploadFrom = fs.get(FIELD_UPLOAD_FROM);
if (uploadFrom == null || uploadFrom.equalsIgnoreCase("direct")) {
@@ -316,7 +320,10 @@ private UploadConfig parseUploadSource(
throw new MessageInvalidException(
ProtocolErrorMessage.FILE_NOT_FOUND, null, identifier, global);
bucket = new FileBucket(f, true, false, false, false);
- String resolvedName = filenameHint != null ? filenameHint : f.getName();
+ String resolvedName = filenameHint;
+ if (resolvedName == null && shouldInferDiskFilenameHint(parsedUri)) {
+ resolvedName = f.getName();
+ }
return new UploadConfig(f.length(), UploadFrom.DISK, f, null, resolvedName);
}
if (uploadFrom.equalsIgnoreCase("redirect")) {
@@ -405,6 +412,15 @@ private static String parseCompressorDescriptor(String codecs, String identifier
}
}
+ private static boolean shouldInferDiskFilenameHint(FreenetURI uri) {
+ // For bare CHK inserts we do not infer a target filename from the local disk path.
+ // Preserving a null filename keeps semantics aligned with direct payload uploads and avoids
+ // forcing a metadata wrapper when the client did not explicitly request one.
+ return !(uri.getRoutingKey() == null
+ && uri.getDocName() == null
+ && "CHK".equals(uri.getKeyType()));
+ }
+
private record UriParseResult(FreenetURI uri, String filenameHint) {}
private record UploadConfig(
diff --git a/src/main/java/network/crypta/clients/fcp/ClientPutMimeResolver.java b/src/main/java/network/crypta/clients/fcp/ClientPutMimeResolver.java
index 7d86c34f33e..b5b56342e4e 100644
--- a/src/main/java/network/crypta/clients/fcp/ClientPutMimeResolver.java
+++ b/src/main/java/network/crypta/clients/fcp/ClientPutMimeResolver.java
@@ -3,6 +3,7 @@
import java.io.File;
import network.crypta.client.DefaultMIMETypes;
import network.crypta.client.async.BinaryBlob;
+import network.crypta.keys.FreenetURI;
/**
* Resolves and validates MIME types for {@link ClientPut} requests.
@@ -88,6 +89,24 @@ static String resolve(
identifier,
global);
}
+ if (shouldSuppressMimeForDiskBareChk(message, targetFilename)) {
+ // Disk uploads for bare CHK@ without an explicit TargetFilename should follow direct-mode
+ // semantics and avoid forcing a two-block metadata wrapper solely for MIME information.
+ mimeType = null;
+ }
return mimeType;
}
+
+ private static boolean shouldSuppressMimeForDiskBareChk(
+ ClientPutMessage message, String targetFilename) {
+ return message.uploadFromType == ClientPutBase.UploadFrom.DISK
+ && targetFilename == null
+ && isBareChkInsertUri(message.uri);
+ }
+
+ private static boolean isBareChkInsertUri(FreenetURI uri) {
+ return uri.getRoutingKey() == null
+ && uri.getDocName() == null
+ && "CHK".equals(uri.getKeyType());
+ }
}
diff --git a/src/main/java/network/crypta/clients/fcp/ClientRequest.java b/src/main/java/network/crypta/clients/fcp/ClientRequest.java
index 6851b7c06b7..1f38684d37c 100644
--- a/src/main/java/network/crypta/clients/fcp/ClientRequest.java
+++ b/src/main/java/network/crypta/clients/fcp/ClientRequest.java
@@ -472,6 +472,32 @@ public String getIdentifier() {
return identifier;
}
+ /**
+ * Builds the diagnostics correlation identifier for this request.
+ *
+ *
The identifier is prefixed to make it explicit that the value originates from FCP.
+ *
+ * @return prefixed identifier string, or {@code null} when no identifier is available
+ */
+ protected final String diagnosticIdentifier() {
+ if (identifier == null) {
+ return null;
+ }
+ return "fcp:" + identifier;
+ }
+
+ /**
+ * Assigns this request's diagnostics identifier to a low-level requester.
+ *
+ * @param requester requester that should carry the diagnostics identifier
+ */
+ protected final void applyDiagnosticIdentifier(ClientRequester requester) {
+ if (requester == null) {
+ return;
+ }
+ requester.setExternalRequestIdentifier(diagnosticIdentifier());
+ }
+
/**
* Returns the underlying {@link ClientRequester} that performs the actual network work.
*
diff --git a/src/main/java/network/crypta/node/BaseSender.java b/src/main/java/network/crypta/node/BaseSender.java
index d0608987095..d6b9b950cfc 100644
--- a/src/main/java/network/crypta/node/BaseSender.java
+++ b/src/main/java/network/crypta/node/BaseSender.java
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Base class for request and insert senders.
@@ -71,6 +72,13 @@ public abstract class BaseSender implements ByteCounter, HighHtlAware {
long uid;
static final long SEARCH_TIMEOUT_BULK = MINUTES.toMillis(10);
static final long SEARCH_TIMEOUT_REALTIME = MINUTES.toMillis(1);
+
+ /** Keep legacy-route request sends short so blocked peer queues fail fast and reroute. */
+ static final long ROUTE_SEND_TIMEOUT_MILLIS = SECONDS.toMillis(15);
+
+ /** Extra wait after un-queue failure for route-request sends. */
+ static final long ROUTE_SEND_UNQUEUE_WAIT_MILLIS = SECONDS.toMillis(2);
+
final int incomingSearchTimeout;
BaseSender(Key key, boolean realTimeFlag, PeerNode source, Node node, short htl, long uid) {
@@ -298,7 +306,9 @@ protected void innerRouteRequestsOld(PeerNode next, UIDTag origTag) {
// waiting in the peer’s sending queue.
// - sendAsync would increase ACCEPTED_TIMEOUT risk and leave many hanging requests, further
// overloading peers. Hence, we do NOT use sendAsync here.
- next.transport().sendSync(req, this, realTimeFlag);
+ next.transport()
+ .sendSync(
+ req, this, realTimeFlag, ROUTE_SEND_TIMEOUT_MILLIS, ROUTE_SEND_UNQUEUE_WAIT_MILLIS);
PeerNodeRoutingReporter.reportRoutedTo(
node,
next,
@@ -682,7 +692,11 @@ private boolean sendToPeer(NlmState state, UIDTag origTag, Message req) {
return false;
}
try {
- state.next.transport().sendSync(req, this, realTimeFlag);
+ state
+ .next
+ .transport()
+ .sendSync(
+ req, this, realTimeFlag, ROUTE_SEND_TIMEOUT_MILLIS, ROUTE_SEND_UNQUEUE_WAIT_MILLIS);
PeerNodeRoutingReporter.reportRoutedTo(
node,
state.next,
diff --git a/src/main/java/network/crypta/node/CHKInsertHandler.java b/src/main/java/network/crypta/node/CHKInsertHandler.java
index f541c4e42a5..1cd28460709 100644
--- a/src/main/java/network/crypta/node/CHKInsertHandler.java
+++ b/src/main/java/network/crypta/node/CHKInsertHandler.java
@@ -67,6 +67,24 @@ public class CHKInsertHandler implements PrioRunnable, ByteCounter {
static final long DATA_INSERT_TIMEOUT = SECONDS.toMillis(10);
+ /**
+ * Bound synchronous waiting for the initial {@code FNPAccepted} send so handler threads do not
+ * stay blocked on overloaded peers for the full transport default timeout.
+ */
+ private static final long ACCEPTED_SEND_TIMEOUT_MILLIS = SECONDS.toMillis(15);
+
+ /** Additional wait after an un-queue failure when sending {@code FNPAccepted}. */
+ private static final long ACCEPTED_UNQUEUE_WAIT_MILLIS = SECONDS.toMillis(2);
+
+ /**
+ * Bound synchronous waiting for {@code FNPInsertTransfersCompleted} so byte accounting remains
+ * accurate without blocking handlers for the full transport default timeout.
+ */
+ private static final long COMPLETION_SEND_TIMEOUT_MILLIS = SECONDS.toMillis(15);
+
+ /** Additional wait after an un-queue failure when sending completion to the source peer. */
+ private static final long COMPLETION_UNQUEUE_WAIT_MILLIS = SECONDS.toMillis(2);
+
final Node node;
final long uid;
final PeerNode source;
@@ -166,9 +184,16 @@ private boolean sendAccepted() {
// Consider inserting rate limiting here if the acceptance path requires backpressure.
Message accepted = DMT.createFNPAccepted(uid);
try {
- // Synchronous send here ensures the next message filter does not spuriously time out; we
- // either block here, or inside the filter, but we prefer to fail early on sending.
- source.transport().sendSync(accepted, this, realTimeFlag);
+ // Preserve send ordering with a bounded wait to avoid pinning handler threads for long
+ // transport-level timeouts.
+ source
+ .transport()
+ .sendSync(
+ accepted,
+ this,
+ realTimeFlag,
+ ACCEPTED_SEND_TIMEOUT_MILLIS,
+ ACCEPTED_UNQUEUE_WAIT_MILLIS);
return true;
} catch (NotConnectedException _) {
if (LOG.isDebugEnabled()) LOG.debug("Lost connection to source while sending FNPAccepted");
@@ -280,16 +305,16 @@ private boolean forwardNonTerminalOverloadIfNeeded(boolean alreadyForwarded) {
private void waitOnSender() {
final CHKInsertSender s = sender;
synchronized (s) {
- try {
- long deadline = System.currentTimeMillis() + 5000L;
- long remaining = deadline - System.currentTimeMillis();
- while (s.getStatus() == CHKInsertSender.NOT_FINISHED && remaining > 0L) {
+ long deadline = System.currentTimeMillis() + 5000L;
+ long remaining = deadline - System.currentTimeMillis();
+ while (s.getStatus() == CHKInsertSender.NOT_FINISHED && remaining > 0L) {
+ try {
s.wait(remaining);
- remaining = deadline - System.currentTimeMillis();
+ } catch (InterruptedException _) {
+ // Interrupts here are wake-ups from receive-failure handling; consume and re-check.
+ break;
}
- } catch (InterruptedException _) {
- // Restore interrupt status; likely set by receive failing.
- Thread.currentThread().interrupt();
+ remaining = deadline - System.currentTimeMillis();
}
}
}
@@ -556,8 +581,8 @@ private void waitForReceiveToComplete() {
try {
wait(SECONDS.toMillis(100));
} catch (InterruptedException _) {
- // Restore interrupted status
- Thread.currentThread().interrupt();
+ // Interrupts are used to wake this loop when receive-failure completion runs.
+ // Reasserting here can cause immediate rethrows and prevent completion from taking lock.
}
}
}
@@ -607,8 +632,7 @@ private boolean waitUntilSenderCompletedWithin(long deadlineMillis) {
if (t > 0) s.wait(t);
else return true; // took too long
} catch (InterruptedException _) {
- // Restore interrupted status and loop
- Thread.currentThread().interrupt();
+ // Interrupts here are used as wake-ups; consume and continue checking sender state.
}
}
}
@@ -622,8 +646,7 @@ private void waitUntilSenderCompletedNoTimeout() {
try {
s.wait(SECONDS.toMillis(10));
} catch (InterruptedException _) {
- // Restore interrupted status and loop
- Thread.currentThread().interrupt();
+ // Interrupts here are used as wake-ups; consume and continue checking sender state.
}
}
}
@@ -631,8 +654,16 @@ private void waitUntilSenderCompletedNoTimeout() {
private void sendCompletion(Message m) {
try {
- // We do need to sendSync here so we have accurate byte counter totals.
- source.transport().sendSync(m, this, realTimeFlag);
+ // Keep completion synchronous so handler/sender byte totals include this message before we
+ // report aggregate insert costs.
+ source
+ .transport()
+ .sendSync(
+ m,
+ this,
+ realTimeFlag,
+ COMPLETION_SEND_TIMEOUT_MILLIS,
+ COMPLETION_UNQUEUE_WAIT_MILLIS);
if (LOG.isDebugEnabled()) LOG.debug("Sent completion: {}" + FOR_STRING + "{}", m, this);
} catch (NotConnectedException _) {
if (LOG.isDebugEnabled()) LOG.debug("Not connected: {}" + FOR_STRING + "{}", source, this);
diff --git a/src/main/java/network/crypta/node/CHKInsertSender.java b/src/main/java/network/crypta/node/CHKInsertSender.java
index 8327990b055..e7ef6d0c02c 100644
--- a/src/main/java/network/crypta/node/CHKInsertSender.java
+++ b/src/main/java/network/crypta/node/CHKInsertSender.java
@@ -419,6 +419,13 @@ public void start() {
// Constants
static final long ACCEPTED_TIMEOUT = SECONDS.toMillis(10);
+
+ /** Bound waiting for initial {@code FNPDataInsert} queueing so routing can fail over quickly. */
+ static final long DATA_INSERT_SEND_TIMEOUT_MILLIS = SECONDS.toMillis(15);
+
+ /** Additional wait after un-queue failure for {@code FNPDataInsert}. */
+ static final long DATA_INSERT_UNQUEUE_WAIT_MILLIS = SECONDS.toMillis(2);
+
static final long TRANSFER_COMPLETION_ACK_TIMEOUT_REALTIME = MINUTES.toMillis(1);
static final long TRANSFER_COMPLETION_ACK_TIMEOUT_BULK = MINUTES.toMillis(5);
@@ -1502,7 +1509,13 @@ protected void onAccepted(PeerNode next) {
if (LOG.isDebugEnabled()) LOG.debug("Sending DataInsert");
try {
- next.transport().sendSync(dataInsert, this, realTimeFlag);
+ next.transport()
+ .sendSync(
+ dataInsert,
+ this,
+ realTimeFlag,
+ DATA_INSERT_SEND_TIMEOUT_MILLIS,
+ DATA_INSERT_UNQUEUE_WAIT_MILLIS);
} catch (NotConnectedException _) {
if (LOG.isDebugEnabled())
LOG.debug("Not connected sending DataInsert: {}" + FOR + "{}", next, uid);
diff --git a/src/main/java/network/crypta/node/NodeClientCoreTransfers.java b/src/main/java/network/crypta/node/NodeClientCoreTransfers.java
index 2f09f9c8964..f9c2eb028ab 100644
--- a/src/main/java/network/crypta/node/NodeClientCoreTransfers.java
+++ b/src/main/java/network/crypta/node/NodeClientCoreTransfers.java
@@ -92,10 +92,28 @@ public void asyncGet(
final Key key,
final RequestCompletionListener listener,
NodeRoutingSubsystem.RequestSenderOptions options) {
+ asyncGet(key, listener, options, null);
+ }
+
+ /**
+ * Schedules a non-blocking fetch and records an optional external diagnostics identifier.
+ *
+ * @param key key to fetch; typically a {@link Key} or {@link NodeSSK}.
+ * @param listener callback notified for success, failure, or local-store hits.
+ * @param options request flags controlling store access, cache usage, offered-key handling, and
+ * real-time scheduling.
+ * @param externalRequestIdentifier optional external identifier used for diagnostics correlation
+ */
+ public void asyncGet(
+ final Key key,
+ final RequestCompletionListener listener,
+ NodeRoutingSubsystem.RequestSenderOptions options,
+ String externalRequestIdentifier) {
final long uid = makeUID();
final boolean isSSK = key instanceof NodeSSK;
final RequestTag tag =
new RequestTag(isSSK, RequestTag.START.ASYNC_GET, null, options.realTimeFlag(), uid, node);
+ tag.setExternalRequestIdentifier(externalRequestIdentifier);
if (!node.routing()
.tracker()
.lockUID(
@@ -654,6 +672,38 @@ public void realPut(
boolean ignoreLowBackoff,
boolean realTimeFlag)
throws LowLevelPutException {
+ realPut(
+ block,
+ canWriteClientCache,
+ forkOnCacheable,
+ preferInsert,
+ ignoreLowBackoff,
+ realTimeFlag,
+ null);
+ }
+
+ /**
+ * Synchronously inserts a CHK or SSK block into the network with optional diagnostics metadata.
+ *
+ * @param block block to insert; must be a {@link CHKBlock} or {@link SSKBlock}.
+ * @param canWriteClientCache whether the insert may update the client cache.
+ * @param forkOnCacheable true to fork inserts when the block is cacheable.
+ * @param preferInsert true to prefer insert strategies when alternatives exist.
+ * @param ignoreLowBackoff true to ignore low backoff during routing decisions.
+ * @param realTimeFlag true for latency-optimized inserts; false for bulk routing.
+ * @param externalRequestIdentifier optional external identifier used for diagnostics correlation
+ * @throws LowLevelPutException when routing fails, overload occurs, or internal errors arise.
+ * @throws IllegalArgumentException if the block type is unsupported for insert.
+ */
+ public void realPut(
+ KeyBlock block,
+ boolean canWriteClientCache,
+ boolean forkOnCacheable,
+ boolean preferInsert,
+ boolean ignoreLowBackoff,
+ boolean realTimeFlag,
+ String externalRequestIdentifier)
+ throws LowLevelPutException {
switch (block) {
case CHKBlock kBlock1 ->
realPutCHK(
@@ -662,7 +712,8 @@ public void realPut(
forkOnCacheable,
preferInsert,
ignoreLowBackoff,
- realTimeFlag);
+ realTimeFlag,
+ externalRequestIdentifier);
case SSKBlock kBlock ->
realPutSSK(
kBlock,
@@ -670,7 +721,8 @@ public void realPut(
forkOnCacheable,
preferInsert,
ignoreLowBackoff,
- realTimeFlag);
+ realTimeFlag,
+ externalRequestIdentifier);
default -> throw new IllegalArgumentException("Unknown put type " + block.getClass());
}
}
@@ -700,6 +752,38 @@ public void realPutCHK(
boolean ignoreLowBackoff,
boolean realTimeFlag)
throws LowLevelPutException {
+ realPutCHK(
+ block,
+ canWriteClientCache,
+ forkOnCacheable,
+ preferInsert,
+ ignoreLowBackoff,
+ realTimeFlag,
+ null);
+ }
+
+ /**
+ * Synchronously inserts a CHK block and applies local caching rules.
+ *
+ * @param block CHK block to insert and potentially store locally.
+ * @param canWriteClientCache whether the insert may update the client cache.
+ * @param forkOnCacheable true to fork inserts when the block is cacheable.
+ * @param preferInsert true to prefer insert strategies when alternatives exist.
+ * @param ignoreLowBackoff true to ignore low backoff during routing decisions.
+ * @param realTimeFlag true for latency-optimized inserts; false for bulk routing.
+ * @param externalRequestIdentifier optional external identifier used for diagnostics correlation
+ * @throws LowLevelPutException when routing fails, overload is reported, or internal errors
+ * arise.
+ */
+ public void realPutCHK(
+ CHKBlock block,
+ boolean canWriteClientCache,
+ boolean forkOnCacheable,
+ boolean preferInsert,
+ boolean ignoreLowBackoff,
+ boolean realTimeFlag,
+ String externalRequestIdentifier)
+ throws LowLevelPutException {
byte[] data = block.getData();
byte[] headers = block.getHeaders();
NodeRoutingSubsystem.ChkInsertOptions options =
@@ -714,6 +798,7 @@ public void realPutCHK(
CHKInsertSender is;
long uid = makeUID();
InsertTag tag = new InsertTag(false, InsertTag.START.LOCAL, null, realTimeFlag, uid, node);
+ tag.setExternalRequestIdentifier(externalRequestIdentifier);
if (!node.routing()
.tracker()
.lockUID(uid, RequestAdmissionMode.of(true, false, true, false, realTimeFlag), tag)) {
@@ -886,9 +971,41 @@ public void realPutSSK(
boolean ignoreLowBackoff,
boolean realTimeFlag)
throws LowLevelPutException {
+ realPutSSK(
+ block,
+ canWriteClientCache,
+ forkOnCacheable,
+ preferInsert,
+ ignoreLowBackoff,
+ realTimeFlag,
+ null);
+ }
+
+ /**
+ * Synchronously inserts an SSK block and handles collision checks.
+ *
+ * @param block SSK block to insert and potentially store locally.
+ * @param canWriteClientCache whether the insert may update the client cache.
+ * @param forkOnCacheable true to fork inserts when the block is cacheable.
+ * @param preferInsert true to prefer insert strategies when alternatives exist.
+ * @param ignoreLowBackoff true to ignore low backoff during routing decisions.
+ * @param realTimeFlag true for latency-optimized inserts; false for bulk routing.
+ * @param externalRequestIdentifier optional external identifier used for diagnostics correlation
+ * @throws LowLevelPutException on collision, routing failure, overload, or internal errors.
+ */
+ public void realPutSSK(
+ SSKBlock block,
+ boolean canWriteClientCache,
+ boolean forkOnCacheable,
+ boolean preferInsert,
+ boolean ignoreLowBackoff,
+ boolean realTimeFlag,
+ String externalRequestIdentifier)
+ throws LowLevelPutException {
SSKInsertSender is;
long uid = makeUID();
InsertTag tag = new InsertTag(true, InsertTag.START.LOCAL, null, realTimeFlag, uid, node);
+ tag.setExternalRequestIdentifier(externalRequestIdentifier);
if (!node.routing()
.tracker()
.lockUID(uid, RequestAdmissionMode.of(true, true, true, false, realTimeFlag), tag)) {
diff --git a/src/main/java/network/crypta/node/PeerNodeTransport.java b/src/main/java/network/crypta/node/PeerNodeTransport.java
index a1ce5e03ec8..a2306599aef 100644
--- a/src/main/java/network/crypta/node/PeerNodeTransport.java
+++ b/src/main/java/network/crypta/node/PeerNodeTransport.java
@@ -54,6 +54,19 @@ final class PeerNodeTransport implements PeerTransport {
/** Separator used in log messages to keep peer identifiers readable. */
private static final String STR_FOR = " for ";
+ /**
+ * Default upper bound for synchronous send waits before attempting un-queue.
+ *
+ *
Kept at one minute to preserve historical behavior for callers that do not provide explicit
+ * timeout bounds.
+ */
+ private static final long SEND_SYNC_WAIT_MILLIS_DEFAULT = MINUTES.toMillis(1);
+
+ /**
+ * Default extra wait after un-queue failure in {@link #sendSync(Message, ByteCounter, boolean)}.
+ */
+ private static final long SEND_SYNC_UNQUEUE_WAIT_MILLIS_DEFAULT = SECONDS.toMillis(10);
+
/** Owning peer for this transport; provides queues, stats, and crypto context. */
private final PeerNode peer;
@@ -188,20 +201,49 @@ public MessageItem sendAsync(Message msg, AsyncMessageCallback cb, ByteCounter c
@Override
public void sendSync(Message req, ByteCounter ctr, boolean realTime)
throws NotConnectedException, SyncSendWaitedTooLongException {
+ sendSync(
+ req, ctr, realTime, SEND_SYNC_WAIT_MILLIS_DEFAULT, SEND_SYNC_UNQUEUE_WAIT_MILLIS_DEFAULT);
+ }
+
+ @Override
+ public void sendSync(
+ Message req,
+ ByteCounter ctr,
+ boolean realTime,
+ long sendTimeoutMillis,
+ long unqueueWaitMillis)
+ throws NotConnectedException, SyncSendWaitedTooLongException {
SyncMessageCallback cb = new SyncMessageCallback();
MessageItem item = sendAsync(req, cb, ctr);
- cb.waitForSend(MINUTES.toMillis(1));
+ long boundedSendTimeout = Math.max(1L, sendTimeoutMillis);
+ long boundedUnqueueTimeout = Math.max(1L, unqueueWaitMillis);
+ cb.waitForSend(boundedSendTimeout);
if (!cb.done) {
- LOG.warn("Waited too long for a blocking send for {} to {}", req, peer.selfPeerNode());
+ SendWaitDiagnostics diagnostics = buildSendWaitDiagnostics(item);
+ LOG.warn(
+ "Waited too long for a blocking send for {} to {} (sendWaitAgeMs={} uid={} external={}"
+ + " uidAgeMs={})",
+ req,
+ peer.selfPeerNode(),
+ diagnostics.sendWaitAgeMs(),
+ diagnostics.uidForLog(),
+ diagnostics.externalIdentifierForLog(),
+ diagnostics.uidAgeMsForLog());
peer.localRejectedOverload("SendSyncTimeout", realTime);
// Try to un-queue it, since it presumably won't be of any use now.
if (!peer.getMessageQueue().removeMessage(item)) {
- cb.waitForSend(SECONDS.toMillis(10));
+ cb.waitForSend(boundedUnqueueTimeout);
if (!cb.done) {
+ SendWaitDiagnostics secondDiagnostics = buildSendWaitDiagnostics(item);
LOG.error(
- "Waited too long for blocking send and then could not un-queue for {} to {}",
+ "Waited too long for blocking send and then could not un-queue for {} to {}"
+ + " (sendWaitAgeMs={} uid={} external={} uidAgeMs={})",
req,
peer.selfPeerNode(),
+ secondDiagnostics.sendWaitAgeMs(),
+ secondDiagnostics.uidForLog(),
+ secondDiagnostics.externalIdentifierForLog(),
+ secondDiagnostics.uidAgeMsForLog(),
new Exception(STR_ERROR));
// Can't cancel yet, can't send it, something seriously wrong.
// Treat as fatal timeout as probably their fault.
@@ -216,6 +258,34 @@ public void sendSync(Message req, ByteCounter ctr, boolean realTime)
}
}
+ private SendWaitDiagnostics buildSendWaitDiagnostics(MessageItem item) {
+ long sendWaitAgeMs = Math.max(0L, System.currentTimeMillis() - item.submitted);
+ long uid = item.getID();
+ if (uid < 0) {
+ return new SendWaitDiagnostics(sendWaitAgeMs, null, null, null);
+ }
+ UIDTag tag = peer.node.routing().tracker().findTagByUid(uid);
+ String externalIdentifier = tag == null ? null : tag.getExternalRequestIdentifier();
+ Long uidAgeMs = tag == null ? null : tag.age();
+ return new SendWaitDiagnostics(sendWaitAgeMs, uid, externalIdentifier, uidAgeMs);
+ }
+
+ private record SendWaitDiagnostics(
+ long sendWaitAgeMs, Long uid, String externalIdentifier, Long uidAgeMs) {
+
+ String uidForLog() {
+ return uid == null ? "unknown" : Long.toString(uid);
+ }
+
+ String externalIdentifierForLog() {
+ return externalIdentifier == null ? "n/a" : externalIdentifier;
+ }
+
+ String uidAgeMsForLog() {
+ return uidAgeMs == null ? "n/a" : Long.toString(uidAgeMs);
+ }
+ }
+
/**
* Sends a ping and waits briefly for a matching pong.
*
diff --git a/src/main/java/network/crypta/node/PeerTransport.java b/src/main/java/network/crypta/node/PeerTransport.java
index db44fa665e5..54d62b7a994 100644
--- a/src/main/java/network/crypta/node/PeerTransport.java
+++ b/src/main/java/network/crypta/node/PeerTransport.java
@@ -79,6 +79,32 @@ MessageItem sendAsync(Message msg, AsyncMessageCallback cb, ByteCounter ctr)
void sendSync(Message req, ByteCounter ctr, boolean realTime)
throws NotConnectedException, SyncSendWaitedTooLongException;
+ /**
+ * Sends a message synchronously using explicit timeout bounds.
+ *
+ *
Implementations may override this to provide tuned timeouts for specific call sites while
+ * preserving the same synchronous semantics as {@link #sendSync(Message, ByteCounter, boolean)}.
+ * The default implementation delegates to the legacy method and ignores the timeout parameters to
+ * keep backward compatibility for transports that do not need custom bounds.
+ *
+ * @param req message to send; must be a fully populated, encodable instance
+ * @param ctr byte counter used for bandwidth accounting; may be {@code null}
+ * @param realTime {@code true} to request real-time scheduling; {@code false} otherwise
+ * @param sendTimeoutMillis primary wait before trying to un-queue a blocked send
+ * @param unqueueWaitMillis additional wait after un-queue failure before giving up
+ * @throws NotConnectedException if the peer disconnects before the send completes
+ * @throws SyncSendWaitedTooLongException if the acknowledgement does not arrive in time
+ */
+ default void sendSync(
+ Message req,
+ ByteCounter ctr,
+ boolean realTime,
+ long sendTimeoutMillis,
+ long unqueueWaitMillis)
+ throws NotConnectedException, SyncSendWaitedTooLongException {
+ sendSync(req, ctr, realTime);
+ }
+
/**
* Sends a low-level ping and waits for a corresponding pong.
*
diff --git a/src/main/java/network/crypta/node/RequestTracker.java b/src/main/java/network/crypta/node/RequestTracker.java
index ef8dbf7ce58..b8245650587 100644
--- a/src/main/java/network/crypta/node/RequestTracker.java
+++ b/src/main/java/network/crypta/node/RequestTracker.java
@@ -736,6 +736,46 @@ else if (mode.isInsert())
else return getRequestTracker(mode.isSSK(), mode.isLocal(), mode.realTimeFlag());
}
+ /**
+ * Finds the currently tracked tag for the given UID across all request categories.
+ *
+ *
This lookup is intended for diagnostics paths and therefore favors clarity over micro
+ * optimizations.
+ *
+ * @param uid uid to search for
+ * @return matching in-flight tag, or {@code null} when no active tag owns the uid
+ */
+ public UIDTag findTagByUid(long uid) {
+ return findTagInMaps(
+ uid,
+ runningCHKGetUIDsRT,
+ runningSSKGetUIDsRT,
+ runningCHKGetUIDsBulk,
+ runningSSKGetUIDsBulk,
+ runningCHKPutUIDsRT,
+ runningSSKPutUIDsRT,
+ runningCHKPutUIDsBulk,
+ runningSSKPutUIDsBulk,
+ runningCHKOfferReplyUIDsRT,
+ runningSSKOfferReplyUIDsRT,
+ runningCHKOfferReplyUIDsBulk,
+ runningSSKOfferReplyUIDsBulk);
+ }
+
+ @SafeVarargs
+ private static UIDTag findTagInMaps(long uid, Map... maps) {
+ for (Map map : maps) {
+ UIDTag found;
+ synchronized (map) {
+ found = map.get(uid);
+ }
+ if (found != null) {
+ return found;
+ }
+ }
+ return null;
+ }
+
private Map getRequestTracker(
boolean ssk, boolean local, boolean realTimeFlag) {
return realTimeFlag ? getRequestTrackerRT(ssk, local) : getRequestTrackerBulk(ssk, local);
diff --git a/src/main/java/network/crypta/node/SendableGetRequestSender.java b/src/main/java/network/crypta/node/SendableGetRequestSender.java
index 8cdec3d391e..347211ef612 100644
--- a/src/main/java/network/crypta/node/SendableGetRequestSender.java
+++ b/src/main/java/network/crypta/node/SendableGetRequestSender.java
@@ -120,7 +120,8 @@ public void onFailed(LowLevelGetException e) {
req.onFailure(e, context);
}
},
- options);
+ options,
+ req.getExternalRequestIdentifier());
} catch (RuntimeException | Error t) {
// Convert unexpected throwables into a failure callback to keep the scheduler healthy.
LOG.error("Unhandled throwable in send: {}", t, t);
diff --git a/src/main/java/network/crypta/node/SimpleSendableInsert.java b/src/main/java/network/crypta/node/SimpleSendableInsert.java
index 86b39def1c3..dbf0d845848 100644
--- a/src/main/java/network/crypta/node/SimpleSendableInsert.java
+++ b/src/main/java/network/crypta/node/SimpleSendableInsert.java
@@ -269,7 +269,8 @@ public boolean send(
Node.FORK_ON_CACHEABLE_DEFAULT,
Node.PREFER_INSERT_DEFAULT,
Node.IGNORE_LOW_BACKOFF_DEFAULT,
- false);
+ false,
+ req.getExternalRequestIdentifier());
succeeded = true;
} catch (LowLevelPutException e) {
onFailure(e, req.token, context);
diff --git a/src/main/java/network/crypta/node/UIDTag.java b/src/main/java/network/crypta/node/UIDTag.java
index 3d8f7cf96b2..3d69e1079e0 100644
--- a/src/main/java/network/crypta/node/UIDTag.java
+++ b/src/main/java/network/crypta/node/UIDTag.java
@@ -109,6 +109,9 @@ public abstract class UIDTag {
final long uid;
+ /** Optional external request identifier used for diagnostics correlation (for example FCP id). */
+ private volatile String externalRequestIdentifier;
+
/**
* Tracks whether the inbound handler has completed and unlocked.
*
@@ -159,6 +162,39 @@ long age() {
return System.currentTimeMillis() - createdTime;
}
+ /**
+ * Associates an external diagnostics identifier with this in-flight UID.
+ *
+ * This is best-effort metadata and does not affect routing decisions.
+ *
+ * @param externalRequestIdentifier optional external identifier (for example {@code fcp:})
+ */
+ public final void setExternalRequestIdentifier(String externalRequestIdentifier) {
+ String normalized = normalizeExternalRequestIdentifier(externalRequestIdentifier);
+ if (normalized == null || normalized.equals(this.externalRequestIdentifier)) {
+ return;
+ }
+ this.externalRequestIdentifier = normalized;
+ UIDTraceLogger.log("externalIdentifier", this, () -> "value=" + normalized);
+ }
+
+ /**
+ * Returns the external diagnostics identifier associated with this UID.
+ *
+ * @return external identifier, or {@code null} if none has been assigned
+ */
+ public final String getExternalRequestIdentifier() {
+ return externalRequestIdentifier;
+ }
+
+ private static String normalizeExternalRequestIdentifier(String externalRequestIdentifier) {
+ if (externalRequestIdentifier == null) {
+ return null;
+ }
+ String normalized = externalRequestIdentifier.trim();
+ return normalized.isEmpty() ? null : normalized;
+ }
+
/**
* Records that this tag is routing to a peer or fetching an offered key.
*
@@ -689,6 +725,9 @@ public synchronized String toString() {
sb.append(" (fetch offered keys from ").append(fetchingOfferedKeyFrom.size()).append(")");
if (sourceRestarted) sb.append(" (source restarted)");
if (timedOutButContinued) sb.append(" (timed out but continued)");
+ if (externalRequestIdentifier != null) {
+ sb.append(" (external=").append(externalRequestIdentifier).append(")");
+ }
return sb.toString();
}
@@ -746,7 +785,7 @@ public synchronized void setAccepted() {
private boolean timedOutButContinued;
private long timeoutContinueAt;
- private boolean hardTimeoutPeersTriggered;
+ private boolean hardTimeoutPeersLogged;
private boolean hardTimeoutWithoutPeersTriggered;
/**
@@ -776,7 +815,11 @@ private void maybeForceHardTimeout(long now) {
}
return;
}
- logAndForcePeerTimeout(context.continueAge(), context.routingPeers(), context.offeredPeers());
+ logAndForcePeerTimeout(
+ context.continueAge(),
+ context.routingPeers(),
+ context.offeredPeers(),
+ context.firstPeerForce());
}
private HardTimeoutContext resolveHardTimeoutContext(long now) {
@@ -794,18 +837,20 @@ private HardTimeoutContext resolveHardTimeoutContext(long now) {
List routingPeers = List.of(routingPeersArray);
List offeredPeers = List.of(offeredPeersArray);
boolean handleWithoutPeers = routingPeers.isEmpty() && offeredPeers.isEmpty();
+ boolean firstPeerForce = false;
if (!handleWithoutPeers) {
- if (!markHardTimeoutPeersTriggered()) return null;
+ firstPeerForce = markPeerHardTimeoutSeen();
} else if (!shouldHandleHardTimeoutWithoutPeers()) {
return null;
}
- return new HardTimeoutContext(continueAge, routingPeers, offeredPeers, handleWithoutPeers);
+ return new HardTimeoutContext(
+ continueAge, routingPeers, offeredPeers, handleWithoutPeers, firstPeerForce);
}
- private boolean markHardTimeoutPeersTriggered() {
+ private boolean markPeerHardTimeoutSeen() {
synchronized (this) {
- if (hardTimeoutPeersTriggered) return false;
- hardTimeoutPeersTriggered = true;
+ if (hardTimeoutPeersLogged) return false;
+ hardTimeoutPeersLogged = true;
return true;
}
}
@@ -823,7 +868,10 @@ private void markHardTimeoutWithoutPeersTriggered() {
}
private void logAndForcePeerTimeout(
- long continueAge, List routingPeers, List offeredPeers) {
+ long continueAge,
+ List routingPeers,
+ List offeredPeers,
+ boolean firstPeerForce) {
String routingSummary = formatPeers(routingPeers);
String offeredSummary = formatPeers(offeredPeers);
String elapsed = TimeUtil.formatTime(continueAge);
@@ -831,17 +879,36 @@ private void logAndForcePeerTimeout(
"timeoutHard",
this,
() -> "elapsed=" + elapsed + " routing=" + routingSummary + " offered=" + offeredSummary);
- LOG.warn(
- "Hard timeout after {} for {}. Forcing fatal timeout: routing={} offered={}",
- elapsed,
- this,
- routingSummary,
- offeredSummary);
+ if (firstPeerForce) {
+ LOG.warn(
+ "Hard timeout after {} for {}. Forcing fatal timeout: routing={} offered={}",
+ elapsed,
+ this,
+ routingSummary,
+ offeredSummary);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Hard-timeout retry after {} for {}. Re-forcing fatal timeout: routing={} offered={}",
+ elapsed,
+ this,
+ routingSummary,
+ offeredSummary);
+ }
for (PeerNode peer : routingPeers) {
- peer.fatalTimeout(this, false);
+ forceFatalTimeout(peer, false);
}
for (PeerNode peer : offeredPeers) {
- peer.fatalTimeout(this, true);
+ forceFatalTimeout(peer, true);
+ }
+ }
+
+ @SuppressWarnings("java:S1181")
+ private void forceFatalTimeout(PeerNode peer, boolean offeredKey) {
+ try {
+ peer.fatalTimeout(this, offeredKey);
+ } catch (Throwable t) {
+ LOG.error(
+ "Failed forcing fatal timeout on {} for {} offeredKey={}", peer, this, offeredKey, t);
}
}
@@ -894,7 +961,8 @@ private record HardTimeoutContext(
long continueAge,
List routingPeers,
List offeredPeers,
- boolean handleWithoutPeers) {}
+ boolean handleWithoutPeers,
+ boolean firstPeerForce) {}
private static String formatPeers(List peers) {
if (peers.isEmpty()) return "none";
diff --git a/src/main/java/network/crypta/node/UIDTraceLogger.java b/src/main/java/network/crypta/node/UIDTraceLogger.java
index 4a862c37917..47a92dcb062 100644
--- a/src/main/java/network/crypta/node/UIDTraceLogger.java
+++ b/src/main/java/network/crypta/node/UIDTraceLogger.java
@@ -21,8 +21,10 @@ static void log(String event, UIDTag tag, Supplier detailsSupplier) {
} else {
details = " " + details;
}
+ String externalIdentifier = tag.getExternalRequestIdentifier();
LOG.info(
- "event={} uid={} tag={} local={} originLocal={} rt={} ssk={} insert={} offer={}{}",
+ "event={} uid={} tag={} local={} originLocal={} rt={} ssk={} insert={} offer={}"
+ + " external={}{}",
event,
tag.uid,
tag.getClass().getSimpleName(),
@@ -32,6 +34,7 @@ static void log(String event, UIDTag tag, Supplier detailsSupplier) {
tag.isSSK(),
tag.isInsert(),
tag.isOfferReply(),
+ externalIdentifier == null ? "n/a" : externalIdentifier,
details);
}
}
diff --git a/src/test/java/network/crypta/client/async/SplitFileInserterSenderTest.java b/src/test/java/network/crypta/client/async/SplitFileInserterSenderTest.java
index ad5fe78e551..21140e0bcf6 100644
--- a/src/test/java/network/crypta/client/async/SplitFileInserterSenderTest.java
+++ b/src/test/java/network/crypta/client/async/SplitFileInserterSenderTest.java
@@ -379,7 +379,8 @@ void send_whenRemote_realPutAndSuccess() throws Exception {
true,
Node.PREFER_INSERT_DEFAULT,
Node.IGNORE_LOW_BACKOFF_DEFAULT,
- false);
+ false,
+ null);
verify(segment, times(1)).onInsertedBlock(blockNo, clientKey);
}
diff --git a/src/test/java/network/crypta/clients/fcp/ClientPutMessageTest.java b/src/test/java/network/crypta/clients/fcp/ClientPutMessageTest.java
index 8de43b31172..7c81395d599 100644
--- a/src/test/java/network/crypta/clients/fcp/ClientPutMessageTest.java
+++ b/src/test/java/network/crypta/clients/fcp/ClientPutMessageTest.java
@@ -21,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -206,6 +207,37 @@ void dataLength_diskUpload_returnsNegativeOne() throws Exception {
message.freeData();
}
+ @Test
+ void constructor_diskBareChkWithoutExplicitTarget_doesNotInferTargetFilename() throws Exception {
+ Path file = Files.createTempFile(tempDir, "changelog-short", ".md");
+ Files.writeString(file, "payload");
+
+ ClientPutMessage message = newDiskMessage(file);
+
+ assertEquals(ClientPutBase.UploadFrom.DISK, message.uploadFromType);
+ assertEquals(file.toFile(), message.origFilename);
+ assertEquals(-1L, message.dataLength());
+ assertNull(message.targetFilename);
+ message.freeData();
+ }
+
+ @Test
+ void constructor_diskSskWithoutExplicitTarget_infersFilenameForDocName() throws Exception {
+ Path file = Files.createTempFile(tempDir, "client-put", ".txt");
+ Files.writeString(file, "payload");
+ SimpleFieldSet fs = new SimpleFieldSet(true);
+ fs.putSingle("Identifier", "put-ssk-disk");
+ fs.putSingle("URI", "SSK@");
+ fs.put("Verbosity", 0);
+ fs.putSingle("UploadFrom", "disk");
+ fs.putSingle("Filename", file.toString());
+
+ ClientPutMessage message = new ClientPutMessage(fs);
+
+ assertEquals(file.getFileName().toString(), message.targetFilename);
+ message.freeData();
+ }
+
@Test
void freeData_whenBucketPresent_invokesBucketFree() throws MessageInvalidException {
ClientPutMessage message = newDirectMessage("put-free", 5L);
diff --git a/src/test/java/network/crypta/clients/fcp/ClientPutMimeResolverTest.java b/src/test/java/network/crypta/clients/fcp/ClientPutMimeResolverTest.java
index d46b443807c..267de2a0921 100644
--- a/src/test/java/network/crypta/clients/fcp/ClientPutMimeResolverTest.java
+++ b/src/test/java/network/crypta/clients/fcp/ClientPutMimeResolverTest.java
@@ -82,6 +82,51 @@ void resolve_whenBadMime_throwsMessageInvalid() throws Exception {
assertEquals(ProtocolErrorMessage.BAD_MIME_TYPE, error.protocolCode);
}
+ @Test
+ void resolve_whenDiskBareChkWithoutTargetFilename_stripsExplicitMime() throws Exception {
+ File file = createFile("disk-upload.txt");
+ ClientPutMessage message = buildDiskMessage(file, "text/plain", null);
+
+ String mime = ClientPutMimeResolver.resolve(message, file, null, false, "id", false);
+
+ assertNull(mime);
+ }
+
+ @Test
+ void resolve_whenDiskBareChkWithoutTargetFilenameAndBadMime_throwsMessageInvalid()
+ throws Exception {
+ File file = createFile("disk-upload.dat");
+ ClientPutMessage message = buildDiskMessage(file, "not a mime", null);
+
+ MessageInvalidException error =
+ assertThrows(
+ MessageInvalidException.class,
+ () -> ClientPutMimeResolver.resolve(message, file, null, false, "id", false));
+
+ assertEquals(ProtocolErrorMessage.BAD_MIME_TYPE, error.protocolCode);
+ }
+
+ @Test
+ void resolve_whenDiskBareChkWithoutTargetFilename_suppressesGuessedMime() throws Exception {
+ File file = createFile("disk-upload.html");
+ ClientPutMessage message = buildDiskMessage(file, null, null);
+
+ String mime = ClientPutMimeResolver.resolve(message, file, null, false, "id", false);
+
+ assertNull(mime);
+ }
+
+ @Test
+ void resolve_whenDiskBareChkWithTargetFilename_keepsMime() throws Exception {
+ File file = createFile("disk-upload.txt");
+ ClientPutMessage message = buildDiskMessage(file, "text/plain", "keep-mime.txt");
+
+ String mime =
+ ClientPutMimeResolver.resolve(message, file, message.targetFilename, false, "id", false);
+
+ assertEquals("text/plain", mime);
+ }
+
private File createFile(String name) throws Exception {
File file = tempDir.resolve(name).toFile();
assertTrue(file.createNewFile());
@@ -104,4 +149,20 @@ private ClientPutMessage buildMessage(String contentType, boolean binaryBlob)
}
return new ClientPutMessage(fields);
}
+
+ private ClientPutMessage buildDiskMessage(File file, String contentType, String targetFilename)
+ throws MessageInvalidException {
+ SimpleFieldSet fields = new SimpleFieldSet(true);
+ fields.putSingle("Identifier", "disk-request");
+ fields.putSingle("URI", "CHK@");
+ fields.putSingle("UploadFrom", "disk");
+ fields.putSingle("Filename", file.getAbsolutePath());
+ if (contentType != null) {
+ fields.putSingle("Metadata.ContentType", contentType);
+ }
+ if (targetFilename != null) {
+ fields.putSingle("TargetFilename", targetFilename);
+ }
+ return new ClientPutMessage(fields);
+ }
}
diff --git a/src/test/java/network/crypta/clients/fcp/ClientPutTest.java b/src/test/java/network/crypta/clients/fcp/ClientPutTest.java
index 452881a3207..3bb9c6edba6 100644
--- a/src/test/java/network/crypta/clients/fcp/ClientPutTest.java
+++ b/src/test/java/network/crypta/clients/fcp/ClientPutTest.java
@@ -195,6 +195,17 @@ void innerResume_whenBucketPresent_delegatesToBucket() throws Exception {
verify(bucket).onResume(context);
}
+ @Test
+ void innerResume_whenPutterPresent_reappliesDiagnosticIdentifier() throws Exception {
+ ClientPutter putter = mock(ClientPutter.class);
+ setField(ClientPut.class, clientPut, "putter", putter);
+ ClientContext context = mock(ClientContext.class);
+
+ clientPut.innerResume(context);
+
+ verify(putter).setExternalRequestIdentifier("fcp:test-id");
+ }
+
@Test
void innerResume_whenBucketThrows_propagatesException() throws Exception {
RandomAccessBucket bucket = mock(RandomAccessBucket.class);
diff --git a/src/test/java/network/crypta/node/CHKInsertHandlerTest.java b/src/test/java/network/crypta/node/CHKInsertHandlerTest.java
index c97e1581a3a..55afde33054 100644
--- a/src/test/java/network/crypta/node/CHKInsertHandlerTest.java
+++ b/src/test/java/network/crypta/node/CHKInsertHandlerTest.java
@@ -19,10 +19,12 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
@@ -87,7 +89,9 @@ void run_whenNoDataInsert_expectTimeoutAndCompletionNotices() throws Exception {
when(usm.waitFor(any(MessageFilter.class), any(ByteCounter.class))).thenReturn(null);
// Peer interactions: allow sending without throwing
- doNothing().when(transport).sendSync(any(Message.class), any(ByteCounter.class), anyBoolean());
+ doNothing()
+ .when(transport)
+ .sendSync(any(Message.class), any(ByteCounter.class), anyBoolean(), anyLong(), anyLong());
when(transport.sendAsync(any(Message.class), any(), any(ByteCounter.class)))
.thenAnswer(_ -> null);
@@ -107,7 +111,13 @@ void run_whenNoDataInsert_expectTimeoutAndCompletionNotices() throws Exception {
handler.run();
// Assert: 1) First, an FNPAccepted is sent synchronously
- verify(transport, times(1)).sendSync(messageCaptor.capture(), eq(handler), eq(false));
+ verify(transport, times(1))
+ .sendSync(
+ messageCaptor.capture(),
+ eq(handler),
+ eq(false),
+ eq(SECONDS.toMillis(15)),
+ eq(SECONDS.toMillis(2)));
Message accepted = messageCaptor.getValue();
assertEquals(DMT.FNPAccepted, accepted.getSpec(), "Expected FNPAccepted as first send");
assertEquals(uid, accepted.getLong(DMT.UID));
@@ -155,7 +165,9 @@ void run_whenDataInsertRejected_propagatesRejectionReason() throws Exception {
when(usm.waitFor(any(MessageFilter.class), any(ByteCounter.class))).thenReturn(rejected);
- doNothing().when(transport).sendSync(any(Message.class), any(ByteCounter.class), anyBoolean());
+ doNothing()
+ .when(transport)
+ .sendSync(any(Message.class), any(ByteCounter.class), anyBoolean(), anyLong(), anyLong());
when(transport.sendAsync(any(Message.class), any(), any(ByteCounter.class)))
.thenAnswer(_ -> null);
@@ -174,7 +186,13 @@ void run_whenDataInsertRejected_propagatesRejectionReason() throws Exception {
handler.run();
// Assert: first, FNPAccepted; then echo FNPDataInsertRejected with the same reason
- verify(transport, times(1)).sendSync(any(Message.class), eq(handler), eq(true));
+ verify(transport, times(1))
+ .sendSync(
+ any(Message.class),
+ eq(handler),
+ eq(true),
+ eq(SECONDS.toMillis(15)),
+ eq(SECONDS.toMillis(2)));
verify(transport, times(1)).sendAsync(messageCaptor.capture(), any(), eq(handler));
Message echoed = messageCaptor.getValue();
diff --git a/src/test/java/network/crypta/node/NodeClientCoreTransfersTest.java b/src/test/java/network/crypta/node/NodeClientCoreTransfersTest.java
index bf41fd904b7..855df7bc576 100644
--- a/src/test/java/network/crypta/node/NodeClientCoreTransfersTest.java
+++ b/src/test/java/network/crypta/node/NodeClientCoreTransfersTest.java
@@ -238,26 +238,28 @@ void realGetKey_whenSskBlockInStore_expectReturnsClientBlock() throws Exception
void realPut_whenChkBlock_expectDelegatesToRealPutChk() throws Exception {
NodeClientCoreTransfers spy = spy(transfers);
CHKBlock block = mock(CHKBlock.class);
- doNothing().when(spy).realPutCHK(block, true, false, true, false, true);
+ doNothing().when(spy).realPutCHK(block, true, false, true, false, true, null);
spy.realPut(block, true, false, true, false, true);
- verify(spy).realPutCHK(block, true, false, true, false, true);
+ verify(spy).realPutCHK(block, true, false, true, false, true, null);
verify(spy, never())
- .realPutSSK(any(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean());
+ .realPutSSK(
+ any(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean(), any());
}
@Test
void realPut_whenSskBlock_expectDelegatesToRealPutSsk() throws Exception {
NodeClientCoreTransfers spy = spy(transfers);
SSKBlock block = mock(SSKBlock.class);
- doNothing().when(spy).realPutSSK(block, true, false, false, false, false);
+ doNothing().when(spy).realPutSSK(block, true, false, false, false, false, null);
spy.realPut(block, true, false, false, false, false);
- verify(spy).realPutSSK(block, true, false, false, false, false);
+ verify(spy).realPutSSK(block, true, false, false, false, false, null);
verify(spy, never())
- .realPutCHK(any(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean());
+ .realPutCHK(
+ any(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean(), any());
}
@Test
diff --git a/src/test/java/network/crypta/node/RequestTrackerTest.java b/src/test/java/network/crypta/node/RequestTrackerTest.java
index 209d100c934..60f9f026933 100644
--- a/src/test/java/network/crypta/node/RequestTrackerTest.java
+++ b/src/test/java/network/crypta/node/RequestTrackerTest.java
@@ -16,6 +16,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -282,4 +283,19 @@ void addRunningUIDs_and_getTotalRunningUIDsAlt_expectConsistency() {
// Assert: cardinality matches alternative counter
assertEquals(tracker.getTotalRunningUIDsAlt(), ids.size());
}
+
+ @Test
+ void findTagByUid_whenTagExists_expectTagReturned() {
+ RequestTag tag = new RequestTag(false, START.LOCAL, null, false, 8_001L, node);
+ assertTrue(tracker.lockUID(tag));
+
+ UIDTag found = tracker.findTagByUid(8_001L);
+
+ assertSame(tag, found);
+ }
+
+ @Test
+ void findTagByUid_whenMissing_expectNull() {
+ assertNull(tracker.findTagByUid(9_001L));
+ }
}
diff --git a/src/test/java/network/crypta/node/SendableGetRequestSenderTest.java b/src/test/java/network/crypta/node/SendableGetRequestSenderTest.java
index 7bec492e6ae..fbd3bbf0f6e 100644
--- a/src/test/java/network/crypta/node/SendableGetRequestSenderTest.java
+++ b/src/test/java/network/crypta/node/SendableGetRequestSenderTest.java
@@ -17,6 +17,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
@@ -163,7 +164,7 @@ void send_whenAsyncGetSucceeds_invokesFetchSuccessCallback() {
return null;
})
.when(transfers)
- .asyncGet(eq(nodeKey), any(RequestCompletionListener.class), eq(options));
+ .asyncGet(eq(nodeKey), any(RequestCompletionListener.class), eq(options), isNull());
// Act
boolean result = sender.send(core, scheduler, context, req);
@@ -196,7 +197,7 @@ void send_whenAsyncGetFails_invokesFailureCallbackWithSameException() {
return null;
})
.when(transfers)
- .asyncGet(eq(nodeKey), any(RequestCompletionListener.class), eq(options));
+ .asyncGet(eq(nodeKey), any(RequestCompletionListener.class), eq(options), isNull());
// Act
boolean result = sender.send(core, scheduler, context, req);
@@ -273,7 +274,7 @@ void send_whenCoreAsyncGetThrows_reportsInternalErrorAndReturnsTrue() {
NodeRoutingSubsystem.RequestSenderOptions.of(false, false, false, true, false, false);
doThrow(new RuntimeException("asyncGet blew up"))
.when(transfers)
- .asyncGet(eq(nodeKey), any(RequestCompletionListener.class), eq(options));
+ .asyncGet(eq(nodeKey), any(RequestCompletionListener.class), eq(options), isNull());
// Act
boolean result = sender.send(core, scheduler, context, req);
diff --git a/src/test/java/network/crypta/node/SimpleSendableInsertTest.java b/src/test/java/network/crypta/node/SimpleSendableInsertTest.java
index 040acf06143..ea73ed34ed2 100644
--- a/src/test/java/network/crypta/node/SimpleSendableInsertTest.java
+++ b/src/test/java/network/crypta/node/SimpleSendableInsertTest.java
@@ -24,6 +24,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -95,7 +96,8 @@ void getSender_send_whenSuccess_callsRealPut_onSuccess_andRemovesRunningInsert()
Node.FORK_ON_CACHEABLE_DEFAULT,
Node.PREFER_INSERT_DEFAULT,
Node.IGNORE_LOW_BACKOFF_DEFAULT,
- false);
+ false,
+ null);
// Build a token and chosen block
SendableRequestItemKey tokenKey = new SendableRequestItemKey() {};
@@ -139,7 +141,8 @@ public SendableRequestItemKey getKey() {
Node.FORK_ON_CACHEABLE_DEFAULT,
Node.PREFER_INSERT_DEFAULT,
Node.IGNORE_LOW_BACKOFF_DEFAULT,
- false);
+ false,
+ null);
verify(insert, times(1)).onSuccess(token, null, clientContext);
verify(requestScheduler, times(1)).removeRunningInsert(insert, tokenKey);
assertTrue(insert.isCancelled());
@@ -166,7 +169,8 @@ void getSender_send_whenRealPutThrows_callsOnFailure_setsFinished_andSkipsRemove
eq(Node.FORK_ON_CACHEABLE_DEFAULT),
eq(Node.PREFER_INSERT_DEFAULT),
eq(Node.IGNORE_LOW_BACKOFF_DEFAULT),
- eq(false));
+ eq(false),
+ isNull());
SendableRequestItemKey tokenKey = new SendableRequestItemKey() {};
SendableRequestItem token =
diff --git a/src/test/java/network/crypta/node/UIDTagHardTimeoutTest.java b/src/test/java/network/crypta/node/UIDTagHardTimeoutTest.java
index 503c05f3dd8..8c3700d5a8a 100644
--- a/src/test/java/network/crypta/node/UIDTagHardTimeoutTest.java
+++ b/src/test/java/network/crypta/node/UIDTagHardTimeoutTest.java
@@ -6,6 +6,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -29,7 +30,7 @@ void setup() {
}
@Test
- void hardTimeout_whenTimedOutContinueAndHandlerUnlocked_triggersFatalTimeoutOnce() {
+ void hardTimeout_whenTimedOutContinueAndHandlerUnlocked_retriesFatalTimeoutOnLaterChecks() {
TestUIDTag tag = new TestUIDTag(node, UID);
PeerNode peer = mock(PeerNode.class);
@@ -43,7 +44,7 @@ void hardTimeout_whenTimedOutContinueAndHandlerUnlocked_triggersFatalTimeoutOnce
verify(peer, times(1)).fatalTimeout(tag, false);
tag.maybeLogStillPresent(now + RequestTracker.TIMEOUT, UID);
- verify(peer, times(1)).fatalTimeout(tag, false);
+ verify(peer, times(2)).fatalTimeout(tag, false);
}
@Test
@@ -61,6 +62,26 @@ void hardTimeout_beforeGraceWindow_doesNotTriggerFatalTimeout() {
verify(peer, never()).fatalTimeout(tag, false);
}
+ @Test
+ void hardTimeout_whenOnePeerThrows_stillForcesOtherPeers() {
+ TestUIDTag tag = new TestUIDTag(node, UID);
+ PeerNode failingPeer = mock(PeerNode.class);
+ PeerNode healthyPeer = mock(PeerNode.class);
+
+ tag.addRoutedTo(failingPeer, /* offeredKey= */ false);
+ tag.addRoutedTo(healthyPeer, /* offeredKey= */ false);
+ tag.timedOutToHandlerButContinued();
+ tag.unlockHandler();
+
+ doThrow(new RuntimeException("boom")).when(failingPeer).fatalTimeout(tag, false);
+
+ long now = System.currentTimeMillis() + RequestTracker.TIMEOUT + 1_000;
+ tag.maybeLogStillPresent(now, UID);
+
+ verify(failingPeer, times(1)).fatalTimeout(tag, false);
+ verify(healthyPeer, times(1)).fatalTimeout(tag, false);
+ }
+
private static final class TestUIDTag extends UIDTag {
private TestUIDTag(Node node, long uid) {
super(/* source= */ null, /* realTimeFlag= */ false, uid, node);
From 746a82e22cade9035ae745b90105e88e84897a62 Mon Sep 17 00:00:00 2001
From: Leumor <116955025+leumor@users.noreply.github.com>
Date: Thu, 26 Feb 2026 08:47:17 +0000
Subject: [PATCH 2/3] fix(core): harden interrupt and close guards
Preserve non-receive interrupts in CHKInsertHandler while stopping sender-status processing to avoid spin loops and interrupt leakage into synchronous sends.
Clear internal wake-up interrupts before completion and terminal sync sends so sendSync does not report false timeouts.
Replace success-flag cleanup with close-guard patterns when restoring RAF-backed resources, and add targeted tests for the CHKInsertHandler interrupt paths.
---
.../crypta/client/async/SplitFileFetcher.java | 32 ++-
.../crypt/EncryptedRandomAccessBuffer.java | 23 ++-
.../network/crypta/node/CHKInsertHandler.java | 92 +++++++--
.../crypta/node/PeerNodeTransport.java | 46 +++--
.../support/io/ReadOnlyFileSliceBucket.java | 52 ++---
.../client/async/SplitFileFetcherTest.java | 10 +-
.../crypta/node/CHKInsertHandlerTest.java | 189 ++++++++++++++++++
7 files changed, 362 insertions(+), 82 deletions(-)
diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcher.java b/src/main/java/network/crypta/client/async/SplitFileFetcher.java
index 470dcac6431..a59cf82294c 100644
--- a/src/main/java/network/crypta/client/async/SplitFileFetcher.java
+++ b/src/main/java/network/crypta/client/async/SplitFileFetcher.java
@@ -73,6 +73,27 @@ public final class SplitFileFetcher
/** Simple holder for completion-via-truncation configuration. */
private record TruncationConfig(FileGetCompletionCallback callback, File tempFile) {}
+ /**
+ * Ensures a restored RAF is closed if constructor resume fails before ownership is transferred to
+ * this fetcher instance.
+ */
+ private static final class ResumeRafCloseGuard implements AutoCloseable {
+ private LockableRandomAccessBuffer raf;
+
+ void track(LockableRandomAccessBuffer raf) {
+ this.raf = raf;
+ }
+
+ void release() {
+ raf = null;
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(raf);
+ }
+ }
+
/**
* Stores the progress of the download, including the actual data, in a separate file. Created in
* onResume() or in the constructor, so must be volatile.
@@ -792,9 +813,8 @@ public boolean writeTrivialProgress(DataOutputStream dos) throws IOException {
public SplitFileFetcher(ClientGetter getter, DataInputStream dis, ClientContext context)
throws StorageFormatException, ResumeFailedException, IOException {
LOG.info("Resuming splitfile download for {}", this);
- LockableRandomAccessBuffer restored = null;
- boolean success = false;
- try {
+ LockableRandomAccessBuffer restored;
+ try (ResumeRafCloseGuard closeGuard = new ResumeRafCloseGuard()) {
boolean completeViaTruncation = dis.readBoolean();
if (completeViaTruncation) {
fileCompleteViaTruncation = new File(dis.readUTF());
@@ -808,6 +828,7 @@ public SplitFileFetcher(ClientGetter getter, DataInputStream dis, ClientContext
// Note: Could verify against finalLength to finish immediately if it matches.
restored =
new PooledFileRandomAccessBuffer(fileCompleteViaTruncation, false, rafSize, -1, true);
+ closeGuard.track(restored);
} else {
restored =
BucketTools.restoreRAFFrom(
@@ -817,6 +838,7 @@ public SplitFileFetcher(ClientGetter getter, DataInputStream dis, ClientContext
context.getPersistentMasterSecret());
fileCompleteViaTruncation = null;
callbackCompleteViaTruncation = null;
+ closeGuard.track(restored);
}
this.raf = restored;
this.parent = getter;
@@ -830,9 +852,7 @@ public SplitFileFetcher(ClientGetter getter, DataInputStream dis, ClientContext
// onResume() will do the rest.
LOG.info("Resumed splitfile download for {}", this);
lastNotifiedStoreFetch = System.currentTimeMillis();
- success = true;
- } finally {
- if (!success) IOUtils.closeQuietly(restored);
+ closeGuard.release();
}
}
diff --git a/src/main/java/network/crypta/crypt/EncryptedRandomAccessBuffer.java b/src/main/java/network/crypta/crypt/EncryptedRandomAccessBuffer.java
index a9422a57c58..ce53fb60d74 100755
--- a/src/main/java/network/crypta/crypt/EncryptedRandomAccessBuffer.java
+++ b/src/main/java/network/crypta/crypt/EncryptedRandomAccessBuffer.java
@@ -579,17 +579,30 @@ public static LockableRandomAccessBuffer create(
if (type == null) throw new StorageFormatException("Unknown EncryptedRandomAccessBufferType");
LockableRandomAccessBuffer underlying =
BucketTools.restoreRAFFrom(dis, fg, persistentFileTracker, masterKey);
- boolean success = false;
- try {
+ class UnderlyingCloseGuard implements AutoCloseable {
+ private LockableRandomAccessBuffer buffer;
+
+ private UnderlyingCloseGuard(LockableRandomAccessBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ private void release() {
+ buffer = null;
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(buffer);
+ }
+ }
+ try (UnderlyingCloseGuard closeGuard = new UnderlyingCloseGuard(underlying)) {
EncryptedRandomAccessBuffer restored =
new EncryptedRandomAccessBuffer(type, underlying, masterKey, false);
- success = true;
+ closeGuard.release();
return restored;
} catch (GeneralSecurityException e) {
throw new ResumeFailedException(
new GeneralSecurityException("Crypto error resuming EncryptedRandomAccessBuffer", e));
- } finally {
- if (!success) IOUtils.closeQuietly(underlying);
}
}
diff --git a/src/main/java/network/crypta/node/CHKInsertHandler.java b/src/main/java/network/crypta/node/CHKInsertHandler.java
index 1cd28460709..bdb26c80f8e 100644
--- a/src/main/java/network/crypta/node/CHKInsertHandler.java
+++ b/src/main/java/network/crypta/node/CHKInsertHandler.java
@@ -272,7 +272,11 @@ private void setupForDataInsert(Message msg) {
private void processSenderStatuses() {
boolean receivedRejectedOverload = false;
while (true) {
- waitOnSender();
+ if (waitOnSender()) {
+ // A non-receive-failure interrupt (e.g. shutdown/cancellation) was preserved.
+ // Stop status processing to avoid spin loops and interrupt leakage into sync sends.
+ return;
+ }
if (receiveFailed()) {
finish(CHKInsertSender.RECEIVE_FAILED);
return;
@@ -302,7 +306,7 @@ private boolean forwardNonTerminalOverloadIfNeeded(boolean alreadyForwarded) {
return alreadyForwarded;
}
- private void waitOnSender() {
+ private boolean waitOnSender() {
final CHKInsertSender s = sender;
synchronized (s) {
long deadline = System.currentTimeMillis() + 5000L;
@@ -311,12 +315,18 @@ private void waitOnSender() {
try {
s.wait(remaining);
} catch (InterruptedException _) {
- // Interrupts here are wake-ups from receive-failure handling; consume and re-check.
- break;
+ // Interrupts here are usually internal wake-ups from receive-failure handling.
+ // Preserve non-internal interrupts to honor interruption policy (java:S2142).
+ if (!receiveFailed()) {
+ Thread.currentThread().interrupt();
+ return true;
+ }
+ return false;
}
remaining = deadline - System.currentTimeMillis();
}
}
+ return false;
}
private void handleTerminalStatus(int status) {
@@ -342,6 +352,7 @@ private void handleTerminalStatus(int status) {
*/
private void handleFatalOverload(int status) {
Message msg = DMT.createFNPRejectedOverload(uid, true);
+ clearInternalWakeupInterruptBeforeSyncSend("fatal overload");
try {
source.transport().sendSync(msg, this, realTimeFlag);
} catch (NotConnectedException _) {
@@ -359,6 +370,7 @@ private void handleFatalOverload(int status) {
/** Sends a route-not-found response including the final HTL, then finalizes the insert. */
private void handleRouteNotFound(int status) {
Message msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
+ clearInternalWakeupInterruptBeforeSyncSend("route-not-found");
try {
source.transport().sendSync(msg, this, realTimeFlag);
} catch (NotConnectedException _) {
@@ -582,7 +594,17 @@ private void waitForReceiveToComplete() {
wait(SECONDS.toMillis(100));
} catch (InterruptedException _) {
// Interrupts are used to wake this loop when receive-failure completion runs.
- // Reasserting here can cause immediate rethrows and prevent completion from taking lock.
+ Thread.currentThread().interrupt();
+ if (receiveCompleted) {
+ break;
+ }
+ // Clear before retrying the timed wait so we do not spin on immediate rethrow.
+ boolean interruptStatusCleared = Thread.interrupted();
+ if (!interruptStatusCleared && LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Interrupt status was already clear before retrying waitForReceiveToComplete on {}",
+ uid);
+ }
}
}
}
@@ -623,36 +645,53 @@ private Message awaitDownstreamAndBuildCompletionMsg(
private boolean waitUntilSenderCompletedWithin(long deadlineMillis) {
final CHKInsertSender s = sender;
- while (true) {
- synchronized (s) {
- if (s.completed()) return false;
- try {
- long remaining = deadlineMillis - System.currentTimeMillis();
- int t = (int) Math.clamp(remaining, 0L, Integer.MAX_VALUE);
- if (t > 0) s.wait(t);
- else return true; // took too long
- } catch (InterruptedException _) {
- // Interrupts here are used as wake-ups; consume and continue checking sender state.
+ boolean interrupted = false;
+ try {
+ while (true) {
+ synchronized (s) {
+ if (s.completed()) return false;
+ try {
+ long remaining = deadlineMillis - System.currentTimeMillis();
+ int t = (int) Math.clamp(remaining, 0L, Integer.MAX_VALUE);
+ if (t > 0) s.wait(t);
+ else return true; // took too long
+ } catch (InterruptedException _) {
+ // Interrupts here are used as wake-ups; consume and continue checking sender state.
+ interrupted = true;
+ }
}
}
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
private void waitUntilSenderCompletedNoTimeout() {
final CHKInsertSender s = sender;
- while (true) {
- synchronized (s) {
- if (s.completed()) return;
- try {
- s.wait(SECONDS.toMillis(10));
- } catch (InterruptedException _) {
- // Interrupts here are used as wake-ups; consume and continue checking sender state.
+ boolean interrupted = false;
+ try {
+ while (true) {
+ synchronized (s) {
+ if (s.completed()) return;
+ try {
+ s.wait(SECONDS.toMillis(10));
+ } catch (InterruptedException _) {
+ // Interrupts here are used as wake-ups; consume and continue checking sender state.
+ interrupted = true;
+ }
}
}
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
private void sendCompletion(Message m) {
+ clearInternalWakeupInterruptBeforeSyncSend("completion");
try {
// Keep completion synchronous so handler/sender byte totals include this message before we
// report aggregate insert costs.
@@ -674,6 +713,15 @@ private void sendCompletion(Message m) {
}
}
+ private void clearInternalWakeupInterruptBeforeSyncSend(String messageType) {
+ // Internal wake-up interrupts are used for receive-failure coordination; clear them before
+ // synchronous sending so transport wait logic does not misclassify healthy sends.
+ boolean interruptStatusCleared = Thread.interrupted();
+ if (interruptStatusCleared && LOG.isTraceEnabled()) {
+ LOG.trace("Cleared interrupt status before sending {} for {}", messageType, uid);
+ }
+ }
+
private void reportStatsIfNeeded(int code) {
if (code == CHKInsertSender.TIMED_OUT
|| code == CHKInsertSender.GENERATED_REJECTED_OVERLOAD
diff --git a/src/main/java/network/crypta/node/PeerNodeTransport.java b/src/main/java/network/crypta/node/PeerNodeTransport.java
index a2306599aef..4cf0212c930 100644
--- a/src/main/java/network/crypta/node/PeerNodeTransport.java
+++ b/src/main/java/network/crypta/node/PeerNodeTransport.java
@@ -219,32 +219,36 @@ public void sendSync(
long boundedUnqueueTimeout = Math.max(1L, unqueueWaitMillis);
cb.waitForSend(boundedSendTimeout);
if (!cb.done) {
- SendWaitDiagnostics diagnostics = buildSendWaitDiagnostics(item);
- LOG.warn(
- "Waited too long for a blocking send for {} to {} (sendWaitAgeMs={} uid={} external={}"
- + " uidAgeMs={})",
- req,
- peer.selfPeerNode(),
- diagnostics.sendWaitAgeMs(),
- diagnostics.uidForLog(),
- diagnostics.externalIdentifierForLog(),
- diagnostics.uidAgeMsForLog());
+ if (LOG.isWarnEnabled()) {
+ SendWaitDiagnostics diagnostics = buildSendWaitDiagnostics(item);
+ LOG.warn(
+ "Waited too long for a blocking send for {} to {} (sendWaitAgeMs={} uid={} external={}"
+ + " uidAgeMs={})",
+ req,
+ peer.selfPeerNode(),
+ diagnostics.sendWaitAgeMs(),
+ diagnostics.uidForLog(),
+ diagnostics.externalIdentifierForLog(),
+ diagnostics.uidAgeMsForLog());
+ }
peer.localRejectedOverload("SendSyncTimeout", realTime);
// Try to un-queue it, since it presumably won't be of any use now.
if (!peer.getMessageQueue().removeMessage(item)) {
cb.waitForSend(boundedUnqueueTimeout);
if (!cb.done) {
- SendWaitDiagnostics secondDiagnostics = buildSendWaitDiagnostics(item);
- LOG.error(
- "Waited too long for blocking send and then could not un-queue for {} to {}"
- + " (sendWaitAgeMs={} uid={} external={} uidAgeMs={})",
- req,
- peer.selfPeerNode(),
- secondDiagnostics.sendWaitAgeMs(),
- secondDiagnostics.uidForLog(),
- secondDiagnostics.externalIdentifierForLog(),
- secondDiagnostics.uidAgeMsForLog(),
- new Exception(STR_ERROR));
+ if (LOG.isErrorEnabled()) {
+ SendWaitDiagnostics secondDiagnostics = buildSendWaitDiagnostics(item);
+ LOG.error(
+ "Waited too long for blocking send and then could not un-queue for {} to {}"
+ + " (sendWaitAgeMs={} uid={} external={} uidAgeMs={})",
+ req,
+ peer.selfPeerNode(),
+ secondDiagnostics.sendWaitAgeMs(),
+ secondDiagnostics.uidForLog(),
+ secondDiagnostics.externalIdentifierForLog(),
+ secondDiagnostics.uidAgeMsForLog(),
+ new Exception(STR_ERROR));
+ }
// Can't cancel yet, can't send it, something seriously wrong.
// Treat as fatal timeout as probably their fault.
// Note: We have already waited more than the no-messages timeout; do not wait again.
diff --git a/src/main/java/network/crypta/support/io/ReadOnlyFileSliceBucket.java b/src/main/java/network/crypta/support/io/ReadOnlyFileSliceBucket.java
index cd28a1fe530..552ce242229 100644
--- a/src/main/java/network/crypta/support/io/ReadOnlyFileSliceBucket.java
+++ b/src/main/java/network/crypta/support/io/ReadOnlyFileSliceBucket.java
@@ -136,6 +136,16 @@ public void setReadOnly() {
// Intentional no-op.
}
+ private static IOException closeOnInputStreamInitFailure(
+ RandomAccessFile randomAccessFile, IOException failure) {
+ try {
+ randomAccessFile.close();
+ } catch (IOException closeFailure) {
+ failure.addSuppressed(closeFailure);
+ }
+ return failure;
+ }
+
private final class MyInputStream extends InputStream {
private final RandomAccessFile f;
@@ -151,33 +161,29 @@ private final class MyInputStream extends InputStream {
* @throws IOException on other I/O errors.
*/
MyInputStream() throws IOException {
+ RandomAccessFile randomAccessFile;
try {
- RandomAccessFile raf = new RandomAccessFile(file, "r");
- try {
- raf.seek(startAt);
- long fileLength = raf.length();
- if (fileLength < (startAt + length))
- throw new ReadOnlyFileSliceBucketException(
- "File truncated? Length "
- + fileLength
- + " but start at "
- + startAt
- + " for "
- + length
- + " bytes");
- this.f = raf;
- } catch (IOException e) {
- try {
- raf.close();
- } catch (IOException closeFailure) {
- e.addSuppressed(closeFailure);
- }
- throw e;
- }
- ptr = 0;
+ randomAccessFile = new RandomAccessFile(file, "r");
} catch (FileNotFoundException e) {
throw new ReadOnlyFileSliceBucketException(e);
}
+ try {
+ randomAccessFile.seek(startAt);
+ long fileLength = randomAccessFile.length();
+ if (fileLength < (startAt + length))
+ throw new ReadOnlyFileSliceBucketException(
+ "File truncated? Length "
+ + fileLength
+ + " but start at "
+ + startAt
+ + " for "
+ + length
+ + " bytes");
+ } catch (IOException e) {
+ throw closeOnInputStreamInitFailure(randomAccessFile, e);
+ }
+ this.f = randomAccessFile;
+ ptr = 0;
}
/** Read a single byte from the slice or {@code -1} at end of slice. */
diff --git a/src/test/java/network/crypta/client/async/SplitFileFetcherTest.java b/src/test/java/network/crypta/client/async/SplitFileFetcherTest.java
index ed0bde12966..1ff41cef0e0 100644
--- a/src/test/java/network/crypta/client/async/SplitFileFetcherTest.java
+++ b/src/test/java/network/crypta/client/async/SplitFileFetcherTest.java
@@ -90,7 +90,7 @@ private static void cleanupFetcherTempFile(SplitFileFetcher fetcher, File tmp) {
raf.free();
}
rafField.set(fetcher, null);
- } catch (Exception ignored) {
+ } catch (Exception _) {
// Best-effort cleanup to avoid leaking mapped files on Windows.
}
if (!tmp.delete() && tmp.exists()) {
@@ -123,7 +123,7 @@ private SplitFileFetcher newResumedFetcherWithTruncation(File file, long size, l
@Test
void constructor_resumeWithTruncation_missingFile_throwsResumeFailedException() {
DataInputStream dis; // assigned below
- // Build a proper stream that points to a missing path so resume fails deterministically.
+ // Build a proper stream that points to a missing path, so resume fails deterministically.
File nonExistent = new File("/nonexistent-xyz-should-not-exist" + System.nanoTime());
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (DataOutputStream dos = new DataOutputStream(bos)) {
@@ -200,7 +200,7 @@ void onFetchedBlock_whenCounterReachesThreshold_expectNotifyFalse() throws Excep
setField(f, "context", clientContext);
setField(f, "getter", sendableGetter);
when(sendableGetter.hasQueued()).thenReturn(false);
- // Set counter to threshold so branch triggers
+ // Set counter to the threshold so branch triggers
setField(f, "storeFetchCounter", SplitFileFetcher.STORE_NOTIFY_BLOCKS);
f.onFetchedBlock();
@@ -435,7 +435,7 @@ void writeTrivialProgress_whenDone_returnsFalseAndWritesFalse() throws Exception
boolean ret = f.writeTrivialProgress(dos);
assertFalse(ret);
- // First byte should be boolean false
+ // The first byte should be boolean false
byte[] data = bos.toByteArray();
assertEquals(0, data[0]);
}
@@ -479,7 +479,7 @@ void writeTrivialProgress_withoutTruncation_callsStoreToOnRAF() throws Exception
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- // Also need a token write at the end; default token is 0 from the test ctor
+ // Also need a token writing at the end; the default token is 0 from the test ctor
boolean ret = f.writeTrivialProgress(dos);
assertTrue(ret);
diff --git a/src/test/java/network/crypta/node/CHKInsertHandlerTest.java b/src/test/java/network/crypta/node/CHKInsertHandlerTest.java
index 55afde33054..40cd792c839 100644
--- a/src/test/java/network/crypta/node/CHKInsertHandlerTest.java
+++ b/src/test/java/network/crypta/node/CHKInsertHandlerTest.java
@@ -1,5 +1,8 @@
package network.crypta.node;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
import network.crypta.io.comm.ByteCounter;
import network.crypta.io.comm.DMT;
import network.crypta.io.comm.Message;
@@ -21,11 +24,13 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -65,6 +70,35 @@ private static CHKInsertHandler newHandler(
return new CHKInsertHandler(key, htl, context);
}
+ private static void setPrivateField(Object target, String fieldName, Object value)
+ throws Exception {
+ Field field = CHKInsertHandler.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
+ private static void invokeWaitOnSender(CHKInsertHandler target) throws Exception {
+ Method method = CHKInsertHandler.class.getDeclaredMethod("waitOnSender");
+ method.setAccessible(true);
+ method.invoke(target);
+ }
+
+ private static void invokePrivateMethod(Object target, String methodName, int arg)
+ throws Exception {
+ Method method = CHKInsertHandler.class.getDeclaredMethod(methodName, int.class);
+ method.setAccessible(true);
+ method.invoke(target, arg);
+ }
+
+ private static void clearInterruptForTest() {
+ boolean wasInterrupted = Thread.interrupted();
+ if (wasInterrupted) {
+ assertFalse(
+ Thread.currentThread().isInterrupted(),
+ "interrupt flag should be clear after test cleanup");
+ }
+ }
+
@BeforeEach
void setUp() {
when(node.network().usm()).thenReturn(usm);
@@ -203,6 +237,161 @@ void run_whenDataInsertRejected_propagatesRejectionReason() throws Exception {
verify(tag, times(1)).unlockHandler();
}
+ @Test
+ @DisplayName(
+ "waitOnSender_whenInterruptedForReceiveFailure_expectNoInterruptLeakIntoCompletionSend")
+ void waitOnSender_whenInterruptedForReceiveFailure_expectNoInterruptLeakIntoCompletionSend()
+ throws Exception {
+ // Arrange
+ long uid = 8080L;
+ CHKInsertHandler handler =
+ newHandler(
+ node,
+ source,
+ tag,
+ nodeCHK,
+ (short) 5,
+ uid,
+ /* startTime= */ beyondHandshakeWindow(),
+ /* realTime= */ false);
+
+ CHKInsertSender downstreamSender = org.mockito.Mockito.mock(CHKInsertSender.class);
+ AtomicBoolean waitLoopEntered = new AtomicBoolean();
+ when(downstreamSender.getStatus())
+ .thenAnswer(
+ _ -> {
+ waitLoopEntered.set(true);
+ return CHKInsertSender.NOT_FINISHED;
+ });
+ when(downstreamSender.completed()).thenReturn(true);
+ when(downstreamSender.anyTransfersFailed()).thenReturn(false);
+ setPrivateField(handler, "sender", downstreamSender);
+
+ doAnswer(
+ invocation -> {
+ assertFalse(
+ Thread.currentThread().isInterrupted(),
+ "waitOnSender interrupt leaked into completion sendSync");
+ Message completion = invocation.getArgument(0);
+ assertEquals(DMT.FNPInsertTransfersCompleted, completion.getSpec());
+ assertEquals(uid, completion.getLong(DMT.UID));
+ assertFalse(completion.getBoolean(DMT.ANY_TIMED_OUT));
+ return null;
+ })
+ .when(transport)
+ .sendSync(any(Message.class), any(ByteCounter.class), anyBoolean(), anyLong(), anyLong());
+
+ // Act
+ try {
+ Thread.currentThread().interrupt();
+ invokeWaitOnSender(handler);
+ assertTrue(waitLoopEntered.get(), "waitOnSender should observe sender status");
+ invokePrivateMethod(handler, "finish", CHKInsertSender.RECEIVE_FAILED);
+ } finally {
+ clearInterruptForTest();
+ }
+
+ // Assert
+ verify(transport, times(1))
+ .sendSync(
+ any(Message.class),
+ eq(handler),
+ eq(false),
+ eq(SECONDS.toMillis(15)),
+ eq(SECONDS.toMillis(2)));
+ }
+
+ @Test
+ @DisplayName("handleFatalOverload_whenInterrupted_expectSyncSendWithoutInterruptLeak")
+ void handleFatalOverload_whenInterrupted_expectSyncSendWithoutInterruptLeak() throws Exception {
+ // Arrange
+ long uid = 8081L;
+ CHKInsertHandler handler =
+ newHandler(
+ node,
+ source,
+ tag,
+ nodeCHK,
+ (short) 5,
+ uid,
+ /* startTime= */ beyondHandshakeWindow(),
+ /* realTime= */ false);
+
+ doAnswer(
+ invocation -> {
+ assertFalse(
+ Thread.currentThread().isInterrupted(),
+ "interrupt leaked into fatal-overload sendSync");
+ Message sent = invocation.getArgument(0);
+ assertEquals(DMT.FNPRejectedOverload, sent.getSpec());
+ assertEquals(uid, sent.getLong(DMT.UID));
+ return null;
+ })
+ .when(transport)
+ .sendSync(any(Message.class), any(ByteCounter.class), anyBoolean());
+
+ // Act
+ try {
+ Thread.currentThread().interrupt();
+ invokePrivateMethod(handler, "handleFatalOverload", CHKInsertSender.INTERNAL_ERROR);
+ } finally {
+ clearInterruptForTest();
+ }
+
+ // Assert
+ verify(transport, times(1)).sendSync(any(Message.class), eq(handler), eq(false));
+ }
+
+ @Test
+ @DisplayName("handleRouteNotFound_whenInterrupted_expectSyncSendWithoutInterruptLeak")
+ void handleRouteNotFound_whenInterrupted_expectSyncSendWithoutInterruptLeak() throws Exception {
+ // Arrange
+ long uid = 8082L;
+ short expectedHtl = 7;
+ CHKInsertHandler handler =
+ newHandler(
+ node,
+ source,
+ tag,
+ nodeCHK,
+ (short) 5,
+ uid,
+ /* startTime= */ beyondHandshakeWindow(),
+ /* realTime= */ false);
+
+ CHKInsertSender downstreamSender = org.mockito.Mockito.mock(CHKInsertSender.class);
+ when(downstreamSender.getHTL()).thenReturn(expectedHtl);
+ setPrivateField(handler, "sender", downstreamSender);
+ // Prevent finish() from issuing an additional completion sending; this isolates the
+ // route-not-found synchronous sending path.
+ setPrivateField(handler, "sentCompletion", true);
+
+ doAnswer(
+ invocation -> {
+ assertFalse(
+ Thread.currentThread().isInterrupted(),
+ "interrupt leaked into route-not-found sendSync");
+ Message sent = invocation.getArgument(0);
+ assertEquals(DMT.FNPRouteNotFound, sent.getSpec());
+ assertEquals(uid, sent.getLong(DMT.UID));
+ assertEquals(expectedHtl, sent.getShort(DMT.HTL));
+ return null;
+ })
+ .when(transport)
+ .sendSync(any(Message.class), any(ByteCounter.class), anyBoolean());
+
+ // Act
+ try {
+ Thread.currentThread().interrupt();
+ invokePrivateMethod(handler, "handleRouteNotFound", CHKInsertSender.ROUTE_REALLY_NOT_FOUND);
+ } finally {
+ clearInterruptForTest();
+ }
+
+ // Assert
+ verify(transport, times(1)).sendSync(any(Message.class), eq(handler), eq(false));
+ }
+
@Test
@DisplayName("byteCounters_sentAndReceived_accumulateAndReport")
void byteCounters_sentAndReceived_accumulateAndReport() {
From 367143a77ecabc3cd96cac5b8c0c3d42274e66f4 Mon Sep 17 00:00:00 2001
From: Leumor <116955025+leumor@users.noreply.github.com>
Date: Thu, 26 Feb 2026 09:39:49 +0000
Subject: [PATCH 3/3] fix(node): finalize CHK inserts after interrupts
Keep CHKInsertHandler sender-status processing on non-receive interrupts so terminal handling still runs and the insert lifecycle is finalized instead of returning early.
Capture and restore interrupt status after terminal handling, and clear interrupt state before synchronous sends that can otherwise be misclassified as transport timeouts.
Update CHKInsertHandlerTest with a regression covering interrupted sender-status polling and terminal finalization.
---
.../network/crypta/node/CHKInsertHandler.java | 68 +++++++++++--------
.../crypta/node/CHKInsertHandlerTest.java | 51 +++++++-------
2 files changed, 65 insertions(+), 54 deletions(-)
diff --git a/src/main/java/network/crypta/node/CHKInsertHandler.java b/src/main/java/network/crypta/node/CHKInsertHandler.java
index bdb26c80f8e..21e5c3cf1ea 100644
--- a/src/main/java/network/crypta/node/CHKInsertHandler.java
+++ b/src/main/java/network/crypta/node/CHKInsertHandler.java
@@ -183,6 +183,7 @@ private void realRun() {
private boolean sendAccepted() {
// Consider inserting rate limiting here if the acceptance path requires backpressure.
Message accepted = DMT.createFNPAccepted(uid);
+ clearInternalWakeupInterruptBeforeSyncSend("accepted");
try {
// Preserve send ordering with a bounded wait to avoid pinning handler threads for long
// transport-level timeouts.
@@ -271,23 +272,38 @@ private void setupForDataInsert(Message msg) {
private void processSenderStatuses() {
boolean receivedRejectedOverload = false;
- while (true) {
- if (waitOnSender()) {
- // A non-receive-failure interrupt (e.g. shutdown/cancellation) was preserved.
- // Stop status processing to avoid spin loops and interrupt leakage into sync sends.
- return;
- }
- if (receiveFailed()) {
- finish(CHKInsertSender.RECEIVE_FAILED);
- return;
- }
+ // Clear and remember any pre-existing interrupt (e.g., stale executor/cancellation state) so
+ // it cannot leak into synchronous sending; restore on exit.
+ boolean interrupted = Thread.interrupted();
+ try {
+ while (true) {
+ try {
+ waitOnSender();
+ } catch (InterruptedException _) {
+ if (receiveFailed()) {
+ finish(CHKInsertSender.RECEIVE_FAILED);
+ return;
+ }
+ // Preserve external interruption only after terminal handling so it does not leak into
+ // synchronous sending and cause false transport timeouts.
+ interrupted = true;
+ }
+ if (receiveFailed()) {
+ finish(CHKInsertSender.RECEIVE_FAILED);
+ return;
+ }
- receivedRejectedOverload = forwardNonTerminalOverloadIfNeeded(receivedRejectedOverload);
+ receivedRejectedOverload = forwardNonTerminalOverloadIfNeeded(receivedRejectedOverload);
- int status = sender.getStatus();
- if (status != CHKInsertSender.NOT_FINISHED) {
- handleTerminalStatus(status);
- return;
+ int status = sender.getStatus();
+ if (status != CHKInsertSender.NOT_FINISHED) {
+ handleTerminalStatus(status);
+ return;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
}
}
}
@@ -306,27 +322,16 @@ private boolean forwardNonTerminalOverloadIfNeeded(boolean alreadyForwarded) {
return alreadyForwarded;
}
- private boolean waitOnSender() {
+ private void waitOnSender() throws InterruptedException {
final CHKInsertSender s = sender;
synchronized (s) {
long deadline = System.currentTimeMillis() + 5000L;
long remaining = deadline - System.currentTimeMillis();
while (s.getStatus() == CHKInsertSender.NOT_FINISHED && remaining > 0L) {
- try {
- s.wait(remaining);
- } catch (InterruptedException _) {
- // Interrupts here are usually internal wake-ups from receive-failure handling.
- // Preserve non-internal interrupts to honor interruption policy (java:S2142).
- if (!receiveFailed()) {
- Thread.currentThread().interrupt();
- return true;
- }
- return false;
- }
+ s.wait(remaining);
remaining = deadline - System.currentTimeMillis();
}
}
- return false;
}
private void handleTerminalStatus(int status) {
@@ -393,6 +398,7 @@ private void handleReceiveFailed() {
/** Sends a success reply upstream and finalizes the insert. */
private void handleSuccess(int status) {
Message msg = DMT.createFNPInsertReply(uid);
+ clearInternalWakeupInterruptBeforeSyncSend("success");
try {
source.transport().sendSync(msg, this, realTimeFlag);
} catch (NotConnectedException _) {
@@ -413,6 +419,7 @@ private void handleSuccess(int status) {
private void handleUnknownStatus() {
LOG.error("Unknown status code: {}", sender.getStatusString());
Message msg = DMT.createFNPRejectedOverload(uid, true);
+ clearInternalWakeupInterruptBeforeSyncSend("unknown-status overload");
try {
source.transport().sendSync(msg, this, realTimeFlag);
} catch (NotConnectedException | SyncSendWaitedTooLongException _) {
@@ -714,8 +721,9 @@ private void sendCompletion(Message m) {
}
private void clearInternalWakeupInterruptBeforeSyncSend(String messageType) {
- // Internal wake-up interrupts are used for receive-failure coordination; clear them before
- // synchronous sending so transport wait logic does not misclassify healthy sends.
+ // Internal receive-failure wake-ups, stale pooled-thread interrupt flags, or preserved
+ // cancellation interrupts can otherwise leak into synchronous send wait logic and produce false
+ // timeout handling.
boolean interruptStatusCleared = Thread.interrupted();
if (interruptStatusCleared && LOG.isTraceEnabled()) {
LOG.trace("Cleared interrupt status before sending {} for {}", messageType, uid);
diff --git a/src/test/java/network/crypta/node/CHKInsertHandlerTest.java b/src/test/java/network/crypta/node/CHKInsertHandlerTest.java
index 40cd792c839..ef916221662 100644
--- a/src/test/java/network/crypta/node/CHKInsertHandlerTest.java
+++ b/src/test/java/network/crypta/node/CHKInsertHandlerTest.java
@@ -2,7 +2,6 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
-import java.util.concurrent.atomic.AtomicBoolean;
import network.crypta.io.comm.ByteCounter;
import network.crypta.io.comm.DMT;
import network.crypta.io.comm.Message;
@@ -77,8 +76,8 @@ private static void setPrivateField(Object target, String fieldName, Object valu
field.set(target, value);
}
- private static void invokeWaitOnSender(CHKInsertHandler target) throws Exception {
- Method method = CHKInsertHandler.class.getDeclaredMethod("waitOnSender");
+ private static void invokeProcessSenderStatuses(CHKInsertHandler target) throws Exception {
+ Method method = CHKInsertHandler.class.getDeclaredMethod("processSenderStatuses");
method.setAccessible(true);
method.invoke(target);
}
@@ -239,8 +238,8 @@ void run_whenDataInsertRejected_propagatesRejectionReason() throws Exception {
@Test
@DisplayName(
- "waitOnSender_whenInterruptedForReceiveFailure_expectNoInterruptLeakIntoCompletionSend")
- void waitOnSender_whenInterruptedForReceiveFailure_expectNoInterruptLeakIntoCompletionSend()
+ "processSenderStatuses_whenInterrupted_expectTerminalFinalizationAndInterruptRestored")
+ void processSenderStatuses_whenInterrupted_expectTerminalFinalizationAndInterruptRestored()
throws Exception {
// Arrange
long uid = 8080L;
@@ -256,49 +255,53 @@ void waitOnSender_whenInterruptedForReceiveFailure_expectNoInterruptLeakIntoComp
/* realTime= */ false);
CHKInsertSender downstreamSender = org.mockito.Mockito.mock(CHKInsertSender.class);
- AtomicBoolean waitLoopEntered = new AtomicBoolean();
+ final boolean[] firstStatusCheck = {true};
when(downstreamSender.getStatus())
.thenAnswer(
_ -> {
- waitLoopEntered.set(true);
- return CHKInsertSender.NOT_FINISHED;
+ if (firstStatusCheck[0]) {
+ firstStatusCheck[0] = false;
+ // Simulate a non-receive-failure interrupt arriving while status polling is active.
+ Thread.currentThread().interrupt();
+ return CHKInsertSender.NOT_FINISHED;
+ }
+ return CHKInsertSender.INTERNAL_ERROR;
});
+ when(downstreamSender.receivedRejectedOverload()).thenReturn(false);
when(downstreamSender.completed()).thenReturn(true);
when(downstreamSender.anyTransfersFailed()).thenReturn(false);
setPrivateField(handler, "sender", downstreamSender);
+ // Isolate terminal status handling; this test verifies finalization reaches terminal reply
+ // sending instead of returning early from processSenderStatuses().
+ setPrivateField(handler, "sentCompletion", true);
doAnswer(
invocation -> {
assertFalse(
Thread.currentThread().isInterrupted(),
- "waitOnSender interrupt leaked into completion sendSync");
- Message completion = invocation.getArgument(0);
- assertEquals(DMT.FNPInsertTransfersCompleted, completion.getSpec());
- assertEquals(uid, completion.getLong(DMT.UID));
- assertFalse(completion.getBoolean(DMT.ANY_TIMED_OUT));
+ "interrupt leaked into terminal sendSync");
+ Message terminal = invocation.getArgument(0);
+ assertEquals(DMT.FNPRejectedOverload, terminal.getSpec());
+ assertEquals(uid, terminal.getLong(DMT.UID));
return null;
})
.when(transport)
- .sendSync(any(Message.class), any(ByteCounter.class), anyBoolean(), anyLong(), anyLong());
+ .sendSync(any(Message.class), any(ByteCounter.class), anyBoolean());
// Act
try {
+ // Pre-set an interrupt to simulate stale executor thread state.
Thread.currentThread().interrupt();
- invokeWaitOnSender(handler);
- assertTrue(waitLoopEntered.get(), "waitOnSender should observe sender status");
- invokePrivateMethod(handler, "finish", CHKInsertSender.RECEIVE_FAILED);
+ invokeProcessSenderStatuses(handler);
+ assertTrue(
+ Thread.currentThread().isInterrupted(),
+ "interrupt should be restored after terminal status handling");
} finally {
clearInterruptForTest();
}
// Assert
- verify(transport, times(1))
- .sendSync(
- any(Message.class),
- eq(handler),
- eq(false),
- eq(SECONDS.toMillis(15)),
- eq(SECONDS.toMillis(2)));
+ verify(transport, times(1)).sendSync(any(Message.class), eq(handler), eq(false));
}
@Test