Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions src/main/java/network/crypta/io/xfer/BulkTransmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public interface AllSentCallback {
*/
final BitArray blocksNotSentButPresent;

/**
* Number of blocks observed by this transmitter (initial snapshot + {@link #blockReceived(int)}).
*/
private int blocksReceivedByTransmitter;

private boolean cancelled;

/** Peer boot identifier observed when the transfer started; used to detect restarts. */
Expand Down Expand Up @@ -159,6 +164,7 @@ public BulkTransmitter(
synchronized (this.prb) {
// We can just clone it.
blocksNotSentButPresent = prb.cloneBlocksReceived();
blocksReceivedByTransmitter = countSetBits(blocksNotSentButPresent);
prb.add(this);
}
try {
Expand Down Expand Up @@ -250,6 +256,9 @@ public void onRestarted(PeerContext ctx) {
* @param block The block number that has been received.
*/
synchronized void blockReceived(int block) {
if (!blocksNotSentButPresent.bitAt(block)) {
blocksReceivedByTransmitter++;
}
blocksNotSentButPresent.setBit(block, true);
notifyAll();
}
Expand Down Expand Up @@ -432,13 +441,28 @@ private Outcome handleNoBlockAvailable(long lastSentPacket) {
}

private Outcome fastCompleteIfNoWait() {
if (noWait && prb.hasWholeFile()) {
completed();
return Outcome.SUCCEEDED;
if (!noWait) {
return null;
}
synchronized (this) {
// Complete in no-wait mode only after this transmitter has observed every block and there
// are no locally pending blocks left to queue.
if (blocksReceivedByTransmitter >= prb.blocks && blocksNotSentButPresent.firstOne() < 0) {
completed();
return Outcome.SUCCEEDED;
}
}
return null;
}

private static int countSetBits(BitArray bits) {
int count = 0;
for (int idx = bits.firstOne(); idx >= 0; idx = bits.firstOne(idx + 1)) {
count++;
}
return count;
}

/**
* Wait while there are packets in flight, but wake opportunistically.
*
Expand Down
133 changes: 133 additions & 0 deletions src/test/java/network/crypta/io/xfer/BulkTransmitterTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package network.crypta.io.xfer;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import network.crypta.io.comm.AsyncMessageCallback;
Expand Down Expand Up @@ -84,6 +86,46 @@ public int getWaitingThreadsCount() {
}
}

/**
* Buffer that can block one write offset to force a deterministic ordering in race-condition
* tests.
*/
private static final class BlockingWriteBuffer extends ByteArrayRandomAccessBuffer {
private final long blockedOffset;
private final CountDownLatch writeStarted = new CountDownLatch(1);
private final CountDownLatch releaseWrite = new CountDownLatch(1);

BlockingWriteBuffer(byte[] initialData, long blockedOffset) {
super(initialData);
this.blockedOffset = blockedOffset;
}

@Override
public synchronized void pwrite(long fileOffset, byte[] buf, int bufOffset, int length)
throws IOException {
if (fileOffset == blockedOffset) {
writeStarted.countDown();
try {
if (!releaseWrite.await(10, TimeUnit.SECONDS)) {
throw new IOException("Timed out waiting to release blocked write");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting to release blocked write", e);
}
}
super.pwrite(fileOffset, buf, bufOffset, length);
}

boolean awaitWriteStarted(long timeout, TimeUnit unit) throws InterruptedException {
return writeStarted.await(timeout, unit);
}

void releaseBlockedWrite() {
releaseWrite.countDown();
}
}

private MessageCore usm;

@BeforeEach
Expand Down Expand Up @@ -321,6 +363,97 @@ void send_whenNewBlockArrivesWithPacketsInFlight_expectOpportunisticSecondSend()
verify(ctr, times(2)).sentPayload(BLOCK_SIZE);
}

@Test
void send_whenNoWaitAndWholeFileSetBeforeNotification_expectNoPrematureCompletion()
throws Exception {
// Prepare a PRB with two blocks but no initial data available.
int blocks = 2;
int size = blocks * BLOCK_SIZE;
byte[] backing = new byte[size];
byte[] pattern = new byte[BLOCK_SIZE];
for (int i = 0; i < BLOCK_SIZE; i++) pattern[i] = (byte) (i & 0xFF);
System.arraycopy(pattern, 0, backing, 0, BLOCK_SIZE);
System.arraycopy(pattern, 0, backing, BLOCK_SIZE, BLOCK_SIZE);
BlockingWriteBuffer rab = new BlockingWriteBuffer(backing, BLOCK_SIZE);
PartiallyReceivedBulk prb = new PartiallyReceivedBulk(usm, size, BLOCK_SIZE, rab, false);

PeerContext peer = mock(PeerContext.class);
PeerTransport transport = mock(PeerTransport.class);
when(peer.transport()).thenReturn(transport);
when(peer.getBootID()).thenReturn(223L);
when(peer.isConnected()).thenReturn(true);
when(peer.getThrottleWindowSize()).thenReturn(2);
when(peer.shortToString()).thenReturn("peerRace");

CountDownLatch firstSent = new CountDownLatch(1);
CountDownLatch allowFirstSendReturn = new CountDownLatch(1);
CountDownLatch secondSent = new CountDownLatch(1);
AtomicReference<AsyncMessageCallback> firstCallback = new AtomicReference<>();
AtomicInteger calls = new AtomicInteger();

doAnswer(
inv -> {
AsyncMessageCallback cb = inv.getArgument(1);
int idx = calls.getAndIncrement();
if (idx == 0) {
firstCallback.set(cb);
cb.sent();
firstSent.countDown();
if (!allowFirstSendReturn.await(10, TimeUnit.SECONDS)) {
throw new AssertionError("first send did not get release signal");
}
} else if (idx == 1) {
cb.sent();
cb.acknowledged();
secondSent.countDown();
}
return mock(MessageItem.class);
})
.when(transport)
.sendAsync(any(Message.class), any(AsyncMessageCallback.class), eq(ctr));

BulkTransmitter bt = new BulkTransmitter(prb, peer, UID, true, ctr, false, null);

CompletableFuture<Boolean> result =
CompletableFuture.supplyAsync(
() -> {
try {
return bt.send();
} catch (Exception e) {
throw new RuntimeException(e);
}
});

// Send the first block and hold sendAsync return to control sender progress.
prb.received(0, pattern, 0, BLOCK_SIZE);
assertTrue(firstSent.await(5, TimeUnit.SECONDS), "first packet sent");

// Receive block 1 on another thread; this sets hasWholeFile=true but blocks before notifying
// transmitter.blockReceived(), creating the historical race window.
Thread receiveSecond =
Thread.ofPlatform()
.name("bt-race-receive-second")
.start(() -> prb.received(1, pattern, 0, BLOCK_SIZE));
assertTrue(
rab.awaitWriteStarted(5, TimeUnit.SECONDS), "second block write reached blocked section");

// Let the first send return and give the sender a chance to evaluate no-wait completion.
allowFirstSendReturn.countDown();
assertThrows(
TimeoutException.class,
() -> result.get(300, TimeUnit.MILLISECONDS),
"sender must not complete before it has observed block 1");

// Now release block 1 write so PartiallyReceivedBulk can notify transmitter.blockReceived().
rab.releaseBlockedWrite();
receiveSecond.join(5_000);

assertTrue(secondSent.await(10, TimeUnit.SECONDS), "second packet sent after notification");
firstCallback.get().acknowledged();
assertTrue(result.get(10, TimeUnit.SECONDS), "send() should succeed");
verify(ctr, times(2)).sentPayload(BLOCK_SIZE);
}

@Test
void send_whenQueueEmpty_thenNewBlockArrives_expectWakeFromAckWait() throws Exception {
// Prepare a PRB with one block but initially no data available.
Expand Down
Loading