From e61718e9f4e5c58c0855679a367de8b93eb116cb Mon Sep 17 00:00:00 2001 From: Alexei Frolov Date: Wed, 25 May 2022 10:57:36 -0700 Subject: [PATCH] pw_transfer: Add handshake chunk types This adds new chunk type values for the opening transfer handshake: START_ACK and START_ACK_CONFIRMATION. Additionally, the redundant TRANSFER_ prefix is removed from chunk types which previously had it. Change-Id: I3da384b8d09cca99c05a6b6e916b9e55e6737e6b Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/96001 Commit-Queue: Alexei Frolov Reviewed-by: Wyatt Hepler --- .../dev/pigweed/pw_transfer/Transfer.java | 2 +- .../pigweed/pw_transfer/WriteTransfer.java | 10 +- .../pw_transfer/TransferClientTest.java | 144 +++++++----------- pw_transfer/py/pw_transfer/transfer.py | 22 ++- pw_transfer/py/tests/transfer_test.py | 32 ++-- pw_transfer/transfer.proto | 18 ++- pw_transfer/ts/transfer.ts | 10 +- pw_transfer/ts/transfer_test.ts | 6 +- 8 files changed, 102 insertions(+), 142 deletions(-) diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java index 7f1b702ece..6e75d19459 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java @@ -309,7 +309,7 @@ final synchronized void sendFinalChunk(Status status) { // Only call finish() if the sendChunk was successful. If it wasn't, the exception would have // already terminated the transfer. - if (sendChunk(newChunk(Chunk.Type.TRANSFER_COMPLETION).setStatus(status.code()))) { + if (sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status.code()))) { cleanUp(status); } } diff --git a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java index 40feed24b3..4daef2d382 100644 --- a/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java +++ b/pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java @@ -68,10 +68,7 @@ protected WriteTransfer(int id, @Override synchronized Chunk getInitialChunk() { - return newChunk(Chunk.Type.TRANSFER_START) - .setResourceId(getId()) - .setRemainingBytes(data.length) - .build(); + return newChunk(Chunk.Type.START).setResourceId(getId()).setRemainingBytes(data.length).build(); } @Override @@ -221,7 +218,7 @@ private static boolean isRetransmit(Chunk chunk) { // have a type field. return !chunk.hasType() || (chunk.getType().equals(Chunk.Type.PARAMETERS_RETRANSMIT) - || chunk.getType().equals(Chunk.Type.TRANSFER_START)); + || chunk.getType().equals(Chunk.Type.START)); } private static int getWindowEndOffset(Chunk chunk, int dataLength) { @@ -234,8 +231,7 @@ private static int getWindowEndOffset(Chunk chunk, int dataLength) { } private Chunk buildDataChunk(ByteString chunkData) { - Chunk.Builder chunk = - newChunk(Chunk.Type.TRANSFER_DATA).setOffset(sentOffset).setData(chunkData); + Chunk.Builder chunk = newChunk(Chunk.Type.DATA).setOffset(sentOffset).setData(chunkData); // If this is the last data chunk, setRemainingBytes to 0. if (sentOffset + chunkData.size() == data.length) { diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java index a54d218fe1..bca7c0b5f3 100644 --- a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java +++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java @@ -73,10 +73,8 @@ public void read_singleChunk_successful() throws Exception { ListenableFuture future = manager.read(1); assertThat(future.isDone()).isFalse(); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 1) - .setOffset(0) - .setData(TEST_DATA_SHORT) - .setRemainingBytes(0)); + receiveReadChunks( + newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); assertThat(future.isDone()).isTrue(); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); @@ -102,10 +100,8 @@ public void read_failedPreconditionError_retries() throws Exception { .setMaxChunkSizeBytes(TRANSFER_PARAMETERS.maxChunkSizeBytes()) .build()); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 1) - .setOffset(0) - .setData(TEST_DATA_SHORT) - .setRemainingBytes(0)); + receiveReadChunks( + newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); assertThat(future.isDone()).isTrue(); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); @@ -142,30 +138,16 @@ public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception { assertThat(future.isDone()).isFalse(); receiveReadChunks(finalChunk(2, Status.OK), - newChunk(Chunk.Type.TRANSFER_DATA, 0) - .setOffset(0) - .setData(TEST_DATA_100B) - .setRemainingBytes(0), - newChunk(Chunk.Type.TRANSFER_DATA, 3) - .setOffset(0) - .setData(TEST_DATA_100B) - .setRemainingBytes(0)); + newChunk(Chunk.Type.DATA, 0).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), + newChunk(Chunk.Type.DATA, 3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); receiveWriteChunks(finalChunk(1, Status.OK), - newChunk(Chunk.Type.TRANSFER_DATA, 1) - .setOffset(0) - .setData(TEST_DATA_100B) - .setRemainingBytes(0), - newChunk(Chunk.Type.TRANSFER_DATA, 2) - .setOffset(0) - .setData(TEST_DATA_100B) - .setRemainingBytes(0)); + newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), + newChunk(Chunk.Type.DATA, 2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); assertThat(future.isDone()).isFalse(); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 1) - .setOffset(0) - .setData(TEST_DATA_SHORT) - .setRemainingBytes(0)); + receiveReadChunks( + newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @@ -175,7 +157,7 @@ public void read_empty() throws Exception { ListenableFuture future = manager.read(2); lastChunks(); // Discard initial chunk (tested elsewhere) - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 2).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.DATA, 2).setRemainingBytes(0)); assertThat(lastChunks()).containsExactly(finalChunk(2, Status.OK)); @@ -209,11 +191,9 @@ public void read_severalChunks() throws Exception { .setOffset(0) .build()); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID) - .setOffset(0) - .setData(range(0, 20)) - .setRemainingBytes(70), - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(20).setData(range(20, 40))); + receiveReadChunks( + newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 20)).setRemainingBytes(70), + newChunk(Chunk.Type.DATA, ID).setOffset(20).setData(range(20, 40))); assertThat(lastChunks()) .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) @@ -223,7 +203,7 @@ public void read_severalChunks() throws Exception { .setWindowEndOffset(90) .build()); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(40).setData(range(40, 70))); + receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(40).setData(range(40, 70))); assertThat(lastChunks()) .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) @@ -233,10 +213,8 @@ public void read_severalChunks() throws Exception { .setWindowEndOffset(120) .build()); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID) - .setOffset(70) - .setData(range(70, 100)) - .setRemainingBytes(0)); + receiveReadChunks( + newChunk(Chunk.Type.DATA, ID).setOffset(70).setData(range(70, 100)).setRemainingBytes(0)); assertThat(lastChunks()).containsExactly(finalChunk(ID, Status.OK)); @@ -247,22 +225,13 @@ public void read_severalChunks() throws Exception { public void read_progressCallbackIsCalled() { ListenableFuture future = manager.read(ID, TRANSFER_PARAMETERS, progressCallback); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)), - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)), - newChunk(Chunk.Type.TRANSFER_DATA, ID) - .setOffset(50) - .setData(range(50, 60)) - .setRemainingBytes(5), - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(60).setData(range(60, 70)), - newChunk(Chunk.Type.TRANSFER_DATA, ID) - .setOffset(70) - .setData(range(70, 80)) - .setRemainingBytes(20), - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(90).setData(range(90, 100)), - newChunk(Chunk.Type.TRANSFER_DATA, ID) - .setOffset(80) - .setData(range(80, 100)) - .setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)), + newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)), + newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 60)).setRemainingBytes(5), + newChunk(Chunk.Type.DATA, ID).setOffset(60).setData(range(60, 70)), + newChunk(Chunk.Type.DATA, ID).setOffset(70).setData(range(70, 80)).setRemainingBytes(20), + newChunk(Chunk.Type.DATA, ID).setOffset(90).setData(range(90, 100)), + newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); verify(progressCallback, times(6)).accept(progress.capture()); assertThat(progress.getAllValues()) @@ -287,7 +256,7 @@ public void read_rewindWhenPacketsSkipped() throws Exception { .setOffset(0) .build()); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(30, 50))); + receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(30, 50))); assertThat(lastChunks()) .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) @@ -297,8 +266,8 @@ public void read_rewindWhenPacketsSkipped() throws Exception { .setOffset(0) .build()); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)), - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50))); + receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)), + newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50))); assertThat(lastChunks()) .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) @@ -308,10 +277,8 @@ public void read_rewindWhenPacketsSkipped() throws Exception { .setMaxChunkSizeBytes(30) .build()); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID) - .setOffset(80) - .setData(range(80, 100)) - .setRemainingBytes(0)); + receiveReadChunks( + newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); assertThat(lastChunks()) .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) @@ -321,11 +288,8 @@ public void read_rewindWhenPacketsSkipped() throws Exception { .setMaxChunkSizeBytes(30) .build()); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(50, 80)), - newChunk(Chunk.Type.TRANSFER_DATA, ID) - .setOffset(80) - .setData(range(80, 100)) - .setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 80)), + newChunk(Chunk.Type.DATA, ID).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); assertThat(lastChunks()) .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) @@ -344,10 +308,8 @@ public void read_multipleWithSameId_sequentially_successful() throws Exception { for (int i = 0; i < 3; ++i) { ListenableFuture future = manager.read(1); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, 1) - .setOffset(0) - .setData(TEST_DATA_SHORT) - .setRemainingBytes(0)); + receiveReadChunks( + newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @@ -381,12 +343,12 @@ public void read_sendErrorOnFirstPacket_failsImmediately() { public void read_sendErrorOnLaterPacket_aborts() { ListenableFuture future = manager.read(ID, TRANSFER_PARAMETERS); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 20))); + receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 20))); ChannelOutputException exception = new ChannelOutputException("blah"); client.setChannelOutputException(exception); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(20).setData(range(20, 50))); + receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(20).setData(range(20, 50))); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -399,7 +361,7 @@ public void read_cancelFuture_abortsTransfer() { assertThat(future.cancel(true)).isTrue(); - receiveReadChunks(newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50))); + receiveReadChunks(newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50))); assertThat(lastChunks()).contains(finalChunk(ID, Status.CANCELLED)); } @@ -554,9 +516,8 @@ public void write_severalChunks() throws Exception { .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly( - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)).build(), - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)).build()); + .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)).build(), + newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)).build()); receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID) .setOffset(50) @@ -564,15 +525,14 @@ public void write_severalChunks() throws Exception { .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) - .containsExactly( - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(50, 75)).build(), - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(75).setData(range(75, 90)).build()); + .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 75)).build(), + newChunk(Chunk.Type.DATA, ID).setOffset(75).setData(range(75, 90)).build()); receiveWriteChunks( newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, ID).setOffset(90).setPendingBytes(50)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.TRANSFER_DATA, ID) + .containsExactly(newChunk(Chunk.Type.DATA, ID) .setOffset(90) .setData(range(90, 100)) .setRemainingBytes(0) @@ -599,9 +559,8 @@ public void write_parametersContinue() throws Exception { .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly( - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 30)).build(), - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(30).setData(range(30, 50)).build()); + .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 30)).build(), + newChunk(Chunk.Type.DATA, ID).setOffset(30).setData(range(30, 50)).build()); receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) .setOffset(30) @@ -611,7 +570,7 @@ public void write_parametersContinue() throws Exception { // Transfer doesn't roll back to offset 30 but instead continues sending up to 80. assertThat(lastChunks()) .containsExactly( - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(50).setData(range(50, 80)).build()); + newChunk(Chunk.Type.DATA, ID).setOffset(50).setData(range(50, 80)).build()); receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID) .setOffset(80) @@ -619,7 +578,7 @@ public void write_parametersContinue() throws Exception { .setWindowEndOffset(130)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.TRANSFER_DATA, ID) + .containsExactly(newChunk(Chunk.Type.DATA, ID) .setOffset(80) .setData(range(80, 100)) .setRemainingBytes(0) @@ -646,8 +605,7 @@ public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exce .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly( - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(0).setData(range(0, 90)).build()); + .containsExactly(newChunk(Chunk.Type.DATA, ID).setOffset(0).setData(range(0, 90)).build()); receiveWriteChunks( // This stale packet with a window end before the offset should be ignored. @@ -659,7 +617,7 @@ public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exce newChunk(Chunk.Type.PARAMETERS_CONTINUE, ID).setOffset(80).setWindowEndOffset(100)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.TRANSFER_DATA, ID) + .containsExactly(newChunk(Chunk.Type.DATA, ID) .setOffset(90) .setData(range(90, 100)) .setRemainingBytes(0) @@ -704,7 +662,7 @@ public void write_asksForFinalOffset_sendsFinalPacket() { assertThat(lastChunks()) .containsExactly(initialWriteChunk(ID, ID, TEST_DATA_100B.size()), - newChunk(Chunk.Type.TRANSFER_DATA, ID).setOffset(100).setRemainingBytes(0).build()); + newChunk(Chunk.Type.DATA, ID).setOffset(100).setRemainingBytes(0).build()); assertThat(future.isDone()).isFalse(); } @@ -856,7 +814,7 @@ public void write_timeoutAfterIntermediateChunk() { ExecutionException exception = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); - Chunk data = newChunk(Chunk.Type.TRANSFER_DATA, ID) + Chunk data = newChunk(Chunk.Type.DATA, ID) .setOffset(0) .setData(TEST_DATA_SHORT) .setRemainingBytes(0) @@ -885,14 +843,14 @@ private static Chunk.Builder newChunk(Chunk.Type type, int resourceId) { } private static Chunk initialWriteChunk(int sessionId, int resourceId, int size) { - return newChunk(Chunk.Type.TRANSFER_START, sessionId) + return newChunk(Chunk.Type.START, sessionId) .setResourceId(resourceId) .setRemainingBytes(size) .build(); } private static Chunk finalChunk(int sessionId, Status status) { - return newChunk(Chunk.Type.TRANSFER_COMPLETION, sessionId).setStatus(status.code()).build(); + return newChunk(Chunk.Type.COMPLETION, sessionId).setStatus(status.code()).build(); } private void receiveReadChunks(ChunkOrBuilder... chunks) { diff --git a/pw_transfer/py/pw_transfer/transfer.py b/pw_transfer/py/pw_transfer/transfer.py index 000452259c..22e650e3c0 100644 --- a/pw_transfer/py/pw_transfer/transfer.py +++ b/pw_transfer/py/pw_transfer/transfer.py @@ -201,10 +201,9 @@ def _update_progress(self, bytes_sent: int, bytes_confirmed_received: int, def _send_error(self, error: Status) -> None: """Sends an error chunk to the server and finishes the transfer.""" self._send_chunk( - transfer_pb2.Chunk( - transfer_id=self.id, - status=error.value, - type=transfer_pb2.Chunk.Type.TRANSFER_COMPLETION)) + transfer_pb2.Chunk(transfer_id=self.id, + status=error.value, + type=transfer_pb2.Chunk.Type.COMPLETION)) self.finish(error) @@ -248,7 +247,7 @@ def _initial_chunk(self) -> transfer_pb2.Chunk: # server during an initial handshake. return transfer_pb2.Chunk(transfer_id=self.id, resource_id=self.id, - type=transfer_pb2.Chunk.Type.TRANSFER_START) + type=transfer_pb2.Chunk.Type.START) async def _handle_data_chunk(self, chunk: transfer_pb2.Chunk) -> None: """Processes an incoming chunk from the server. @@ -298,9 +297,9 @@ def _handle_parameters_update(self, chunk: transfer_pb2.Chunk) -> bool: retransmit = True if chunk.HasField('type'): - retransmit = ( - chunk.type == transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT - or chunk.type == transfer_pb2.Chunk.Type.TRANSFER_START) + retransmit = (chunk.type + == transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT + or chunk.type == transfer_pb2.Chunk.Type.START) if chunk.offset > len(self.data): # Bad offset; terminate the transfer. @@ -355,7 +354,7 @@ def _next_chunk(self) -> transfer_pb2.Chunk: """Returns the next Chunk message to send in the data transfer.""" chunk = transfer_pb2.Chunk(transfer_id=self.id, offset=self._offset, - type=transfer_pb2.Chunk.Type.TRANSFER_DATA) + type=transfer_pb2.Chunk.Type.DATA) max_bytes_in_chunk = min(self._max_chunk_size, self._window_end_offset - self._offset) @@ -416,8 +415,7 @@ def data(self) -> bytes: return bytes(self._data) def _initial_chunk(self) -> transfer_pb2.Chunk: - return self._transfer_parameters( - transfer_pb2.Chunk.Type.TRANSFER_START) + return self._transfer_parameters(transfer_pb2.Chunk.Type.START) async def _handle_data_chunk(self, chunk: transfer_pb2.Chunk) -> None: """Processes an incoming chunk from the server. @@ -446,7 +444,7 @@ async def _handle_data_chunk(self, chunk: transfer_pb2.Chunk) -> None: transfer_pb2.Chunk( transfer_id=self.id, status=Status.OK.value, - type=transfer_pb2.Chunk.Type.TRANSFER_COMPLETION)) + type=transfer_pb2.Chunk.Type.COMPLETION)) self.finish(Status.OK) return diff --git a/pw_transfer/py/tests/transfer_test.py b/pw_transfer/py/tests/transfer_test.py index 4c6ea5ae32..63a73ee460 100644 --- a/pw_transfer/py/tests/transfer_test.py +++ b/pw_transfer/py/tests/transfer_test.py @@ -473,18 +473,18 @@ def test_write_transfer_timeout_after_initial_chunk(self) -> None: self.assertEqual( self._sent_chunks, [ - transfer_pb2.Chunk(transfer_id=22, - resource_id=22, - type=transfer_pb2.Chunk.Type.TRANSFER_START - ), # initial chunk transfer_pb2.Chunk( transfer_id=22, resource_id=22, - type=transfer_pb2.Chunk.Type.TRANSFER_START), # retry 1 + type=transfer_pb2.Chunk.Type.START), # initial chunk + transfer_pb2.Chunk( + transfer_id=22, + resource_id=22, + type=transfer_pb2.Chunk.Type.START), # retry 1 transfer_pb2.Chunk( transfer_id=22, resource_id=22, - type=transfer_pb2.Chunk.Type.TRANSFER_START), # retry 2 + type=transfer_pb2.Chunk.Type.START), # retry 2 ]) exception = context.exception @@ -506,23 +506,21 @@ def test_write_transfer_timeout_after_intermediate_chunk(self) -> None: with self.assertRaises(pw_transfer.Error) as context: manager.write(22, b'0123456789') - last_data_chunk = transfer_pb2.Chunk( - transfer_id=22, - data=b'56789', - offset=5, - remaining_bytes=0, - type=transfer_pb2.Chunk.Type.TRANSFER_DATA) + last_data_chunk = transfer_pb2.Chunk(transfer_id=22, + data=b'56789', + offset=5, + remaining_bytes=0, + type=transfer_pb2.Chunk.Type.DATA) self.assertEqual( self._sent_chunks, [ - transfer_pb2.Chunk( - transfer_id=22, - resource_id=22, - type=transfer_pb2.Chunk.Type.TRANSFER_START), + transfer_pb2.Chunk(transfer_id=22, + resource_id=22, + type=transfer_pb2.Chunk.Type.START), transfer_pb2.Chunk(transfer_id=22, data=b'01234', - type=transfer_pb2.Chunk.Type.TRANSFER_DATA), + type=transfer_pb2.Chunk.Type.DATA), last_data_chunk, # last chunk last_data_chunk, # retry 1 last_data_chunk, # retry 2 diff --git a/pw_transfer/transfer.proto b/pw_transfer/transfer.proto index e4bb5a88ff..b5eb3abafb 100644 --- a/pw_transfer/transfer.proto +++ b/pw_transfer/transfer.proto @@ -143,10 +143,10 @@ message Chunk { enum Type { // Chunk containing transfer data. - TRANSFER_DATA = 0; + DATA = 0; // First chunk of a transfer (only sent by the client). - TRANSFER_START = 1; + START = 1; // Transfer parameters indicating that the transmitter should retransmit // from the specified offset. @@ -158,11 +158,21 @@ message Chunk { PARAMETERS_CONTINUE = 3; // Sender of the chunk is terminating the transfer. - TRANSFER_COMPLETION = 4; + COMPLETION = 4; // Acknowledge the completion of a transfer. Currently unused. // TODO(konkers): Implement this behavior. - TRANSFER_COMPLETION_ACK = 5; + COMPLETION_ACK = 5; + + // Acknowledges a transfer start request, assigning a session ID for the + // transfer and optionally negotiating the protocol version. Sent from + // server to client. + START_ACK = 6; + + // Confirmation of a START_ACK's assigned session ID and negotiated + // parameters, sent by the client to the server. Initiates the data transfer + // proper. + START_ACK_CONFIRMATION = 7; }; // The type of this chunk. This field should only be processed when present. diff --git a/pw_transfer/ts/transfer.ts b/pw_transfer/ts/transfer.ts index 8264f0aab0..e71f9bff5f 100644 --- a/pw_transfer/ts/transfer.ts +++ b/pw_transfer/ts/transfer.ts @@ -138,7 +138,7 @@ export abstract class Transfer { const chunk = new Chunk(); chunk.setStatus(error); chunk.setTransferId(this.id); - chunk.setType(Chunk.Type.TRANSFER_COMPLETION); + chunk.setType(Chunk.Type.COMPLETION); this.sendChunk(chunk); this.finish(error); } @@ -253,7 +253,7 @@ export class ReadTransfer extends Transfer { } protected get initialChunk(): Chunk { - return this.transferParameters(Chunk.Type.TRANSFER_START); + return this.transferParameters(Chunk.Type.START); } /** Builds an updated transfer parameters chunk to send the server. */ @@ -305,7 +305,7 @@ export class ReadTransfer extends Transfer { const endChunk = new Chunk(); endChunk.setTransferId(this.id); endChunk.setStatus(Status.OK); - endChunk.setType(Chunk.Type.TRANSFER_COMPLETION); + endChunk.setType(Chunk.Type.COMPLETION); this.sendChunk(endChunk); this.finish(Status.OK); return; @@ -407,7 +407,7 @@ export class WriteTransfer extends Transfer { const chunk = new Chunk(); chunk.setTransferId(this.id); chunk.setResourceId(this.id); - chunk.setType(Chunk.Type.TRANSFER_START); + chunk.setType(Chunk.Type.START); return chunk; } @@ -518,7 +518,7 @@ export class WriteTransfer extends Transfer { const chunk = new Chunk(); chunk.setTransferId(this.id); chunk.setOffset(this.offset); - chunk.setType(Chunk.Type.TRANSFER_DATA); + chunk.setType(Chunk.Type.DATA); const maxBytesInChunk = Math.min( this.maxChunkSize, diff --git a/pw_transfer/ts/transfer_test.ts b/pw_transfer/ts/transfer_test.ts index 584df4271d..a252dd962a 100644 --- a/pw_transfer/ts/transfer_test.ts +++ b/pw_transfer/ts/transfer_test.ts @@ -614,17 +614,17 @@ describe('Transfer client', () => { const expectedChunk1 = new Chunk(); expectedChunk1.setTransferId(22); expectedChunk1.setResourceId(22); - expectedChunk1.setType(Chunk.Type.TRANSFER_START); + expectedChunk1.setType(Chunk.Type.START); const expectedChunk2 = new Chunk(); expectedChunk2.setTransferId(22); expectedChunk2.setData(textEncoder.encode('01234')); - expectedChunk2.setType(Chunk.Type.TRANSFER_DATA); + expectedChunk2.setType(Chunk.Type.DATA); const lastChunk = new Chunk(); lastChunk.setTransferId(22); lastChunk.setData(textEncoder.encode('56789')); lastChunk.setOffset(5); lastChunk.setRemainingBytes(0); - lastChunk.setType(Chunk.Type.TRANSFER_DATA); + lastChunk.setType(Chunk.Type.DATA); const expectedChunks = [ expectedChunk1,