diff --git a/src/main/java/network/crypta/io/xfer/BulkTransmitter.java b/src/main/java/network/crypta/io/xfer/BulkTransmitter.java
index f828e66e3d..f685afb333 100644
--- a/src/main/java/network/crypta/io/xfer/BulkTransmitter.java
+++ b/src/main/java/network/crypta/io/xfer/BulkTransmitter.java
@@ -78,6 +78,11 @@ public interface AllSentCallback {
    */
   final BitArray blocksNotSentButPresent;
 
+  /**
+   * Number of blocks observed by this transmitter (initial snapshot + {@link #blockReceived(int)}).
+   */
+  private int blocksReceivedByTransmitter;
+
   private boolean cancelled;
 
   /** Peer boot identifier observed when the transfer started; used to detect restarts. */
@@ -159,6 +164,7 @@ public BulkTransmitter(
     synchronized (this.prb) {
       // We can just clone it.
       blocksNotSentButPresent = prb.cloneBlocksReceived();
+      blocksReceivedByTransmitter = countSetBits(blocksNotSentButPresent);
       prb.add(this);
     }
     try {
@@ -250,6 +256,9 @@ public void onRestarted(PeerContext ctx) {
    * @param block The block number that has been received.
    */
   synchronized void blockReceived(int block) {
+    if (!blocksNotSentButPresent.bitAt(block)) {
+      blocksReceivedByTransmitter++;
+    }
     blocksNotSentButPresent.setBit(block, true);
     notifyAll();
   }
@@ -432,13 +441,34 @@ private Outcome handleNoBlockAvailable(long lastSentPacket) {
   }
 
   private Outcome fastCompleteIfNoWait() {
-    if (noWait && prb.hasWholeFile()) {
-      completed();
-      return Outcome.SUCCEEDED;
+    if (!noWait) {
+      return null;
+    }
+    synchronized (this) {
+      // Complete in no-wait mode only after this transmitter has observed every block and there
+      // are no locally pending blocks left to queue.
+      if (blocksReceivedByTransmitter >= prb.blocks && blocksNotSentButPresent.firstOne() < 0) {
+        completed();
+        return Outcome.SUCCEEDED;
+      }
     }
     return null;
   }
 
+  /**
+   * Counts the entries of {@code bits} reported by repeated {@code firstOne} lookups.
+   *
+   * @param bits bit array to scan; assumes {@code firstOne(int)} returns the next set index at or
+   *     after its argument and a negative value when none remain (confirm against BitArray)
+   * @return number of indices visited before a negative result was returned
+   */
+  private static int countSetBits(BitArray bits) {
+    int count = 0;
+    for (int idx = bits.firstOne(); idx >= 0; idx = bits.firstOne(idx + 1)) {
+      count++;
+    }
+    return count;
+  }
+
   /**
    * Wait while there are packets in flight, but wake opportunistically.
    *
diff --git a/src/test/java/network/crypta/io/xfer/BulkTransmitterTest.java b/src/test/java/network/crypta/io/xfer/BulkTransmitterTest.java
index b93c930ef8..046b4dfed4 100644
--- a/src/test/java/network/crypta/io/xfer/BulkTransmitterTest.java
+++ b/src/test/java/network/crypta/io/xfer/BulkTransmitterTest.java
@@ -1,8 +1,10 @@
 package network.crypta.io.xfer;
 
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import network.crypta.io.comm.AsyncMessageCallback;
@@ -84,6 +86,46 @@ public int getWaitingThreadsCount() {
     }
   }
 
+  /**
+   * Buffer that can block one write offset to force a deterministic ordering in race-condition
+   * tests.
+   */
+  private static final class BlockingWriteBuffer extends ByteArrayRandomAccessBuffer {
+    private final long blockedOffset;
+    private final CountDownLatch writeStarted = new CountDownLatch(1);
+    private final CountDownLatch releaseWrite = new CountDownLatch(1);
+
+    BlockingWriteBuffer(byte[] initialData, long blockedOffset) {
+      super(initialData);
+      this.blockedOffset = blockedOffset;
+    }
+
+    @Override
+    public synchronized void pwrite(long fileOffset, byte[] buf, int bufOffset, int length)
+        throws IOException {
+      if (fileOffset == blockedOffset) {
+        writeStarted.countDown();
+        try {
+          if (!releaseWrite.await(10, TimeUnit.SECONDS)) {
+            throw new IOException("Timed out waiting to release blocked write");
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while waiting to release blocked write", e);
+        }
+      }
+      super.pwrite(fileOffset, buf, bufOffset, length);
+    }
+
+    boolean awaitWriteStarted(long timeout, TimeUnit unit) throws InterruptedException {
+      return writeStarted.await(timeout, unit);
+    }
+
+    void releaseBlockedWrite() {
+      releaseWrite.countDown();
+    }
+  }
+
   private MessageCore usm;
 
   @BeforeEach
@@ -321,6 +363,97 @@ void send_whenNewBlockArrivesWithPacketsInFlight_expectOpportunisticSecondSend()
     verify(ctr, times(2)).sentPayload(BLOCK_SIZE);
   }
 
+  @Test
+  void send_whenNoWaitAndWholeFileSetBeforeNotification_expectNoPrematureCompletion()
+      throws Exception {
+    // Prepare a PRB with two blocks but no initial data available.
+    int blocks = 2;
+    int size = blocks * BLOCK_SIZE;
+    byte[] backing = new byte[size];
+    byte[] pattern = new byte[BLOCK_SIZE];
+    for (int i = 0; i < BLOCK_SIZE; i++) pattern[i] = (byte) (i & 0xFF);
+    System.arraycopy(pattern, 0, backing, 0, BLOCK_SIZE);
+    System.arraycopy(pattern, 0, backing, BLOCK_SIZE, BLOCK_SIZE);
+    BlockingWriteBuffer rab = new BlockingWriteBuffer(backing, BLOCK_SIZE);
+    PartiallyReceivedBulk prb = new PartiallyReceivedBulk(usm, size, BLOCK_SIZE, rab, false);
+
+    PeerContext peer = mock(PeerContext.class);
+    PeerTransport transport = mock(PeerTransport.class);
+    when(peer.transport()).thenReturn(transport);
+    when(peer.getBootID()).thenReturn(223L);
+    when(peer.isConnected()).thenReturn(true);
+    when(peer.getThrottleWindowSize()).thenReturn(2);
+    when(peer.shortToString()).thenReturn("peerRace");
+
+    CountDownLatch firstSent = new CountDownLatch(1);
+    CountDownLatch allowFirstSendReturn = new CountDownLatch(1);
+    CountDownLatch secondSent = new CountDownLatch(1);
+    AtomicReference<AsyncMessageCallback> firstCallback = new AtomicReference<>();
+    AtomicInteger calls = new AtomicInteger();
+
+    doAnswer(
+            inv -> {
+              AsyncMessageCallback cb = inv.getArgument(1);
+              int idx = calls.getAndIncrement();
+              if (idx == 0) {
+                firstCallback.set(cb);
+                cb.sent();
+                firstSent.countDown();
+                if (!allowFirstSendReturn.await(10, TimeUnit.SECONDS)) {
+                  throw new AssertionError("first send did not get release signal");
+                }
+              } else if (idx == 1) {
+                cb.sent();
+                cb.acknowledged();
+                secondSent.countDown();
+              }
+              return mock(MessageItem.class);
+            })
+        .when(transport)
+        .sendAsync(any(Message.class), any(AsyncMessageCallback.class), eq(ctr));
+
+    BulkTransmitter bt = new BulkTransmitter(prb, peer, UID, true, ctr, false, null);
+
+    CompletableFuture<Boolean> result =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                return bt.send();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            });
+
+    // Send the first block and hold sendAsync return to control sender progress.
+    prb.received(0, pattern, 0, BLOCK_SIZE);
+    assertTrue(firstSent.await(5, TimeUnit.SECONDS), "first packet sent");
+
+    // Receive block 1 on another thread; this sets hasWholeFile=true but blocks before notifying
+    // transmitter.blockReceived(), creating the historical race window.
+    Thread receiveSecond =
+        Thread.ofPlatform()
+            .name("bt-race-receive-second")
+            .start(() -> prb.received(1, pattern, 0, BLOCK_SIZE));
+    assertTrue(
+        rab.awaitWriteStarted(5, TimeUnit.SECONDS), "second block write reached blocked section");
+
+    // Let the first send return and give the sender a chance to evaluate no-wait completion.
+    allowFirstSendReturn.countDown();
+    assertThrows(
+        TimeoutException.class,
+        () -> result.get(300, TimeUnit.MILLISECONDS),
+        "sender must not complete before it has observed block 1");
+
+    // Now release block 1 write so PartiallyReceivedBulk can notify transmitter.blockReceived().
+    rab.releaseBlockedWrite();
+    receiveSecond.join(5_000);
+
+    assertTrue(secondSent.await(10, TimeUnit.SECONDS), "second packet sent after notification");
+    firstCallback.get().acknowledged();
+    assertTrue(result.get(10, TimeUnit.SECONDS), "send() should succeed");
+    verify(ctr, times(2)).sentPayload(BLOCK_SIZE);
+  }
+
   @Test
   void send_whenQueueEmpty_thenNewBlockArrives_expectWakeFromAckWait() throws Exception {
     // Prepare a PRB with one block but initially no data available.