Skip to content

Commit

Permalink
pw_transfer: Always set chunk type in Java client
Browse files Browse the repository at this point in the history
- Set the chunk type for all packets.
- Support passing either proto messages or builders to the TestClient.

Change-Id: I53077c968a9b58b7f44bb22a33371da6ab1bfc7d
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/92644
Pigweed-Auto-Submit: Wyatt Hepler <[email protected]>
Reviewed-by: Alexei Frolov <[email protected]>
Commit-Queue: Wyatt Hepler <[email protected]>
  • Loading branch information
255 authored and CQ Bot Account committed May 2, 2022
1 parent 2e14574 commit 4ed7050
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 224 deletions.
44 changes: 18 additions & 26 deletions pw_rpc/java/test/dev/pigweed/pw_rpc/TestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@

package dev.pigweed.pw_rpc;

import static java.util.Arrays.stream;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import dev.pigweed.pw_rpc.internal.Packet.PacketType;
import dev.pigweed.pw_rpc.internal.Packet.RpcPacket;
import java.util.ArrayList;
Expand Down Expand Up @@ -86,27 +85,21 @@ public <T extends MessageLite> List<T> lastClientStreams(Class<T> payloadType) {
}

/** Simulates receiving SERVER_STREAM packets from the server. */
public void receiveServerStream(String service, String method, MessageLite... payloads) {
public void receiveServerStream(String service, String method, MessageLiteOrBuilder... payloads) {
RpcPacket base = startPacket(service, method, PacketType.SERVER_STREAM).build();
for (MessageLite payload : payloads) {
processPacket(RpcPacket.newBuilder(base).setPayload(payload.toByteString()));
for (MessageLiteOrBuilder payload : payloads) {
processPacket(RpcPacket.newBuilder(base).setPayload(getMessage(payload).toByteString()));
}
}

public void receiveServerStream(String service, String method, MessageLite.Builder... builders) {
receiveServerStream(service,
method,
stream(builders).map(MessageLite.Builder::build).toArray(MessageLite[] ::new));
}

/**
* Enqueues a SERVER_STREAM packet so that the client receives it after a packet is sent.
*
* @param afterPackets Wait until this many packets have been sent before the client receives
* these stream packets. The minimum value (and the default) is 1.
*/
public void enqueueServerStream(
String service, String method, int afterPackets, MessageLite... payloads) {
String service, String method, int afterPackets, MessageLiteOrBuilder... payloads) {
if (afterPackets < 1) {
throw new IllegalArgumentException("afterPackets must be at least 1");
}
Expand All @@ -117,23 +110,12 @@ public void enqueueServerStream(
receiveEnqueuedPacketsAfter = afterPackets;

RpcPacket base = startPacket(service, method, PacketType.SERVER_STREAM).build();
for (MessageLite payload : payloads) {
enqueuedPackets.add(RpcPacket.newBuilder(base).setPayload(payload.toByteString()).build());
for (MessageLiteOrBuilder payload : payloads) {
enqueuedPackets.add(
RpcPacket.newBuilder(base).setPayload(getMessage(payload).toByteString()).build());
}
}

public void enqueueServerStream(String service, String method, MessageLite... payloads) {
enqueueServerStream(service, method, 1, payloads);
}

public void enqueueServerStream(
String service, String method, int afterPackets, MessageLite.Builder... builders) {
enqueueServerStream(service,
method,
afterPackets,
stream(builders).map(MessageLite.Builder::build).toArray(MessageLite[] ::new));
}

/** Simulates receiving a SERVER_ERROR packet from the server. */
public void receiveServerError(String service, String method, Status error) {
processPacket(startPacket(service, method, PacketType.SERVER_ERROR).setStatus(error.code()));
Expand Down Expand Up @@ -192,4 +174,14 @@ private <T extends MessageLite> T parseRequestPayload(Class<T> payloadType, RpcP
throw new AssertionError("Decoding sent packet payload failed", e);
}
}

private MessageLite getMessage(MessageLiteOrBuilder messageOrBuilder) {
if (messageOrBuilder instanceof MessageLite.Builder) {
return ((MessageLite.Builder) messageOrBuilder).build();
}
if (messageOrBuilder instanceof MessageLite) {
return (MessageLite) messageOrBuilder;
}
throw new AssertionError("Unexpected MessageLiteOrBuilder class");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,11 @@ private synchronized Chunk.Builder prepareTransferParameters(boolean extend) {

Chunk.Type type = extend ? Chunk.Type.PARAMETERS_CONTINUE : Chunk.Type.PARAMETERS_RETRANSMIT;

Chunk.Builder chunk = newChunk()
Chunk.Builder chunk = newChunk(type)
.setPendingBytes(pendingBytes)
.setMaxChunkSizeBytes(parameters.maxChunkSizeBytes())
.setOffset(offset)
.setWindowEndOffset(windowEndOffset)
.setType(type);
.setWindowEndOffset(windowEndOffset);
if (parameters.chunkDelayMicroseconds() > 0) {
chunk.setMinDelayMicroseconds(parameters.chunkDelayMicroseconds());
}
Expand Down
6 changes: 3 additions & 3 deletions pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ final synchronized boolean handleCancellation() {
return false;
}

final Chunk.Builder newChunk() {
final Chunk.Builder newChunk(Chunk.Type type) {
// TODO(frolv): Properly set the session ID after it is configured by the server.
return Chunk.newBuilder().setSessionId(getId());
return Chunk.newBuilder().setSessionId(getId()).setType(type);
}

/** Sends a chunk. Returns true if sent, false if sending failed and the transfer was aborted. */
Expand Down Expand Up @@ -309,7 +309,7 @@ final synchronized void sendFinalChunk(Status status) {

// Only call finish() if the sendChunk was successful. If it wasn't, the exception would have
// already terminated the transfer.
if (sendChunk(newChunk().setStatus(status.code()))) {
if (sendChunk(newChunk(Chunk.Type.TRANSFER_COMPLETION).setStatus(status.code()))) {
cleanUp(status);
}
}
Expand Down
16 changes: 11 additions & 5 deletions pw_transfer/java/main/dev/pigweed/pw_transfer/WriteTransfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ protected WriteTransfer(int id,

@Override
synchronized Chunk getInitialChunk() {
return newChunk().setResourceId(getId()).setRemainingBytes(data.length).build();
return newChunk(Chunk.Type.TRANSFER_START)
.setResourceId(getId())
.setRemainingBytes(data.length)
.build();
}

@Override
Expand Down Expand Up @@ -191,7 +194,7 @@ private synchronized boolean doHandleDataChunk(Chunk chunk) {

chunkData = chunkData.substring(0, newChunkSize);

chunkToSend = buildChunk(chunkData);
chunkToSend = buildDataChunk(chunkData);

// If there's a timeout, resending this will trigger a transfer parameters update.
lastChunk = chunkToSend;
Expand Down Expand Up @@ -236,7 +239,9 @@ void setFutureResult() {
private static boolean isRetransmit(Chunk chunk) {
// Retransmit is the default behavior for older versions of the transfer protocol, which don't
// have a type field.
return !chunk.hasType() || chunk.getType().equals(Chunk.Type.PARAMETERS_RETRANSMIT);
return !chunk.hasType()
|| (chunk.getType().equals(Chunk.Type.PARAMETERS_RETRANSMIT)
|| chunk.getType().equals(Chunk.Type.TRANSFER_START));
}

private static int getWindowEndOffset(Chunk chunk, int dataLength) {
Expand All @@ -248,8 +253,9 @@ private static int getWindowEndOffset(Chunk chunk, int dataLength) {
return min(chunk.getWindowEndOffset(), dataLength);
}

private Chunk buildChunk(ByteString chunkData) {
Chunk.Builder chunk = newChunk().setOffset(sentOffset).setData(chunkData);
private Chunk buildDataChunk(ByteString chunkData) {
Chunk.Builder chunk =
newChunk(Chunk.Type.TRANSFER_DATA).setOffset(sentOffset).setData(chunkData);

// If this is the last data chunk, setRemainingBytes to 0.
if (sentOffset + chunkData.size() == data.length) {
Expand Down
Loading

0 comments on commit 4ed7050

Please sign in to comment.