Skip to content

Commit

Permalink
pw_rpc: Support opening and closing RPC channels in Java client
Browse files Browse the repository at this point in the history
Fixes: b/250065568
Change-Id: I8a467755d1a9f67dcc9c60d0fda2c9e326b783fb
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/119570
Commit-Queue: Wyatt Hepler <[email protected]>
Reviewed-by: Alexei Frolov <[email protected]>
  • Loading branch information
255 authored and CQ Bot Account committed Jan 6, 2023
1 parent 034d827 commit 94c5d0f
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 1 deletion.
4 changes: 4 additions & 0 deletions pw_rpc/java/main/dev/pigweed/pw_rpc/AbstractCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public final boolean finish() throws ChannelOutputException {
return rpcs.clientStreamEnd(this);
}

final int getChannelId() {
return rpc.channel().id();
}

final void sendPacket(byte[] packet) throws ChannelOutputException {
rpc.channel().send(packet);
}
Expand Down
19 changes: 19 additions & 0 deletions pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ public void onError(Status status) {
});
}

/**
* Adds a new channel to this RPC client.
*
* @throws InvalidRpcChannelException if the channel's ID is already in use
*/
public void openChannel(Channel channel) {
rpcs.openChannel(channel);
}

/**
* Closes a channel and aborts and RPCs using it.
*
* @param id the channel ID to close
* @return true if the channel was closed; false if the channel was not found
*/
public boolean closeChannel(int id) {
return rpcs.closeChannel(id);
}

/**
* Returns a MethodClient with the given name for the provided channelID
*
Expand Down
17 changes: 17 additions & 0 deletions pw_rpc/java/main/dev/pigweed/pw_rpc/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,23 @@ private boolean sendPacket(AbstractCall<?, ?> call, byte[] packet) throws Channe
return true;
}

public synchronized void openChannel(Channel channel) {
if (channels.putIfAbsent(channel.id(), channel) != null) {
throw InvalidRpcChannelException.duplicate(channel.id());
}
}

public synchronized boolean closeChannel(int id) {
if (channels.remove(id) == null) {
return false;
}
pending.values()
.stream()
.filter(call -> call.getChannelId() == id)
.forEach(call -> call.handleError(Status.ABORTED));
return true;
}

public synchronized boolean handleNext(PendingRpc rpc, ByteString payload) {
AbstractCall<?, ?> call = pending.get(rpc);
if (call == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ static InvalidRpcChannelException unknown(int channelId) {
return new InvalidRpcChannelException("Invalid or closed RPC channel " + channelId);
}

static InvalidRpcChannelException duplicate(int channelId) {
return new InvalidRpcChannelException("A channel with ID " + channelId + " already exists!");
}

private InvalidRpcChannelException(String message) {
super(message);
}
Expand Down
55 changes: 55 additions & 0 deletions pw_rpc/java/test/dev/pigweed/pw_rpc/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,59 @@ public void streamObserverClient_create_invokeMethod() throws Exception {
verify(mockChannelOutput)
.send(requestPacket("pw.rpc.test1.TheTestService", "SomeUnary", payload).toByteArray());
}

@Test
public void closeChannel_abortsExisting() throws Exception {
MethodClient serverStreamMethod =
client.method(CHANNEL_ID, "pw.rpc.test1.TheTestService", "SomeServerStreaming");

Call call1 = serverStreamMethod.invokeServerStreaming(REQUEST_PAYLOAD, observer);
Call call2 = client.method(CHANNEL_ID, "pw.rpc.test1.TheTestService", "SomeClientStreaming")
.invokeClientStreaming(observer);
assertThat(call1.active()).isTrue();
assertThat(call2.active()).isTrue();

assertThat(client.closeChannel(CHANNEL_ID)).isTrue();

assertThat(call1.active()).isFalse();
assertThat(call2.active()).isFalse();

verify(observer, times(2)).onError(Status.ABORTED);

assertThrows(InvalidRpcChannelException.class,
() -> serverStreamMethod.invokeServerStreaming(REQUEST_PAYLOAD, observer));
}

@Test
public void closeChannel_noCalls() {
assertThat(client.closeChannel(CHANNEL_ID)).isTrue();
}

@Test
public void closeChannel_knownChannel() {
assertThat(client.closeChannel(CHANNEL_ID + 100)).isFalse();
}

@Test
public void openChannel_uniqueChannel() throws Exception {
int newChannelId = CHANNEL_ID + 100;
Channel.Output channelOutput = Mockito.mock(Channel.Output.class);
client.openChannel(new Channel(newChannelId, channelOutput));

client.method(newChannelId, "pw.rpc.test1.TheTestService", "SomeUnary")
.invokeUnary(REQUEST_PAYLOAD, observer);

verify(channelOutput)
.send(requestPacket("pw.rpc.test1.TheTestService", "SomeUnary", REQUEST_PAYLOAD)
.toBuilder()
.setChannelId(newChannelId)
.build()
.toByteArray());
}

@Test
public void openChannel_alreadyExists_throwsException() {
assertThrows(InvalidRpcChannelException.class,
() -> client.openChannel(new Channel(CHANNEL_ID, packet -> {})));
}
}
1 change: 0 additions & 1 deletion pw_rpc/java/test/dev/pigweed/pw_rpc/EndpointTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

Expand Down

0 comments on commit 94c5d0f

Please sign in to comment.