diff --git a/.gitignore b/.gitignore index 7beb9649eb..677cc857f2 100644 --- a/.gitignore +++ b/.gitignore @@ -53,6 +53,9 @@ out/jreleaser build-logic/build/ build-logic/bin/ tmp* +/changelogfull.md +/changelogfull.txt + # WiX/jpackage debug artifacts (decompiled MSI and relink outputs) /Crypta-*.wxs /.vscode diff --git a/src/main/java/network/crypta/client/async/ChosenBlock.java b/src/main/java/network/crypta/client/async/ChosenBlock.java index 0fd966c7ab..60f961453d 100644 --- a/src/main/java/network/crypta/client/async/ChosenBlock.java +++ b/src/main/java/network/crypta/client/async/ChosenBlock.java @@ -243,6 +243,18 @@ private record ConstructionState( */ public abstract short getPriority(); + /** + * Returns an optional external identifier used for diagnostics correlation. + * + *
The default implementation returns {@code null}. Implementations that can map a chosen block + * to a client-visible request identifier should override this method. + * + * @return external identifier string, or {@code null} if unavailable + */ + public String getExternalRequestIdentifier() { + return null; + } + private boolean sendIsBlocking; /** diff --git a/src/main/java/network/crypta/client/async/ChosenBlockImpl.java b/src/main/java/network/crypta/client/async/ChosenBlockImpl.java index 3c9465c1a1..712e938ca0 100644 --- a/src/main/java/network/crypta/client/async/ChosenBlockImpl.java +++ b/src/main/java/network/crypta/client/async/ChosenBlockImpl.java @@ -243,6 +243,15 @@ public short getPriority() { return request.getPriorityClass(); } + @Override + public String getExternalRequestIdentifier() { + ClientRequester clientRequester = request.getClientRequest(); + if (clientRequester == null) { + return null; + } + return clientRequester.getExternalRequestIdentifier(); + } + /** * Obtains the sender capable of executing this block under the provided context. The returned * sender defines whether sending blocks and performs the actual network/storage interaction. diff --git a/src/main/java/network/crypta/client/async/ClientRequester.java b/src/main/java/network/crypta/client/async/ClientRequester.java index 04d8fabd4c..89b8f02141 100644 --- a/src/main/java/network/crypta/client/async/ClientRequester.java +++ b/src/main/java/network/crypta/client/async/ClientRequester.java @@ -89,6 +89,14 @@ public abstract void onTransition( */ protected transient RequestClient client; + /** + * Optional external identifier used to correlate scheduler activity with client-visible requests. + * + *
Typical values include an FCP request identifier prefix such as {@code fcp:...}. The value + * is intentionally transient so persistence format remains unchanged. + */ + private transient volatile String externalRequestIdentifier; + /** * Returns the current scheduling priority class for this request. * @@ -571,6 +579,35 @@ public RequestClient getClient() { return client; } + /** + * Assigns an external correlation identifier for diagnostics. + * + *
This does not affect routing or scheduling; it only enriches logs when low-level stalls are + * reported. + * + * @param externalRequestIdentifier external identifier string, or {@code null} to clear + */ + public final void setExternalRequestIdentifier(String externalRequestIdentifier) { + this.externalRequestIdentifier = normalizeExternalRequestIdentifier(externalRequestIdentifier); + } + + /** + * Returns the optional external correlation identifier used for diagnostics. + * + * @return external identifier, or {@code null} if none has been assigned + */ + public final String getExternalRequestIdentifier() { + return externalRequestIdentifier; + } + + private static String normalizeExternalRequestIdentifier(String externalRequestIdentifier) { + if (externalRequestIdentifier == null) { + return null; + } + String normalized = externalRequestIdentifier.trim(); + return normalized.isEmpty() ? null : normalized; + } + /** * Changes the scheduling priority class of this request and re-registers all sub-requests. * diff --git a/src/main/java/network/crypta/client/async/SingleBlockInserter.java b/src/main/java/network/crypta/client/async/SingleBlockInserter.java index 9d369eb57f..4b24a17732 100644 --- a/src/main/java/network/crypta/client/async/SingleBlockInserter.java +++ b/src/main/java/network/crypta/client/async/SingleBlockInserter.java @@ -758,7 +758,8 @@ private void putToStore(NodeClientCore core, ChosenBlock req, KeyBlock b, Client req.forkOnCacheable, Node.PREFER_INSERT_DEFAULT, Node.IGNORE_LOW_BACKOFF_DEFAULT, - req.realTimeFlag); + req.realTimeFlag, + req.getExternalRequestIdentifier()); } private boolean handleCollision( diff --git a/src/main/java/network/crypta/client/async/SplitFileFetcher.java b/src/main/java/network/crypta/client/async/SplitFileFetcher.java index 470dcac643..a59cf82294 100644 --- a/src/main/java/network/crypta/client/async/SplitFileFetcher.java +++ b/src/main/java/network/crypta/client/async/SplitFileFetcher.java @@ -73,6 +73,27 @@ public final class SplitFileFetcher /** Simple holder for completion-via-truncation configuration. */ private record TruncationConfig(FileGetCompletionCallback callback, File tempFile) {} + /** + * Ensures a restored RAF is closed if constructor resume fails before ownership is transferred to + * this fetcher instance. + */ + private static final class ResumeRafCloseGuard implements AutoCloseable { + private LockableRandomAccessBuffer raf; + + void track(LockableRandomAccessBuffer raf) { + this.raf = raf; + } + + void release() { + raf = null; + } + + @Override + public void close() { + IOUtils.closeQuietly(raf); + } + } + /** * Stores the progress of the download, including the actual data, in a separate file. Created in * onResume() or in the constructor, so must be volatile. @@ -792,9 +813,8 @@ public boolean writeTrivialProgress(DataOutputStream dos) throws IOException { public SplitFileFetcher(ClientGetter getter, DataInputStream dis, ClientContext context) throws StorageFormatException, ResumeFailedException, IOException { LOG.info("Resuming splitfile download for {}", this); - LockableRandomAccessBuffer restored = null; - boolean success = false; - try { + LockableRandomAccessBuffer restored; + try (ResumeRafCloseGuard closeGuard = new ResumeRafCloseGuard()) { boolean completeViaTruncation = dis.readBoolean(); if (completeViaTruncation) { fileCompleteViaTruncation = new File(dis.readUTF()); @@ -808,6 +828,7 @@ public SplitFileFetcher(ClientGetter getter, DataInputStream dis, ClientContext // Note: Could verify against finalLength to finish immediately if it matches. restored = new PooledFileRandomAccessBuffer(fileCompleteViaTruncation, false, rafSize, -1, true); + closeGuard.track(restored); } else { restored = BucketTools.restoreRAFFrom( @@ -817,6 +838,7 @@ public SplitFileFetcher(ClientGetter getter, DataInputStream dis, ClientContext context.getPersistentMasterSecret()); fileCompleteViaTruncation = null; callbackCompleteViaTruncation = null; + closeGuard.track(restored); } this.raf = restored; this.parent = getter; @@ -830,9 +852,7 @@ public SplitFileFetcher(ClientGetter getter, DataInputStream dis, ClientContext // onResume() will do the rest. LOG.info("Resumed splitfile download for {}", this); lastNotifiedStoreFetch = System.currentTimeMillis(); - success = true; - } finally { - if (!success) IOUtils.closeQuietly(restored); + closeGuard.release(); } } diff --git a/src/main/java/network/crypta/client/async/SplitFileInserterSender.java b/src/main/java/network/crypta/client/async/SplitFileInserterSender.java index 8c0be0ff8a..1b93f5c765 100644 --- a/src/main/java/network/crypta/client/async/SplitFileInserterSender.java +++ b/src/main/java/network/crypta/client/async/SplitFileInserterSender.java @@ -259,7 +259,8 @@ public boolean send( request.forkOnCacheable, Node.PREFER_INSERT_DEFAULT, Node.IGNORE_LOW_BACKOFF_DEFAULT, - request.realTimeFlag); + request.realTimeFlag, + request.getExternalRequestIdentifier()); } request.onInsertSuccess(key, context); } catch (final LowLevelPutException e) { diff --git a/src/main/java/network/crypta/clients/fcp/ClientGet.java b/src/main/java/network/crypta/clients/fcp/ClientGet.java index f5281d530d..dfa704e7a4 100644 --- a/src/main/java/network/crypta/clients/fcp/ClientGet.java +++ b/src/main/java/network/crypta/clients/fcp/ClientGet.java @@ -270,6 +270,7 @@ record ClientGetSetup( this.extensionCheck = returnSetup.extension(); this.initialMetadata = setup.initialMetadata(); getter = makeGetter(setup.core(), returnSetup.bucket()); + applyDiagnosticIdentifier(getter); initHelpers(); } @@ -290,6 +291,7 @@ record ClientGetSetup( this.extensionCheck = returnSetup.extension(); this.initialMetadata = setup.initialMetadata(); getter = makeGetter(setup.core(), returnSetup.bucket()); + applyDiagnosticIdentifier(getter); initHelpers(); } @@ -1183,6 +1185,7 @@ public void getClientDetail(DataOutputStream dos, ChecksumChecker checker) throw restoredGetter = makeGetterForPersistence(makeBucket(false)); } getter = restoredGetter; + applyDiagnosticIdentifier(getter); initHelpers(); } diff --git a/src/main/java/network/crypta/clients/fcp/ClientPut.java b/src/main/java/network/crypta/clients/fcp/ClientPut.java index 8e68b7e4de..be023287db 100644 --- a/src/main/java/network/crypta/clients/fcp/ClientPut.java +++ b/src/main/java/network/crypta/clients/fcp/ClientPut.java @@ -191,6 +191,7 @@ public ClientPut( options.overrideSplitfileCryptoKey(), -1); putter = ClientPutPutterFactory.create(putterRequest, putterOptions); + applyDiagnosticIdentifier(putter); } /** @@ -291,6 +292,7 @@ public ClientPut(FCPConnectionHandler handler, ClientPutMessage message, FCPServ message.overrideSplitfileCryptoKey, message.metadataThreshold); putter = ClientPutPutterFactory.create(putterRequest, putterOptions); + applyDiagnosticIdentifier(putter); } /** @@ -839,6 +841,7 @@ RequestStatus getStatus() { */ @Override public void innerResume(ClientContext context) throws ResumeFailedException { + applyDiagnosticIdentifier(putter); if (data != null) data.onResume(context); } diff --git a/src/main/java/network/crypta/clients/fcp/ClientPutMessage.java b/src/main/java/network/crypta/clients/fcp/ClientPutMessage.java index 3c70202248..cfc8111596 100644 --- a/src/main/java/network/crypta/clients/fcp/ClientPutMessage.java +++ b/src/main/java/network/crypta/clients/fcp/ClientPutMessage.java @@ -144,7 +144,7 @@ public ClientPutMessage(SimpleFieldSet fs) throws MessageInvalidException { fileHash = fs.get(ClientPutBase.FILE_HASH); - UploadConfig uploadConfig = parseUploadSource(fs, identifier, global, filenameHint); + UploadConfig uploadConfig = parseUploadSource(fs, identifier, global, filenameHint, uri); payloadLength = uploadConfig.length(); uploadFromType = uploadConfig.type(); origFilename = uploadConfig.originalFile(); @@ -299,7 +299,11 @@ private static short parsePriorityClass(String priorityString, String identifier } private UploadConfig parseUploadSource( - SimpleFieldSet fs, String identifier, boolean global, String filenameHint) + SimpleFieldSet fs, + String identifier, + boolean global, + String filenameHint, + FreenetURI parsedUri) throws MessageInvalidException { String uploadFrom = fs.get(FIELD_UPLOAD_FROM); if (uploadFrom == null || uploadFrom.equalsIgnoreCase("direct")) { @@ -316,7 +320,10 @@ private UploadConfig parseUploadSource( throw new MessageInvalidException( ProtocolErrorMessage.FILE_NOT_FOUND, null, identifier, global); bucket = new FileBucket(f, true, false, false, false); - String resolvedName = filenameHint != null ? filenameHint : f.getName(); + String resolvedName = filenameHint; + if (resolvedName == null && shouldInferDiskFilenameHint(parsedUri)) { + resolvedName = f.getName(); + } return new UploadConfig(f.length(), UploadFrom.DISK, f, null, resolvedName); } if (uploadFrom.equalsIgnoreCase("redirect")) { @@ -405,6 +412,15 @@ private static String parseCompressorDescriptor(String codecs, String identifier } } + private static boolean shouldInferDiskFilenameHint(FreenetURI uri) { + // For bare CHK inserts we do not infer a target filename from the local disk path. + // Preserving a null filename keeps semantics aligned with direct payload uploads and avoids + // forcing a metadata wrapper when the client did not explicitly request one. + return !(uri.getRoutingKey() == null + && uri.getDocName() == null + && "CHK".equals(uri.getKeyType())); + } + private record UriParseResult(FreenetURI uri, String filenameHint) {} private record UploadConfig( diff --git a/src/main/java/network/crypta/clients/fcp/ClientPutMimeResolver.java b/src/main/java/network/crypta/clients/fcp/ClientPutMimeResolver.java index 7d86c34f33..b5b56342e4 100644 --- a/src/main/java/network/crypta/clients/fcp/ClientPutMimeResolver.java +++ b/src/main/java/network/crypta/clients/fcp/ClientPutMimeResolver.java @@ -3,6 +3,7 @@ import java.io.File; import network.crypta.client.DefaultMIMETypes; import network.crypta.client.async.BinaryBlob; +import network.crypta.keys.FreenetURI; /** * Resolves and validates MIME types for {@link ClientPut} requests. @@ -88,6 +89,24 @@ static String resolve( identifier, global); } + if (shouldSuppressMimeForDiskBareChk(message, targetFilename)) { + // Disk uploads for bare CHK@ without an explicit TargetFilename should follow direct-mode + // semantics and avoid forcing a two-block metadata wrapper solely for MIME information. + mimeType = null; + } return mimeType; } + + private static boolean shouldSuppressMimeForDiskBareChk( + ClientPutMessage message, String targetFilename) { + return message.uploadFromType == ClientPutBase.UploadFrom.DISK + && targetFilename == null + && isBareChkInsertUri(message.uri); + } + + private static boolean isBareChkInsertUri(FreenetURI uri) { + return uri.getRoutingKey() == null + && uri.getDocName() == null + && "CHK".equals(uri.getKeyType()); + } } diff --git a/src/main/java/network/crypta/clients/fcp/ClientRequest.java b/src/main/java/network/crypta/clients/fcp/ClientRequest.java index 6851b7c06b..1f38684d37 100644 --- a/src/main/java/network/crypta/clients/fcp/ClientRequest.java +++ b/src/main/java/network/crypta/clients/fcp/ClientRequest.java @@ -472,6 +472,32 @@ public String getIdentifier() { return identifier; } + /** + * Builds the diagnostics correlation identifier for this request. + * + *
The identifier is prefixed to make it explicit that the value originates from FCP. + * + * @return prefixed identifier string, or {@code null} when no identifier is available + */ + protected final String diagnosticIdentifier() { + if (identifier == null) { + return null; + } + return "fcp:" + identifier; + } + + /** + * Assigns this request's diagnostics identifier to a low-level requester. + * + * @param requester requester that should carry the diagnostics identifier + */ + protected final void applyDiagnosticIdentifier(ClientRequester requester) { + if (requester == null) { + return; + } + requester.setExternalRequestIdentifier(diagnosticIdentifier()); + } + /** * Returns the underlying {@link ClientRequester} that performs the actual network work. * diff --git a/src/main/java/network/crypta/crypt/EncryptedRandomAccessBuffer.java b/src/main/java/network/crypta/crypt/EncryptedRandomAccessBuffer.java index a9422a57c5..ce53fb60d7 100755 --- a/src/main/java/network/crypta/crypt/EncryptedRandomAccessBuffer.java +++ b/src/main/java/network/crypta/crypt/EncryptedRandomAccessBuffer.java @@ -579,17 +579,30 @@ public static LockableRandomAccessBuffer create( if (type == null) throw new StorageFormatException("Unknown EncryptedRandomAccessBufferType"); LockableRandomAccessBuffer underlying = BucketTools.restoreRAFFrom(dis, fg, persistentFileTracker, masterKey); - boolean success = false; - try { + class UnderlyingCloseGuard implements AutoCloseable { + private LockableRandomAccessBuffer buffer; + + private UnderlyingCloseGuard(LockableRandomAccessBuffer buffer) { + this.buffer = buffer; + } + + private void release() { + buffer = null; + } + + @Override + public void close() { + IOUtils.closeQuietly(buffer); + } + } + try (UnderlyingCloseGuard closeGuard = new UnderlyingCloseGuard(underlying)) { EncryptedRandomAccessBuffer restored = new EncryptedRandomAccessBuffer(type, underlying, masterKey, false); - success = true; + closeGuard.release(); return restored; } catch (GeneralSecurityException e) { throw new ResumeFailedException( new GeneralSecurityException("Crypto error resuming EncryptedRandomAccessBuffer", e)); - } finally { - if (!success) IOUtils.closeQuietly(underlying); } } diff --git a/src/main/java/network/crypta/node/BaseSender.java b/src/main/java/network/crypta/node/BaseSender.java index d060898709..d6b9b950cf 100644 --- a/src/main/java/network/crypta/node/BaseSender.java +++ b/src/main/java/network/crypta/node/BaseSender.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; /** * Base class for request and insert senders. @@ -71,6 +72,13 @@ public abstract class BaseSender implements ByteCounter, HighHtlAware { long uid; static final long SEARCH_TIMEOUT_BULK = MINUTES.toMillis(10); static final long SEARCH_TIMEOUT_REALTIME = MINUTES.toMillis(1); + + /** Keep legacy-route request sends short so blocked peer queues fail fast and reroute. */ + static final long ROUTE_SEND_TIMEOUT_MILLIS = SECONDS.toMillis(15); + + /** Extra wait after un-queue failure for route-request sends. */ + static final long ROUTE_SEND_UNQUEUE_WAIT_MILLIS = SECONDS.toMillis(2); + final int incomingSearchTimeout; BaseSender(Key key, boolean realTimeFlag, PeerNode source, Node node, short htl, long uid) { @@ -298,7 +306,9 @@ protected void innerRouteRequestsOld(PeerNode next, UIDTag origTag) { // waiting in the peer’s sending queue. // - sendAsync would increase ACCEPTED_TIMEOUT risk and leave many hanging requests, further // overloading peers. Hence, we do NOT use sendAsync here. - next.transport().sendSync(req, this, realTimeFlag); + next.transport() + .sendSync( + req, this, realTimeFlag, ROUTE_SEND_TIMEOUT_MILLIS, ROUTE_SEND_UNQUEUE_WAIT_MILLIS); PeerNodeRoutingReporter.reportRoutedTo( node, next, @@ -682,7 +692,11 @@ private boolean sendToPeer(NlmState state, UIDTag origTag, Message req) { return false; } try { - state.next.transport().sendSync(req, this, realTimeFlag); + state + .next + .transport() + .sendSync( + req, this, realTimeFlag, ROUTE_SEND_TIMEOUT_MILLIS, ROUTE_SEND_UNQUEUE_WAIT_MILLIS); PeerNodeRoutingReporter.reportRoutedTo( node, state.next, diff --git a/src/main/java/network/crypta/node/CHKInsertHandler.java b/src/main/java/network/crypta/node/CHKInsertHandler.java index f541c4e42a..21e5c3cf1e 100644 --- a/src/main/java/network/crypta/node/CHKInsertHandler.java +++ b/src/main/java/network/crypta/node/CHKInsertHandler.java @@ -67,6 +67,24 @@ public class CHKInsertHandler implements PrioRunnable, ByteCounter { static final long DATA_INSERT_TIMEOUT = SECONDS.toMillis(10); + /** + * Bound synchronous waiting for the initial {@code FNPAccepted} send so handler threads do not + * stay blocked on overloaded peers for the full transport default timeout. + */ + private static final long ACCEPTED_SEND_TIMEOUT_MILLIS = SECONDS.toMillis(15); + + /** Additional wait after an un-queue failure when sending {@code FNPAccepted}. */ + private static final long ACCEPTED_UNQUEUE_WAIT_MILLIS = SECONDS.toMillis(2); + + /** + * Bound synchronous waiting for {@code FNPInsertTransfersCompleted} so byte accounting remains + * accurate without blocking handlers for the full transport default timeout. + */ + private static final long COMPLETION_SEND_TIMEOUT_MILLIS = SECONDS.toMillis(15); + + /** Additional wait after an un-queue failure when sending completion to the source peer. */ + private static final long COMPLETION_UNQUEUE_WAIT_MILLIS = SECONDS.toMillis(2); + final Node node; final long uid; final PeerNode source; @@ -165,10 +183,18 @@ private void realRun() { private boolean sendAccepted() { // Consider inserting rate limiting here if the acceptance path requires backpressure. Message accepted = DMT.createFNPAccepted(uid); + clearInternalWakeupInterruptBeforeSyncSend("accepted"); try { - // Synchronous send here ensures the next message filter does not spuriously time out; we - // either block here, or inside the filter, but we prefer to fail early on sending. - source.transport().sendSync(accepted, this, realTimeFlag); + // Preserve send ordering with a bounded wait to avoid pinning handler threads for long + // transport-level timeouts. + source + .transport() + .sendSync( + accepted, + this, + realTimeFlag, + ACCEPTED_SEND_TIMEOUT_MILLIS, + ACCEPTED_UNQUEUE_WAIT_MILLIS); return true; } catch (NotConnectedException _) { if (LOG.isDebugEnabled()) LOG.debug("Lost connection to source while sending FNPAccepted"); @@ -246,19 +272,38 @@ private void setupForDataInsert(Message msg) { private void processSenderStatuses() { boolean receivedRejectedOverload = false; - while (true) { - waitOnSender(); - if (receiveFailed()) { - finish(CHKInsertSender.RECEIVE_FAILED); - return; - } + // Clear and remember any pre-existing interrupt (e.g., stale executor/cancellation state) so + // it cannot leak into synchronous sending; restore on exit. + boolean interrupted = Thread.interrupted(); + try { + while (true) { + try { + waitOnSender(); + } catch (InterruptedException _) { + if (receiveFailed()) { + finish(CHKInsertSender.RECEIVE_FAILED); + return; + } + // Preserve external interruption only after terminal handling so it does not leak into + // synchronous sending and cause false transport timeouts. + interrupted = true; + } + if (receiveFailed()) { + finish(CHKInsertSender.RECEIVE_FAILED); + return; + } - receivedRejectedOverload = forwardNonTerminalOverloadIfNeeded(receivedRejectedOverload); + receivedRejectedOverload = forwardNonTerminalOverloadIfNeeded(receivedRejectedOverload); - int status = sender.getStatus(); - if (status != CHKInsertSender.NOT_FINISHED) { - handleTerminalStatus(status); - return; + int status = sender.getStatus(); + if (status != CHKInsertSender.NOT_FINISHED) { + handleTerminalStatus(status); + return; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); } } } @@ -277,19 +322,14 @@ private boolean forwardNonTerminalOverloadIfNeeded(boolean alreadyForwarded) { return alreadyForwarded; } - private void waitOnSender() { + private void waitOnSender() throws InterruptedException { final CHKInsertSender s = sender; synchronized (s) { - try { - long deadline = System.currentTimeMillis() + 5000L; - long remaining = deadline - System.currentTimeMillis(); - while (s.getStatus() == CHKInsertSender.NOT_FINISHED && remaining > 0L) { - s.wait(remaining); - remaining = deadline - System.currentTimeMillis(); - } - } catch (InterruptedException _) { - // Restore interrupt status; likely set by receive failing. - Thread.currentThread().interrupt(); + long deadline = System.currentTimeMillis() + 5000L; + long remaining = deadline - System.currentTimeMillis(); + while (s.getStatus() == CHKInsertSender.NOT_FINISHED && remaining > 0L) { + s.wait(remaining); + remaining = deadline - System.currentTimeMillis(); } } } @@ -317,6 +357,7 @@ private void handleTerminalStatus(int status) { */ private void handleFatalOverload(int status) { Message msg = DMT.createFNPRejectedOverload(uid, true); + clearInternalWakeupInterruptBeforeSyncSend("fatal overload"); try { source.transport().sendSync(msg, this, realTimeFlag); } catch (NotConnectedException _) { @@ -334,6 +375,7 @@ private void handleFatalOverload(int status) { /** Sends a route-not-found response including the final HTL, then finalizes the insert. */ private void handleRouteNotFound(int status) { Message msg = DMT.createFNPRouteNotFound(uid, sender.getHTL()); + clearInternalWakeupInterruptBeforeSyncSend("route-not-found"); try { source.transport().sendSync(msg, this, realTimeFlag); } catch (NotConnectedException _) { @@ -356,6 +398,7 @@ private void handleReceiveFailed() { /** Sends a success reply upstream and finalizes the insert. */ private void handleSuccess(int status) { Message msg = DMT.createFNPInsertReply(uid); + clearInternalWakeupInterruptBeforeSyncSend("success"); try { source.transport().sendSync(msg, this, realTimeFlag); } catch (NotConnectedException _) { @@ -376,6 +419,7 @@ private void handleSuccess(int status) { private void handleUnknownStatus() { LOG.error("Unknown status code: {}", sender.getStatusString()); Message msg = DMT.createFNPRejectedOverload(uid, true); + clearInternalWakeupInterruptBeforeSyncSend("unknown-status overload"); try { source.transport().sendSync(msg, this, realTimeFlag); } catch (NotConnectedException | SyncSendWaitedTooLongException _) { @@ -556,8 +600,18 @@ private void waitForReceiveToComplete() { try { wait(SECONDS.toMillis(100)); } catch (InterruptedException _) { - // Restore interrupted status + // Interrupts are used to wake this loop when receive-failure completion runs. Thread.currentThread().interrupt(); + if (receiveCompleted) { + break; + } + // Clear before retrying the timed wait so we do not spin on immediate rethrow. + boolean interruptStatusCleared = Thread.interrupted(); + if (!interruptStatusCleared && LOG.isTraceEnabled()) { + LOG.trace( + "Interrupt status was already clear before retrying waitForReceiveToComplete on {}", + uid); + } } } } @@ -598,41 +652,64 @@ private Message awaitDownstreamAndBuildCompletionMsg( private boolean waitUntilSenderCompletedWithin(long deadlineMillis) { final CHKInsertSender s = sender; - while (true) { - synchronized (s) { - if (s.completed()) return false; - try { - long remaining = deadlineMillis - System.currentTimeMillis(); - int t = (int) Math.clamp(remaining, 0L, Integer.MAX_VALUE); - if (t > 0) s.wait(t); - else return true; // took too long - } catch (InterruptedException _) { - // Restore interrupted status and loop - Thread.currentThread().interrupt(); + boolean interrupted = false; + try { + while (true) { + synchronized (s) { + if (s.completed()) return false; + try { + long remaining = deadlineMillis - System.currentTimeMillis(); + int t = (int) Math.clamp(remaining, 0L, Integer.MAX_VALUE); + if (t > 0) s.wait(t); + else return true; // took too long + } catch (InterruptedException _) { + // Interrupts here are used as wake-ups; consume and continue checking sender state. + interrupted = true; + } } } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } } } private void waitUntilSenderCompletedNoTimeout() { final CHKInsertSender s = sender; - while (true) { - synchronized (s) { - if (s.completed()) return; - try { - s.wait(SECONDS.toMillis(10)); - } catch (InterruptedException _) { - // Restore interrupted status and loop - Thread.currentThread().interrupt(); + boolean interrupted = false; + try { + while (true) { + synchronized (s) { + if (s.completed()) return; + try { + s.wait(SECONDS.toMillis(10)); + } catch (InterruptedException _) { + // Interrupts here are used as wake-ups; consume and continue checking sender state. + interrupted = true; + } } } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } } } private void sendCompletion(Message m) { + clearInternalWakeupInterruptBeforeSyncSend("completion"); try { - // We do need to sendSync here so we have accurate byte counter totals. - source.transport().sendSync(m, this, realTimeFlag); + // Keep completion synchronous so handler/sender byte totals include this message before we + // report aggregate insert costs. + source + .transport() + .sendSync( + m, + this, + realTimeFlag, + COMPLETION_SEND_TIMEOUT_MILLIS, + COMPLETION_UNQUEUE_WAIT_MILLIS); if (LOG.isDebugEnabled()) LOG.debug("Sent completion: {}" + FOR_STRING + "{}", m, this); } catch (NotConnectedException _) { if (LOG.isDebugEnabled()) LOG.debug("Not connected: {}" + FOR_STRING + "{}", source, this); @@ -643,6 +720,16 @@ private void sendCompletion(Message m) { } } + private void clearInternalWakeupInterruptBeforeSyncSend(String messageType) { + // Internal receive-failure wake-ups, stale pooled-thread interrupt flags, or preserved + // cancellation interrupts can otherwise leak into synchronous send wait logic and produce false + // timeout handling. + boolean interruptStatusCleared = Thread.interrupted(); + if (interruptStatusCleared && LOG.isTraceEnabled()) { + LOG.trace("Cleared interrupt status before sending {} for {}", messageType, uid); + } + } + private void reportStatsIfNeeded(int code) { if (code == CHKInsertSender.TIMED_OUT || code == CHKInsertSender.GENERATED_REJECTED_OVERLOAD diff --git a/src/main/java/network/crypta/node/CHKInsertSender.java b/src/main/java/network/crypta/node/CHKInsertSender.java index 8327990b05..e7ef6d0c02 100644 --- a/src/main/java/network/crypta/node/CHKInsertSender.java +++ b/src/main/java/network/crypta/node/CHKInsertSender.java @@ -419,6 +419,13 @@ public void start() { // Constants static final long ACCEPTED_TIMEOUT = SECONDS.toMillis(10); + + /** Bound waiting for initial {@code FNPDataInsert} queueing so routing can fail over quickly. */ + static final long DATA_INSERT_SEND_TIMEOUT_MILLIS = SECONDS.toMillis(15); + + /** Additional wait after un-queue failure for {@code FNPDataInsert}. */ + static final long DATA_INSERT_UNQUEUE_WAIT_MILLIS = SECONDS.toMillis(2); + static final long TRANSFER_COMPLETION_ACK_TIMEOUT_REALTIME = MINUTES.toMillis(1); static final long TRANSFER_COMPLETION_ACK_TIMEOUT_BULK = MINUTES.toMillis(5); @@ -1502,7 +1509,13 @@ protected void onAccepted(PeerNode next) { if (LOG.isDebugEnabled()) LOG.debug("Sending DataInsert"); try { - next.transport().sendSync(dataInsert, this, realTimeFlag); + next.transport() + .sendSync( + dataInsert, + this, + realTimeFlag, + DATA_INSERT_SEND_TIMEOUT_MILLIS, + DATA_INSERT_UNQUEUE_WAIT_MILLIS); } catch (NotConnectedException _) { if (LOG.isDebugEnabled()) LOG.debug("Not connected sending DataInsert: {}" + FOR + "{}", next, uid); diff --git a/src/main/java/network/crypta/node/NodeClientCoreTransfers.java b/src/main/java/network/crypta/node/NodeClientCoreTransfers.java index 2f09f9c896..f9c2eb028a 100644 --- a/src/main/java/network/crypta/node/NodeClientCoreTransfers.java +++ b/src/main/java/network/crypta/node/NodeClientCoreTransfers.java @@ -92,10 +92,28 @@ public void asyncGet( final Key key, final RequestCompletionListener listener, NodeRoutingSubsystem.RequestSenderOptions options) { + asyncGet(key, listener, options, null); + } + + /** + * Schedules a non-blocking fetch and records an optional external diagnostics identifier. + * + * @param key key to fetch; typically a {@link Key} or {@link NodeSSK}. + * @param listener callback notified for success, failure, or local-store hits. + * @param options request flags controlling store access, cache usage, offered-key handling, and + * real-time scheduling. + * @param externalRequestIdentifier optional external identifier used for diagnostics correlation + */ + public void asyncGet( + final Key key, + final RequestCompletionListener listener, + NodeRoutingSubsystem.RequestSenderOptions options, + String externalRequestIdentifier) { final long uid = makeUID(); final boolean isSSK = key instanceof NodeSSK; final RequestTag tag = new RequestTag(isSSK, RequestTag.START.ASYNC_GET, null, options.realTimeFlag(), uid, node); + tag.setExternalRequestIdentifier(externalRequestIdentifier); if (!node.routing() .tracker() .lockUID( @@ -654,6 +672,38 @@ public void realPut( boolean ignoreLowBackoff, boolean realTimeFlag) throws LowLevelPutException { + realPut( + block, + canWriteClientCache, + forkOnCacheable, + preferInsert, + ignoreLowBackoff, + realTimeFlag, + null); + } + + /** + * Synchronously inserts a CHK or SSK block into the network with optional diagnostics metadata. + * + * @param block block to insert; must be a {@link CHKBlock} or {@link SSKBlock}. + * @param canWriteClientCache whether the insert may update the client cache. + * @param forkOnCacheable true to fork inserts when the block is cacheable. + * @param preferInsert true to prefer insert strategies when alternatives exist. + * @param ignoreLowBackoff true to ignore low backoff during routing decisions. + * @param realTimeFlag true for latency-optimized inserts; false for bulk routing. + * @param externalRequestIdentifier optional external identifier used for diagnostics correlation + * @throws LowLevelPutException when routing fails, overload occurs, or internal errors arise. + * @throws IllegalArgumentException if the block type is unsupported for insert. + */ + public void realPut( + KeyBlock block, + boolean canWriteClientCache, + boolean forkOnCacheable, + boolean preferInsert, + boolean ignoreLowBackoff, + boolean realTimeFlag, + String externalRequestIdentifier) + throws LowLevelPutException { switch (block) { case CHKBlock kBlock1 -> realPutCHK( @@ -662,7 +712,8 @@ public void realPut( forkOnCacheable, preferInsert, ignoreLowBackoff, - realTimeFlag); + realTimeFlag, + externalRequestIdentifier); case SSKBlock kBlock -> realPutSSK( kBlock, @@ -670,7 +721,8 @@ public void realPut( forkOnCacheable, preferInsert, ignoreLowBackoff, - realTimeFlag); + realTimeFlag, + externalRequestIdentifier); default -> throw new IllegalArgumentException("Unknown put type " + block.getClass()); } } @@ -700,6 +752,38 @@ public void realPutCHK( boolean ignoreLowBackoff, boolean realTimeFlag) throws LowLevelPutException { + realPutCHK( + block, + canWriteClientCache, + forkOnCacheable, + preferInsert, + ignoreLowBackoff, + realTimeFlag, + null); + } + + /** + * Synchronously inserts a CHK block and applies local caching rules. + * + * @param block CHK block to insert and potentially store locally. + * @param canWriteClientCache whether the insert may update the client cache. + * @param forkOnCacheable true to fork inserts when the block is cacheable. + * @param preferInsert true to prefer insert strategies when alternatives exist. + * @param ignoreLowBackoff true to ignore low backoff during routing decisions. + * @param realTimeFlag true for latency-optimized inserts; false for bulk routing. + * @param externalRequestIdentifier optional external identifier used for diagnostics correlation + * @throws LowLevelPutException when routing fails, overload is reported, or internal errors + * arise. + */ + public void realPutCHK( + CHKBlock block, + boolean canWriteClientCache, + boolean forkOnCacheable, + boolean preferInsert, + boolean ignoreLowBackoff, + boolean realTimeFlag, + String externalRequestIdentifier) + throws LowLevelPutException { byte[] data = block.getData(); byte[] headers = block.getHeaders(); NodeRoutingSubsystem.ChkInsertOptions options = @@ -714,6 +798,7 @@ public void realPutCHK( CHKInsertSender is; long uid = makeUID(); InsertTag tag = new InsertTag(false, InsertTag.START.LOCAL, null, realTimeFlag, uid, node); + tag.setExternalRequestIdentifier(externalRequestIdentifier); if (!node.routing() .tracker() .lockUID(uid, RequestAdmissionMode.of(true, false, true, false, realTimeFlag), tag)) { @@ -886,9 +971,41 @@ public void realPutSSK( boolean ignoreLowBackoff, boolean realTimeFlag) throws LowLevelPutException { + realPutSSK( + block, + canWriteClientCache, + forkOnCacheable, + preferInsert, + ignoreLowBackoff, + realTimeFlag, + null); + } + + /** + * Synchronously inserts an SSK block and handles collision checks. + * + * @param block SSK block to insert and potentially store locally. + * @param canWriteClientCache whether the insert may update the client cache. + * @param forkOnCacheable true to fork inserts when the block is cacheable. + * @param preferInsert true to prefer insert strategies when alternatives exist. + * @param ignoreLowBackoff true to ignore low backoff during routing decisions. + * @param realTimeFlag true for latency-optimized inserts; false for bulk routing. + * @param externalRequestIdentifier optional external identifier used for diagnostics correlation + * @throws LowLevelPutException on collision, routing failure, overload, or internal errors. + */ + public void realPutSSK( + SSKBlock block, + boolean canWriteClientCache, + boolean forkOnCacheable, + boolean preferInsert, + boolean ignoreLowBackoff, + boolean realTimeFlag, + String externalRequestIdentifier) + throws LowLevelPutException { SSKInsertSender is; long uid = makeUID(); InsertTag tag = new InsertTag(true, InsertTag.START.LOCAL, null, realTimeFlag, uid, node); + tag.setExternalRequestIdentifier(externalRequestIdentifier); if (!node.routing() .tracker() .lockUID(uid, RequestAdmissionMode.of(true, true, true, false, realTimeFlag), tag)) { diff --git a/src/main/java/network/crypta/node/PeerNodeTransport.java b/src/main/java/network/crypta/node/PeerNodeTransport.java index a1ce5e03ec..4cf0212c93 100644 --- a/src/main/java/network/crypta/node/PeerNodeTransport.java +++ b/src/main/java/network/crypta/node/PeerNodeTransport.java @@ -54,6 +54,19 @@ final class PeerNodeTransport implements PeerTransport { /** Separator used in log messages to keep peer identifiers readable. */ private static final String STR_FOR = " for "; + /** + * Default upper bound for synchronous send waits before attempting un-queue. + * + *
Kept at one minute to preserve historical behavior for callers that do not provide explicit + * timeout bounds. + */ + private static final long SEND_SYNC_WAIT_MILLIS_DEFAULT = MINUTES.toMillis(1); + + /** + * Default extra wait after un-queue failure in {@link #sendSync(Message, ByteCounter, boolean)}. + */ + private static final long SEND_SYNC_UNQUEUE_WAIT_MILLIS_DEFAULT = SECONDS.toMillis(10); + /** Owning peer for this transport; provides queues, stats, and crypto context. */ private final PeerNode peer; @@ -188,21 +201,54 @@ public MessageItem sendAsync(Message msg, AsyncMessageCallback cb, ByteCounter c @Override public void sendSync(Message req, ByteCounter ctr, boolean realTime) throws NotConnectedException, SyncSendWaitedTooLongException { + sendSync( + req, ctr, realTime, SEND_SYNC_WAIT_MILLIS_DEFAULT, SEND_SYNC_UNQUEUE_WAIT_MILLIS_DEFAULT); + } + + @Override + public void sendSync( + Message req, + ByteCounter ctr, + boolean realTime, + long sendTimeoutMillis, + long unqueueWaitMillis) + throws NotConnectedException, SyncSendWaitedTooLongException { SyncMessageCallback cb = new SyncMessageCallback(); MessageItem item = sendAsync(req, cb, ctr); - cb.waitForSend(MINUTES.toMillis(1)); + long boundedSendTimeout = Math.max(1L, sendTimeoutMillis); + long boundedUnqueueTimeout = Math.max(1L, unqueueWaitMillis); + cb.waitForSend(boundedSendTimeout); if (!cb.done) { - LOG.warn("Waited too long for a blocking send for {} to {}", req, peer.selfPeerNode()); + if (LOG.isWarnEnabled()) { + SendWaitDiagnostics diagnostics = buildSendWaitDiagnostics(item); + LOG.warn( + "Waited too long for a blocking send for {} to {} (sendWaitAgeMs={} uid={} external={}" + + " uidAgeMs={})", + req, + peer.selfPeerNode(), + diagnostics.sendWaitAgeMs(), + diagnostics.uidForLog(), + diagnostics.externalIdentifierForLog(), + diagnostics.uidAgeMsForLog()); + } peer.localRejectedOverload("SendSyncTimeout", realTime); // Try to un-queue it, since it presumably won't be of any use now. if (!peer.getMessageQueue().removeMessage(item)) { - cb.waitForSend(SECONDS.toMillis(10)); + cb.waitForSend(boundedUnqueueTimeout); if (!cb.done) { - LOG.error( - "Waited too long for blocking send and then could not un-queue for {} to {}", - req, - peer.selfPeerNode(), - new Exception(STR_ERROR)); + if (LOG.isErrorEnabled()) { + SendWaitDiagnostics secondDiagnostics = buildSendWaitDiagnostics(item); + LOG.error( + "Waited too long for blocking send and then could not un-queue for {} to {}" + + " (sendWaitAgeMs={} uid={} external={} uidAgeMs={})", + req, + peer.selfPeerNode(), + secondDiagnostics.sendWaitAgeMs(), + secondDiagnostics.uidForLog(), + secondDiagnostics.externalIdentifierForLog(), + secondDiagnostics.uidAgeMsForLog(), + new Exception(STR_ERROR)); + } // Can't cancel yet, can't send it, something seriously wrong. // Treat as fatal timeout as probably their fault. // Note: We have already waited more than the no-messages timeout; do not wait again. @@ -216,6 +262,34 @@ public void sendSync(Message req, ByteCounter ctr, boolean realTime) } } + private SendWaitDiagnostics buildSendWaitDiagnostics(MessageItem item) { + long sendWaitAgeMs = Math.max(0L, System.currentTimeMillis() - item.submitted); + long uid = item.getID(); + if (uid < 0) { + return new SendWaitDiagnostics(sendWaitAgeMs, null, null, null); + } + UIDTag tag = peer.node.routing().tracker().findTagByUid(uid); + String externalIdentifier = tag == null ? null : tag.getExternalRequestIdentifier(); + Long uidAgeMs = tag == null ? null : tag.age(); + return new SendWaitDiagnostics(sendWaitAgeMs, uid, externalIdentifier, uidAgeMs); + } + + private record SendWaitDiagnostics( + long sendWaitAgeMs, Long uid, String externalIdentifier, Long uidAgeMs) { + + String uidForLog() { + return uid == null ? "unknown" : Long.toString(uid); + } + + String externalIdentifierForLog() { + return externalIdentifier == null ? "n/a" : externalIdentifier; + } + + String uidAgeMsForLog() { + return uidAgeMs == null ? "n/a" : Long.toString(uidAgeMs); + } + } + /** * Sends a ping and waits briefly for a matching pong. * diff --git a/src/main/java/network/crypta/node/PeerTransport.java b/src/main/java/network/crypta/node/PeerTransport.java index db44fa665e..54d62b7a99 100644 --- a/src/main/java/network/crypta/node/PeerTransport.java +++ b/src/main/java/network/crypta/node/PeerTransport.java @@ -79,6 +79,32 @@ MessageItem sendAsync(Message msg, AsyncMessageCallback cb, ByteCounter ctr) void sendSync(Message req, ByteCounter ctr, boolean realTime) throws NotConnectedException, SyncSendWaitedTooLongException; + /** + * Sends a message synchronously using explicit timeout bounds. + * + *
Implementations may override this to provide tuned timeouts for specific call sites while + * preserving the same synchronous semantics as {@link #sendSync(Message, ByteCounter, boolean)}. + * The default implementation delegates to the legacy method and ignores the timeout parameters to + * keep backward compatibility for transports that do not need custom bounds. + * + * @param req message to send; must be a fully populated, encodable instance + * @param ctr byte counter used for bandwidth accounting; may be {@code null} + * @param realTime {@code true} to request real-time scheduling; {@code false} otherwise + * @param sendTimeoutMillis primary wait before trying to un-queue a blocked send + * @param unqueueWaitMillis additional wait after un-queue failure before giving up + * @throws NotConnectedException if the peer disconnects before the send completes + * @throws SyncSendWaitedTooLongException if the acknowledgement does not arrive in time + */ + default void sendSync( + Message req, + ByteCounter ctr, + boolean realTime, + long sendTimeoutMillis, + long unqueueWaitMillis) + throws NotConnectedException, SyncSendWaitedTooLongException { + sendSync(req, ctr, realTime); + } + /** * Sends a low-level ping and waits for a corresponding pong. * diff --git a/src/main/java/network/crypta/node/RequestTracker.java b/src/main/java/network/crypta/node/RequestTracker.java index ef8dbf7ce5..b824565058 100644 --- a/src/main/java/network/crypta/node/RequestTracker.java +++ b/src/main/java/network/crypta/node/RequestTracker.java @@ -736,6 +736,46 @@ else if (mode.isInsert()) else return getRequestTracker(mode.isSSK(), mode.isLocal(), mode.realTimeFlag()); } + /** + * Finds the currently tracked tag for the given UID across all request categories. + * + *
This lookup is intended for diagnostics paths and therefore favors clarity over micro
+ * optimizations.
+ *
+ * @param uid uid to search for
+ * @return matching in-flight tag, or {@code null} when no active tag owns the uid
+ */
+ public UIDTag findTagByUid(long uid) {
+ return findTagInMaps(
+ uid,
+ runningCHKGetUIDsRT,
+ runningSSKGetUIDsRT,
+ runningCHKGetUIDsBulk,
+ runningSSKGetUIDsBulk,
+ runningCHKPutUIDsRT,
+ runningSSKPutUIDsRT,
+ runningCHKPutUIDsBulk,
+ runningSSKPutUIDsBulk,
+ runningCHKOfferReplyUIDsRT,
+ runningSSKOfferReplyUIDsRT,
+ runningCHKOfferReplyUIDsBulk,
+ runningSSKOfferReplyUIDsBulk);
+ }
+
+ @SafeVarargs
+ private static UIDTag findTagInMaps(long uid, Map This is best-effort metadata and does not affect routing decisions.
+ *
+ * @param externalRequestIdentifier optional external identifier (for example {@code fcp: