From 4ed7050b8481e07d60a780506c4878e9d6623d8e Mon Sep 17 00:00:00 2001 From: Wyatt Hepler Date: Tue, 26 Apr 2022 15:50:22 -0700 Subject: [PATCH] pw_transfer: Always set chunk type in Java client - Set the chunk type for all packets. - Support passing either proto messages or builders to the TestClient. Change-Id: I53077c968a9b58b7f44bb22a33371da6ab1bfc7d Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/92644 Pigweed-Auto-Submit: Wyatt Hepler Reviewed-by: Alexei Frolov Commit-Queue: Wyatt Hepler --- .../test/dev/pigweed/pw_rpc/TestClient.java | 44 +- .../dev/pigweed/pw_transfer/ReadTransfer.java | 5 +- .../dev/pigweed/pw_transfer/Transfer.java | 6 +- .../pigweed/pw_transfer/WriteTransfer.java | 16 +- .../dev/pigweed/pw_transfer/ManagerTest.java | 448 ++++++++++-------- 5 files changed, 295 insertions(+), 224 deletions(-) diff --git a/pw_rpc/java/test/dev/pigweed/pw_rpc/TestClient.java b/pw_rpc/java/test/dev/pigweed/pw_rpc/TestClient.java index 6688e24228..bd828e94b0 100644 --- a/pw_rpc/java/test/dev/pigweed/pw_rpc/TestClient.java +++ b/pw_rpc/java/test/dev/pigweed/pw_rpc/TestClient.java @@ -14,11 +14,10 @@ package dev.pigweed.pw_rpc; -import static java.util.Arrays.stream; - import com.google.common.collect.ImmutableList; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; +import com.google.protobuf.MessageLiteOrBuilder; import dev.pigweed.pw_rpc.internal.Packet.PacketType; import dev.pigweed.pw_rpc.internal.Packet.RpcPacket; import java.util.ArrayList; @@ -86,19 +85,13 @@ public List lastClientStreams(Class payloadType) { } /** Simulates receiving SERVER_STREAM packets from the server. */ - public void receiveServerStream(String service, String method, MessageLite... payloads) { + public void receiveServerStream(String service, String method, MessageLiteOrBuilder... payloads) { RpcPacket base = startPacket(service, method, PacketType.SERVER_STREAM).build(); - for (MessageLite payload : payloads) { - processPacket(RpcPacket.newBuilder(base).setPayload(payload.toByteString())); + for (MessageLiteOrBuilder payload : payloads) { + processPacket(RpcPacket.newBuilder(base).setPayload(getMessage(payload).toByteString())); } } - public void receiveServerStream(String service, String method, MessageLite.Builder... builders) { - receiveServerStream(service, - method, - stream(builders).map(MessageLite.Builder::build).toArray(MessageLite[] ::new)); - } - /** * Enqueues a SERVER_STREAM packet so that the client receives it after a packet is sent. * @@ -106,7 +99,7 @@ public void receiveServerStream(String service, String method, MessageLite.Build * these stream packets. The minimum value (and the default) is 1. */ public void enqueueServerStream( - String service, String method, int afterPackets, MessageLite... payloads) { + String service, String method, int afterPackets, MessageLiteOrBuilder... payloads) { if (afterPackets < 1) { throw new IllegalArgumentException("afterPackets must be at least 1"); } @@ -117,23 +110,12 @@ public void enqueueServerStream( receiveEnqueuedPacketsAfter = afterPackets; RpcPacket base = startPacket(service, method, PacketType.SERVER_STREAM).build(); - for (MessageLite payload : payloads) { - enqueuedPackets.add(RpcPacket.newBuilder(base).setPayload(payload.toByteString()).build()); + for (MessageLiteOrBuilder payload : payloads) { + enqueuedPackets.add( + RpcPacket.newBuilder(base).setPayload(getMessage(payload).toByteString()).build()); } } - public void enqueueServerStream(String service, String method, MessageLite... payloads) { - enqueueServerStream(service, method, 1, payloads); - } - - public void enqueueServerStream( - String service, String method, int afterPackets, MessageLite.Builder... builders) { - enqueueServerStream(service, - method, - afterPackets, - stream(builders).map(MessageLite.Builder::build).toArray(MessageLite[] ::new)); - } - /** Simulates receiving a SERVER_ERROR packet from the server. */ public void receiveServerError(String service, String method, Status error) { processPacket(startPacket(service, method, PacketType.SERVER_ERROR).setStatus(error.code())); @@ -192,4 +174,14 @@ private T parseRequestPayload(Class payloadType, RpcP throw new AssertionError("Decoding sent packet payload failed", e); } } + + private MessageLite getMessage(MessageLiteOrBuilder messageOrBuilder) { + if (messageOrBuilder instanceof MessageLite.Builder) { + return ((MessageLite.Builder) messageOrBuilder).build(); + } + if (messageOrBuilder instanceof MessageLite) { + return (MessageLite) messageOrBuilder; + } + throw new AssertionError("Unexpected MessageLiteOrBuilder class"); + } } diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java index 75b2c03d23..208dbfb3fa 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java @@ -153,12 +153,11 @@ private synchronized Chunk.Builder prepareTransferParameters(boolean extend) { Chunk.Type type = extend ? Chunk.Type.PARAMETERS_CONTINUE : Chunk.Type.PARAMETERS_RETRANSMIT; - Chunk.Builder chunk = newChunk() + Chunk.Builder chunk = newChunk(type) .setPendingBytes(pendingBytes) .setMaxChunkSizeBytes(parameters.maxChunkSizeBytes()) .setOffset(offset) - .setWindowEndOffset(windowEndOffset) - .setType(type); + .setWindowEndOffset(windowEndOffset); if (parameters.chunkDelayMicroseconds() > 0) { chunk.setMinDelayMicroseconds(parameters.chunkDelayMicroseconds()); } diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java index 64e8555713..40b9a1edc0 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java @@ -248,9 +248,9 @@ final synchronized boolean handleCancellation() { return false; } - final Chunk.Builder newChunk() { + final Chunk.Builder newChunk(Chunk.Type type) { // TODO(frolv): Properly set the session ID after it is configured by the server. - return Chunk.newBuilder().setSessionId(getId()); + return Chunk.newBuilder().setSessionId(getId()).setType(type); } /** Sends a chunk. Returns true if sent, false if sending failed and the transfer was aborted. */ @@ -309,7 +309,7 @@ final synchronized void sendFinalChunk(Status status) { // Only call finish() if the sendChunk was successful. If it wasn't, the exception would have // already terminated the transfer. - if (sendChunk(newChunk().setStatus(status.code()))) { + if (sendChunk(newChunk(Chunk.Type.TRANSFER_COMPLETION).setStatus(status.code()))) { cleanUp(status); } } diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java index d01a8b12e2..c4901c81a4 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java @@ -73,7 +73,10 @@ protected WriteTransfer(int id, @Override synchronized Chunk getInitialChunk() { - return newChunk().setResourceId(getId()).setRemainingBytes(data.length).build(); + return newChunk(Chunk.Type.TRANSFER_START) + .setResourceId(getId()) + .setRemainingBytes(data.length) + .build(); } @Override @@ -191,7 +194,7 @@ private synchronized boolean doHandleDataChunk(Chunk chunk) { chunkData = chunkData.substring(0, newChunkSize); - chunkToSend = buildChunk(chunkData); + chunkToSend = buildDataChunk(chunkData); // If there's a timeout, resending this will trigger a transfer parameters update. lastChunk = chunkToSend; @@ -236,7 +239,9 @@ void setFutureResult() { private static boolean isRetransmit(Chunk chunk) { // Retransmit is the default behavior for older versions of the transfer protocol, which don't // have a type field. - return !chunk.hasType() || chunk.getType().equals(Chunk.Type.PARAMETERS_RETRANSMIT); + return !chunk.hasType() + || (chunk.getType().equals(Chunk.Type.PARAMETERS_RETRANSMIT) + || chunk.getType().equals(Chunk.Type.TRANSFER_START)); } private static int getWindowEndOffset(Chunk chunk, int dataLength) { @@ -248,8 +253,9 @@ private static int getWindowEndOffset(Chunk chunk, int dataLength) { return min(chunk.getWindowEndOffset(), dataLength); } - private Chunk buildChunk(ByteString chunkData) { - Chunk.Builder chunk = newChunk().setOffset(sentOffset).setData(chunkData); + private Chunk buildDataChunk(ByteString chunkData) { + Chunk.Builder chunk = + newChunk(Chunk.Type.TRANSFER_DATA).setOffset(sentOffset).setData(chunkData); // If this is the last data chunk, setRemainingBytes to 0. if (sentOffset + chunkData.size() == data.length) { diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/ManagerTest.java b/pw_transfer/java/test/dev/pigweed/pw_transfer/ManagerTest.java index 53839442c3..3fef9fdc4f 100644 --- a/pw_transfer/java/test/dev/pigweed/pw_transfer/ManagerTest.java +++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/ManagerTest.java @@ -74,7 +74,10 @@ public void read_singleChunk_successful() throws Exception { ListenableFuture future = manager.read(1); assertThat(future.isDone()).isFalse(); - receiveReadChunks(newChunk(1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 1) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); assertThat(future.isDone()).isTrue(); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); @@ -85,24 +88,25 @@ public void read_failedPreconditionError_retries() throws Exception { ListenableFuture future = manager.read(1, TRANSFER_PARAMETERS); assertThat(lastChunks()) - .containsExactly(newChunk(1) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1) .setPendingBytes(TRANSFER_PARAMETERS.maxPendingBytes()) .setWindowEndOffset(TRANSFER_PARAMETERS.maxPendingBytes()) .setMaxChunkSizeBytes(TRANSFER_PARAMETERS.maxChunkSizeBytes()) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build()); client.receiveServerError(SERVICE, "Read", Status.FAILED_PRECONDITION); assertThat(lastChunks()) - .containsExactly(newChunk(1) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1) .setPendingBytes(TRANSFER_PARAMETERS.maxPendingBytes()) .setWindowEndOffset(TRANSFER_PARAMETERS.maxPendingBytes()) .setMaxChunkSizeBytes(TRANSFER_PARAMETERS.maxChunkSizeBytes()) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build()); - receiveReadChunks(newChunk(1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 1) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); assertThat(future.isDone()).isTrue(); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); @@ -116,11 +120,10 @@ public void read_failedPreconditionErrorMaxRetriesTimes_aborts() { client.receiveServerError(SERVICE, "Read", Status.FAILED_PRECONDITION); } - Chunk initialChunk = newChunk(1) + Chunk initialChunk = newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 1) .setPendingBytes(TRANSFER_PARAMETERS.maxPendingBytes()) .setWindowEndOffset(TRANSFER_PARAMETERS.maxPendingBytes()) .setMaxChunkSizeBytes(TRANSFER_PARAMETERS.maxChunkSizeBytes()) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build(); assertThat(lastChunks()) .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); @@ -139,16 +142,31 @@ public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception { ListenableFuture future = manager.read(1); assertThat(future.isDone()).isFalse(); - receiveReadChunks(newChunk(2).setOffset(0).setStatus(Status.OK.code()), - newChunk(0).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), - newChunk(3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); - receiveWriteChunks(newChunk(1).setOffset(0).setStatus(Status.OK.code()), - newChunk(1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), - newChunk(2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); + receiveReadChunks(finalChunk(2, Status.OK), + newChunk(Chunk.Type.TRANSFER_DATA, 0) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0), + newChunk(Chunk.Type.TRANSFER_DATA, 3) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0)); + receiveWriteChunks(finalChunk(1, Status.OK), + newChunk(Chunk.Type.TRANSFER_DATA, 1) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0), + newChunk(Chunk.Type.TRANSFER_DATA, 2) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0)); assertThat(future.isDone()).isFalse(); - receiveReadChunks(newChunk(1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 1) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @@ -158,9 +176,9 @@ public void read_empty() throws Exception { ListenableFuture future = manager.read(2); lastChunks(); // Discard initial chunk (tested elsewhere) - receiveReadChunks(newChunk(2).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 2).setRemainingBytes(0)); - assertThat(lastChunks()).containsExactly(newChunk(2).setStatus(Status.OK.code()).build()); + assertThat(lastChunks()).containsExactly(finalChunk(2, Status.OK)); assertThat(future.get()).isEqualTo(new byte[] {}); } @@ -170,13 +188,12 @@ public void read_sendsTransferParametersFirst() { ListenableFuture future = manager.read(99, TransferParameters.create(3, 2, 1)); assertThat(lastChunks()) - .containsExactly(newChunk(99) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) .setPendingBytes(3) .setWindowEndOffset(3) .setMaxChunkSizeBytes(2) .setMinDelayMicroseconds(1) .setOffset(0) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build()); assertThat(future.cancel(true)).isTrue(); } @@ -186,40 +203,43 @@ public void read_severalChunks() throws Exception { ListenableFuture future = manager.read(ID, TRANSFER_PARAMETERS); assertThat(lastChunks()) - .containsExactly(newChunk(ID) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setPendingBytes(50) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setOffset(0) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build()); - receiveReadChunks(newChunk(ID).setOffset(0).setData(range(0, 20)).setRemainingBytes(70), - newChunk(ID).setOffset(20).setData(range(20, 40))); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(0) + .setData(range(0, 20)) + .setRemainingBytes(70), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(20).setData(range(20, 40))); assertThat(lastChunks()) - .containsExactly(newChunk(ID) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) .setOffset(40) .setPendingBytes(50) .setMaxChunkSizeBytes(30) .setWindowEndOffset(90) - .setType(Chunk.Type.PARAMETERS_CONTINUE) .build()); - receiveReadChunks(newChunk(ID).setOffset(40).setData(range(40, 70))); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(40).setData(range(40, 70))); assertThat(lastChunks()) - .containsExactly(newChunk(ID) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) .setOffset(70) .setPendingBytes(50) .setMaxChunkSizeBytes(30) .setWindowEndOffset(120) - .setType(Chunk.Type.PARAMETERS_CONTINUE) .build()); - receiveReadChunks(newChunk(ID).setOffset(70).setData(range(70, 100)).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(70) + .setData(range(70, 100)) + .setRemainingBytes(0)); - assertThat(lastChunks()).containsExactly(newChunk(ID).setStatus(Status.OK.code()).build()); + assertThat(lastChunks()).containsExactly(finalChunk(ID, Status.OK)); assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); } @@ -228,13 +248,22 @@ public void read_severalChunks() throws Exception { public void read_progressCallbackIsCalled() { ListenableFuture future = manager.read(ID, TRANSFER_PARAMETERS, progressCallback); - receiveReadChunks(newChunk(ID).setOffset(0).setData(range(0, 30)), - newChunk(ID).setOffset(30).setData(range(30, 50)), - newChunk(ID).setOffset(50).setData(range(50, 60)).setRemainingBytes(5), - newChunk(ID).setOffset(60).setData(range(60, 70)), - newChunk(ID).setOffset(70).setData(range(70, 80)).setRemainingBytes(20), - newChunk(ID).setOffset(90).setData(range(90, 100)), - newChunk(ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)), + newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(50) + .setData(range(50, 60)) + .setRemainingBytes(5), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(60).setData(range(60, 70)), + newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(70) + .setData(range(70, 80)) + .setRemainingBytes(20), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(90).setData(range(90, 100)), + newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0)); verify(progressCallback, times(6)).accept(progress.capture()); assertThat(progress.getAllValues()) @@ -252,60 +281,61 @@ public void read_rewindWhenPacketsSkipped() throws Exception { ListenableFuture future = manager.read(ID, TRANSFER_PARAMETERS); assertThat(lastChunks()) - .containsExactly(newChunk(ID) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setPendingBytes(50) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setOffset(0) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build()); - receiveReadChunks(newChunk(ID).setOffset(50).setData(range(30, 50))); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(30, 50))); assertThat(lastChunks()) - .containsExactly(newChunk(ID) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setPendingBytes(50) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setOffset(0) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build()); - receiveReadChunks(newChunk(ID).setOffset(0).setData(range(0, 30)), - newChunk(ID).setOffset(30).setData(range(30, 50))); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50))); assertThat(lastChunks()) - .containsExactly(newChunk(ID) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) .setOffset(30) .setPendingBytes(50) .setWindowEndOffset(80) .setMaxChunkSizeBytes(30) - .setType(Chunk.Type.PARAMETERS_CONTINUE) .build()); - receiveReadChunks(newChunk(ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0)); assertThat(lastChunks()) - .containsExactly(newChunk(ID) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setOffset(50) .setPendingBytes(50) .setWindowEndOffset(100) .setMaxChunkSizeBytes(30) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build()); - receiveReadChunks(newChunk(ID).setOffset(50).setData(range(50, 80)), - newChunk(ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(50, 80)), + newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0)); assertThat(lastChunks()) - .containsExactly(newChunk(ID) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) .setOffset(80) .setPendingBytes(50) .setWindowEndOffset(130) .setMaxChunkSizeBytes(30) - .setType(Chunk.Type.PARAMETERS_CONTINUE) .build(), - newChunk(ID).setStatus(Status.OK.code()).build()); + finalChunk(ID, Status.OK)); assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); } @@ -315,7 +345,10 @@ public void read_multipleWithSameId_sequentially_successful() throws Exception { for (int i = 0; i < 3; ++i) { ListenableFuture future = manager.read(1); - receiveReadChunks(newChunk(1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 1) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @@ -349,12 +382,12 @@ public void read_sendErrorOnFirstPacket_failsImmediately() { public void read_sendErrorOnLaterPacket_aborts() { ListenableFuture future = manager.read(ID, TRANSFER_PARAMETERS); - receiveReadChunks(newChunk(ID).setOffset(0).setData(range(0, 20))); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 20))); ChannelOutputException exception = new ChannelOutputException("blah"); client.setChannelOutputException(exception); - receiveReadChunks(newChunk(ID).setOffset(20).setData(range(20, 50))); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(20).setData(range(20, 50))); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -367,15 +400,15 @@ public void read_cancelFuture_abortsTransfer() { assertThat(future.cancel(true)).isTrue(); - receiveReadChunks(newChunk(ID).setOffset(30).setData(range(30, 50))); - assertThat(lastChunks()).contains(newChunk(ID).setStatus(Status.CANCELLED.code()).build()); + receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50))); + assertThat(lastChunks()).contains(finalChunk(ID, Status.CANCELLED)); } @Test public void read_protocolError_aborts() { ListenableFuture future = manager.read(ID); - receiveReadChunks(newChunk(ID).setStatus(Status.ALREADY_EXISTS.code())); + receiveReadChunks(finalChunk(ID, Status.ALREADY_EXISTS)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -404,28 +437,25 @@ public void read_timeout() { // read should have retried sending the transfer parameters 2 times, for a total of 3 assertThat(lastChunks()) - .containsExactly(newChunk(ID) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setPendingBytes(50) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setOffset(0) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build(), - newChunk(ID) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setPendingBytes(50) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setOffset(0) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build(), - newChunk(ID) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setPendingBytes(50) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setOffset(0) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT) .build(), - newChunk(ID).setStatus(Status.DEADLINE_EXCEEDED.code()).build()); + finalChunk(ID, Status.DEADLINE_EXCEEDED)); } @Test @@ -433,8 +463,11 @@ public void write_singleChunk() throws Exception { ListenableFuture future = manager.write(2, TEST_DATA_SHORT.toByteArray()); assertThat(future.isDone()).isFalse(); - receiveWriteChunks(newChunk(2).setOffset(0).setPendingBytes(1024).setMaxChunkSizeBytes(128), - newChunk(2).setStatus(Status.OK.code())); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) + .setOffset(0) + .setPendingBytes(1024) + .setMaxChunkSizeBytes(128), + finalChunk(2, Status.OK)); assertThat(future.isDone()).isTrue(); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. @@ -446,8 +479,11 @@ public void write_platformTransferDisabled_aborted() { assertThat(future.isDone()).isFalse(); setPlatformTransferDisabled(); - receiveWriteChunks(newChunk(2).setOffset(0).setPendingBytes(1024).setMaxChunkSizeBytes(128), - newChunk(2).setStatus(Status.OK.code())); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) + .setOffset(0) + .setPendingBytes(1024) + .setMaxChunkSizeBytes(128), + finalChunk(2, Status.OK)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -458,18 +494,17 @@ public void write_platformTransferDisabled_aborted() { public void write_failedPreconditionError_retries() throws Exception { ListenableFuture future = manager.write(2, TEST_DATA_SHORT.toByteArray()); - assertThat(lastChunks()) - .containsExactly( - newChunk(2).setResourceId(2).setRemainingBytes(TEST_DATA_SHORT.size()).build()); + assertThat(lastChunks()).containsExactly(initialWriteChunk(2, 2, TEST_DATA_SHORT.size())); client.receiveServerError(SERVICE, "Write", Status.FAILED_PRECONDITION); - assertThat(lastChunks()) - .containsExactly( - newChunk(2).setResourceId(2).setRemainingBytes(TEST_DATA_SHORT.size()).build()); + assertThat(lastChunks()).containsExactly(initialWriteChunk(2, 2, TEST_DATA_SHORT.size())); - receiveWriteChunks(newChunk(2).setOffset(0).setPendingBytes(1024).setMaxChunkSizeBytes(128), - newChunk(2).setStatus(Status.OK.code())); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) + .setOffset(0) + .setPendingBytes(1024) + .setMaxChunkSizeBytes(128), + finalChunk(2, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -482,8 +517,7 @@ public void write_failedPreconditionErrorMaxRetriesTimes_aborts() { client.receiveServerError(SERVICE, "Write", Status.FAILED_PRECONDITION); } - Chunk initialChunk = - newChunk(2).setResourceId(2).setRemainingBytes(TEST_DATA_SHORT.size()).build(); + Chunk initialChunk = initialWriteChunk(2, 2, TEST_DATA_SHORT.size()); assertThat(lastChunks()) .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); @@ -501,10 +535,9 @@ public void write_empty() throws Exception { ListenableFuture future = manager.write(2, new byte[] {}); assertThat(future.isDone()).isFalse(); - receiveWriteChunks(newChunk(2).setStatus(Status.OK.code())); + receiveWriteChunks(finalChunk(2, Status.OK)); - assertThat(lastChunks()) - .containsExactly(newChunk(2).setResourceId(2).setRemainingBytes(0).build()); + assertThat(lastChunks()).containsExactly(initialWriteChunk(2, 2, 0)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -513,35 +546,42 @@ public void write_empty() throws Exception { public void write_severalChunks() throws Exception { ListenableFuture future = manager.write(ID, TEST_DATA_100B.toByteArray()); - assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setResourceId(ID).setRemainingBytes(TEST_DATA_100B.size()).build()); + assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(ID) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setOffset(0) .setPendingBytes(50) .setMaxChunkSizeBytes(30) .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly(newChunk(ID).setOffset(0).setData(range(0, 30)).build(), - newChunk(ID).setOffset(30).setData(range(30, 50)).build()); + .containsExactly( + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)).build(), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)).build()); - receiveWriteChunks(newChunk(ID).setOffset(50).setPendingBytes(40).setMaxChunkSizeBytes(25)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(50) + .setPendingBytes(40) + .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) - .containsExactly(newChunk(ID).setOffset(50).setData(range(50, 75)).build(), - newChunk(ID).setOffset(75).setData(range(75, 90)).build()); + .containsExactly( + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(50, 75)).build(), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(75).setData(range(75, 90)).build()); - receiveWriteChunks(newChunk(ID).setOffset(90).setPendingBytes(50)); + receiveWriteChunks( + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(90).setPendingBytes(50)); assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setOffset(90).setData(range(90, 100)).setRemainingBytes(0).build()); + .containsExactly(newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(90) + .setData(range(90, 100)) + .setRemainingBytes(0) + .build()); assertThat(future.isDone()).isFalse(); - receiveWriteChunks(newChunk(ID).setStatus(Status.OK.code())); + receiveWriteChunks(finalChunk(ID, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -551,19 +591,25 @@ public void write_adjustChunkSize() throws Exception { ListenableFuture future = // Always request 30-byte chunks manager.write(ID, TEST_DATA_100B.toByteArray(), progress -> {}, (chunk, maxSize) -> 30); - assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setResourceId(ID).setRemainingBytes(TEST_DATA_100B.size()).build()); + assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(ID).setOffset(0).setPendingBytes(1024).setMaxChunkSizeBytes(100)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(0) + .setPendingBytes(1024) + .setMaxChunkSizeBytes(100)); assertThat(lastChunks()) - .containsExactly(newChunk(ID).setOffset(0).setData(range(0, 30)).build(), - newChunk(ID).setOffset(30).setData(range(30, 60)).build(), - newChunk(ID).setOffset(60).setData(range(60, 90)).build(), - newChunk(ID).setOffset(90).setData(range(90, 100)).setRemainingBytes(0).build()); + .containsExactly( + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)).build(), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 60)).build(), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(60).setData(range(60, 90)).build(), + newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(90) + .setData(range(90, 100)) + .setRemainingBytes(0) + .build()); - receiveWriteChunks(newChunk(ID).setStatus(Status.OK.code())); + receiveWriteChunks(finalChunk(ID, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -573,11 +619,12 @@ public void write_adjustChunkSize_zeroLengthAdjustment_abortsTransfer() { ListenableFuture future = // Always request 0-byte chunks, which is invalid. manager.write(ID, TEST_DATA_100B.toByteArray(), progress -> {}, (chunk, maxSize) -> 0); - assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setResourceId(ID).setRemainingBytes(TEST_DATA_100B.size()).build()); + assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(ID).setOffset(0).setPendingBytes(1024).setMaxChunkSizeBytes(100)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(0) + .setPendingBytes(1024) + .setMaxChunkSizeBytes(100)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -589,11 +636,12 @@ public void write_adjustChunkSize_negativeAdjustment_abortsTransfer() { ListenableFuture future = // Always request negative chunks, which is invalid. manager.write(ID, TEST_DATA_100B.toByteArray(), progress -> {}, (chunk, maxSize) -> - 1); - assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setResourceId(ID).setRemainingBytes(TEST_DATA_100B.size()).build()); + assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(ID).setOffset(0).setPendingBytes(1024).setMaxChunkSizeBytes(100)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(0) + .setPendingBytes(1024) + .setMaxChunkSizeBytes(100)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -604,41 +652,45 @@ public void write_adjustChunkSize_negativeAdjustment_abortsTransfer() { public void write_parametersContinue() throws Exception { ListenableFuture future = manager.write(ID, TEST_DATA_100B.toByteArray()); - assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setResourceId(ID).setRemainingBytes(TEST_DATA_100B.size()).build()); + assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(ID) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setOffset(0) .setPendingBytes(50) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) - .setMinDelayMicroseconds(1) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT)); + .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly(newChunk(ID).setOffset(0).setData(range(0, 30)).build(), - newChunk(ID).setOffset(30).setData(range(30, 50)).build()); + .containsExactly( + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)).build(), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)).build()); - receiveWriteChunks( - newChunk(ID).setOffset(30).setPendingBytes(50).setWindowEndOffset(80).setType( - Chunk.Type.PARAMETERS_CONTINUE)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + .setOffset(30) + .setPendingBytes(50) + .setWindowEndOffset(80)); // Transfer doesn't roll back to offset 30 but instead continues sending up to 80. assertThat(lastChunks()) - .containsExactly(newChunk(ID).setOffset(50).setData(range(50, 80)).build()); + .containsExactly( + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(50, 80)).build()); - receiveWriteChunks( - newChunk(ID).setOffset(80).setPendingBytes(50).setWindowEndOffset(130).setType( - Chunk.Type.PARAMETERS_CONTINUE)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + .setOffset(80) + .setPendingBytes(50) + .setWindowEndOffset(130)); assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0).build()); + .containsExactly(newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0) + .build()); assertThat(future.isDone()).isFalse(); - receiveWriteChunks(newChunk(ID).setStatus(Status.OK.code())); + receiveWriteChunks(finalChunk(ID, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -647,33 +699,36 @@ public void write_parametersContinue() throws Exception { public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception { ListenableFuture future = manager.write(ID, TEST_DATA_100B.toByteArray()); - assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setResourceId(ID).setRemainingBytes(TEST_DATA_100B.size()).build()); + assertThat(lastChunks()).containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(ID) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setOffset(0) .setPendingBytes(90) .setWindowEndOffset(90) .setMaxChunkSizeBytes(90) - .setMinDelayMicroseconds(1) - .setType(Chunk.Type.PARAMETERS_RETRANSMIT)); + .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly(newChunk(ID).setOffset(0).setData(range(0, 90)).build()); + .containsExactly( + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 90)).build()); receiveWriteChunks( // This stale packet with a window end before the offset should be ignored. - newChunk(ID).setOffset(25).setPendingBytes(25).setWindowEndOffset(50).setType( - Chunk.Type.PARAMETERS_CONTINUE), + newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) + .setOffset(25) + .setPendingBytes(25) + .setWindowEndOffset(50), // Start from an arbitrary offset before the current, but extend the window to the end. - newChunk(ID).setOffset(80).setWindowEndOffset(100).setType(Chunk.Type.PARAMETERS_CONTINUE)); + newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID).setOffset(80).setWindowEndOffset(100)); assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setOffset(90).setData(range(90, 100)).setRemainingBytes(0).build()); + .containsExactly(newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(90) + .setData(range(90, 100)) + .setRemainingBytes(0) + .build()); - receiveWriteChunks(newChunk(ID).setStatus(Status.OK.code())); + receiveWriteChunks(finalChunk(ID, Status.OK)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -683,9 +738,12 @@ public void write_progressCallbackIsCalled() { ListenableFuture future = manager.write(ID, TEST_DATA_100B.toByteArray(), progressCallback); - receiveWriteChunks(newChunk(ID).setOffset(0).setPendingBytes(90).setMaxChunkSizeBytes(30), - newChunk(ID).setOffset(50).setPendingBytes(50), - newChunk(ID).setStatus(Status.OK.code())); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(0) + .setPendingBytes(90) + .setMaxChunkSizeBytes(30), + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(50).setPendingBytes(50), + finalChunk(ID, Status.OK)); verify(progressCallback, times(6)).accept(progress.capture()); assertThat(progress.getAllValues()) @@ -702,12 +760,14 @@ public void write_progressCallbackIsCalled() { public void write_asksForFinalOffset_sendsFinalPacket() { ListenableFuture future = manager.write(ID, TEST_DATA_100B.toByteArray()); - receiveWriteChunks(newChunk(ID).setOffset(100).setPendingBytes(40).setMaxChunkSizeBytes(25)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(100) + .setPendingBytes(40) + .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) - .containsExactly( - newChunk(ID).setResourceId(ID).setRemainingBytes(TEST_DATA_100B.size()).build(), - newChunk(ID).setOffset(100).setRemainingBytes(0).build()); + .containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size()), + newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(100).setRemainingBytes(0).build()); assertThat(future.isDone()).isFalse(); } @@ -717,7 +777,8 @@ public void write_multipleWithSameId_sequentially_successful() throws Exception ListenableFuture future = manager.write(ID, TEST_DATA_SHORT.toByteArray()); receiveWriteChunks( - newChunk(ID).setOffset(0).setPendingBytes(50), newChunk(ID).setStatus(Status.OK.code())); + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0).setPendingBytes(50), + finalChunk(ID, Status.OK)); future.get(); } @@ -751,7 +812,7 @@ public void write_sendErrorOnFirstPacket_failsImmediately() { public void write_serviceRequestsNoData_aborts() throws Exception { ListenableFuture future = manager.write(ID, TEST_DATA_SHORT.toByteArray()); - receiveWriteChunks(newChunk(ID).setOffset(0)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(0)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); @@ -761,12 +822,14 @@ public void write_serviceRequestsNoData_aborts() throws Exception { public void write_invalidOffset_aborts() { ListenableFuture future = manager.write(ID, TEST_DATA_100B.toByteArray()); - receiveWriteChunks(newChunk(ID).setOffset(101).setPendingBytes(40).setMaxChunkSizeBytes(25)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(101) + .setPendingBytes(40) + .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) .containsExactly( - newChunk(ID).setResourceId(ID).setRemainingBytes(TEST_DATA_100B.size()).build(), - newChunk(ID).setStatus(Status.OUT_OF_RANGE.code()).build()); + initialWriteChunk(ID, ID, TEST_DATA_100B.size()), finalChunk(ID, Status.OUT_OF_RANGE)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE); @@ -779,7 +842,10 @@ public void write_sendErrorOnLaterPacket_aborts() { ChannelOutputException exception = new ChannelOutputException("blah"); client.setChannelOutputException(exception); - receiveWriteChunks(newChunk(ID).setOffset(0).setPendingBytes(50).setMaxChunkSizeBytes(30)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(0) + .setPendingBytes(50) + .setMaxChunkSizeBytes(30)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -792,15 +858,18 @@ public void write_cancelFuture_abortsTransfer() { assertThat(future.cancel(true)).isTrue(); - receiveWriteChunks(newChunk(ID).setOffset(0).setPendingBytes(50).setMaxChunkSizeBytes(50)); - assertThat(lastChunks()).contains(newChunk(ID).setStatus(Status.CANCELLED.code()).build()); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(0) + .setPendingBytes(50) + .setMaxChunkSizeBytes(50)); + assertThat(lastChunks()).contains(finalChunk(ID, Status.CANCELLED)); } @Test public void write_protocolError_aborts() { ListenableFuture future = manager.write(ID, TEST_DATA_SHORT.toByteArray()); - receiveWriteChunks(newChunk(ID).setStatus(Status.NOT_FOUND.code())); + receiveWriteChunks(finalChunk(ID, Status.NOT_FOUND)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -829,21 +898,10 @@ public void write_timeoutAfterInitialChunk() { // Client should have resent the last chunk (the initial chunk in this case) for each timeout. assertThat(lastChunks()) - .containsExactly(newChunk(ID) - .setResourceId(ID) - .setRemainingBytes(TEST_DATA_SHORT.size()) - .build(), // initial - newChunk(ID) - .setResourceId(ID) - .setRemainingBytes(TEST_DATA_SHORT.size()) - .build(), // retry - // 1 - newChunk(ID) - .setResourceId(ID) - .setRemainingBytes(TEST_DATA_SHORT.size()) - .build(), // retry - // 2 - newChunk(ID).setStatus(Status.DEADLINE_EXCEEDED.code()).build()); // abort + .containsExactly(initialWriteChunk(ID, ID, TEST_DATA_SHORT.size()), // initial + initialWriteChunk(ID, ID, TEST_DATA_SHORT.size()), // retry 1 + initialWriteChunk(ID, ID, TEST_DATA_SHORT.size()), // retry 2 + finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort } @Test @@ -851,22 +909,27 @@ public void write_timeoutAfterIntermediateChunk() { manager = createManager(1, 1); // Create a manager with a very short timeout. // Wait for two outgoing packets (Write RPC request and first chunk), then send the parameters. - enqueueWriteChunks(2, newChunk(ID).setOffset(0).setPendingBytes(90).setMaxChunkSizeBytes(30)); + enqueueWriteChunks(2, + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) + .setOffset(0) + .setPendingBytes(90) + .setMaxChunkSizeBytes(30)); ListenableFuture future = manager.write(ID, TEST_DATA_SHORT.toByteArray()); ExecutionException exception = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); - Chunk data = newChunk(ID).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0).build(); + Chunk data = newChunk(Chunk.Type.TRANSFER_DATA, ID) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0) + .build(); assertThat(lastChunks()) - .containsExactly(newChunk(ID) - .setResourceId(ID) - .setRemainingBytes(TEST_DATA_SHORT.size()) - .build(), // initial + .containsExactly(initialWriteChunk(ID, ID, TEST_DATA_SHORT.size()), // initial data, // data chunk data, // retry 1 data, // retry 2 - newChunk(ID).setStatus(Status.DEADLINE_EXCEEDED.code()).build()); // abort + finalChunk(ID, Status.DEADLINE_EXCEEDED)); // abort } private static ByteString range(int startInclusive, int endExclusive) { @@ -880,15 +943,26 @@ private static ByteString range(int startInclusive, int endExclusive) { return ByteString.copyFrom(bytes); } - private static Chunk.Builder newChunk(int resourceId) { - return Chunk.newBuilder().setSessionId(resourceId); + private static Chunk.Builder newChunk(Chunk.Type type, int resourceId) { + return Chunk.newBuilder().setType(type).setSessionId(resourceId); + } + + private static Chunk initialWriteChunk(int sessionId, int resourceId, int size) { + return newChunk(Chunk.Type.TRANSFER_START, sessionId) + .setResourceId(resourceId) + .setRemainingBytes(size) + .build(); + } + + private static Chunk finalChunk(int sessionId, Status status) { + return newChunk(Chunk.Type.TRANSFER_COMPLETION, sessionId).setStatus(status.code()).build(); } - private void receiveReadChunks(Chunk.Builder... chunks) { + private void receiveReadChunks(ChunkOrBuilder... chunks) { client.receiveServerStream(SERVICE, "Read", chunks); } - private void receiveWriteChunks(Chunk.Builder... chunks) { + private void receiveWriteChunks(ChunkOrBuilder... chunks) { client.receiveServerStream(SERVICE, "Write", chunks); }