Skip to content

Commit

Permalink
pw_rpc: Add TryFinish API for pw_rpc stream
Browse files Browse the repository at this point in the history
Bug: 328462705

pw_rpc stream when calling `Finish()` will terminate the
call regardless if the final packet gets send out successfully. Adding
a `TryFinish()` to try to not terminate the call if the final packet
fails to send, allows user to resend the final packet.

Change-Id: Ice59031d9789906afeb4e4663587541287503104
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/195126
Reviewed-by: Alexei Frolov <[email protected]>
Commit-Queue: Xiaofan Jiang <[email protected]>
  • Loading branch information
Xiaofan Jiang authored and CQ Bot Account committed Mar 11, 2024
1 parent 7df8e94 commit c382dcf
Show file tree
Hide file tree
Showing 12 changed files with 499 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pw_rpc/call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,17 @@ Status Call::CloseAndSendFinalPacketLocked(PacketType type,
return send_status;
}

Status Call::TryCloseAndSendFinalPacketLocked(PacketType type,
ConstByteSpan response,
Status status) {
const Status send_status = SendPacket(type, response, status);
// Only close the call if the final packet gets sent out successfully.
if (send_status.ok()) {
UnregisterAndMarkClosed();
}
return send_status;
}

Status Call::WriteLocked(ConstByteSpan payload) {
return SendPacket(properties_.call_type() == kServerCall
? PacketType::SERVER_STREAM
Expand Down
16 changes: 16 additions & 0 deletions pw_rpc/nanopb/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,20 @@ Status SendFinalResponse(NanopbServerCall& call,
return call.CloseAndSendResponseLocked(*result, status);
}

Status TrySendFinalResponse(NanopbServerCall& call,
const void* payload,
const Status status) {
RpcLockGuard lock;
if (!call.active_locked()) {
return Status::FailedPrecondition();
}

Result<ByteSpan> result =
EncodeToPayloadBuffer(payload, call.serde().response());
if (!result.ok()) {
return call.TryCloseAndSendServerErrorLocked(Status::Internal());
}
return call.TryCloseAndSendResponseLocked(*result, status);
}

} // namespace pw::rpc::internal
5 changes: 5 additions & 0 deletions pw_rpc/nanopb/docs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ closed or goes out of scope. The writer has a simple API to return responses:

Closes the stream and sends back the RPC's overall status to the client.

.. cpp:function:: Status ServerWriter::TryFinish(Status status = OkStatus())

Closes the stream and sends back the RPC's overall status to the client only
if the final packet is successfully sent.

Once a ``ServerWriter`` has been closed, all future ``Write`` calls will fail.

.. attention::
Expand Down
4 changes: 4 additions & 0 deletions pw_rpc/nanopb/public/pw_rpc/nanopb/internal/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,8 @@ Status SendFinalResponse(NanopbServerCall& call,
const void* payload,
Status status) PW_LOCKS_EXCLUDED(rpc_lock());

Status TrySendFinalResponse(NanopbServerCall& call,
const void* payload,
Status status) PW_LOCKS_EXCLUDED(rpc_lock());

} // namespace pw::rpc::internal
17 changes: 17 additions & 0 deletions pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class NanopbServerCall : public ServerCall {
return SendFinalResponse(*this, payload, status);
}

Status TrySendUnaryResponse(const void* payload, Status status)
PW_LOCKS_EXCLUDED(rpc_lock()) {
return TrySendFinalResponse(*this, payload, status);
}

const NanopbMethodSerde& serde() const
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
return *serde_;
Expand Down Expand Up @@ -194,6 +199,10 @@ class NanopbServerReaderWriter
return internal::Call::CloseAndSendResponse(status);
}

Status TryFinish(Status status = OkStatus()) {
return internal::Call::TryCloseAndSendResponse(status);
}

// Functions for setting RPC event callbacks.
using internal::Call::set_on_error;
using internal::ServerCall::set_on_completion_requested;
Expand Down Expand Up @@ -262,6 +271,10 @@ class NanopbServerReader : private internal::BaseNanopbServerReader<Request> {
return internal::NanopbServerCall::SendUnaryResponse(&response, status);
}

Status TryFinish(const Response& response, Status status = OkStatus()) {
return internal::NanopbServerCall::TrySendUnaryResponse(&response, status);
}

private:
friend class internal::NanopbMethod;
friend class Server;
Expand Down Expand Up @@ -327,6 +340,10 @@ class NanopbServerWriter : private internal::NanopbServerCall {
return internal::Call::CloseAndSendResponse(status);
}

Status TryFinish(Status status = OkStatus()) {
return internal::Call::TryCloseAndSendResponse(status);
}

using internal::Call::set_on_error;
using internal::ServerCall::set_on_completion_requested;
using internal::ServerCall::set_on_completion_requested_if_enabled;
Expand Down
104 changes: 104 additions & 0 deletions pw_rpc/nanopb/server_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,40 @@ TEST(NanopbServerWriter, Closed) {
call.set_on_error([](Status) {});
}

TEST(NanopbServerWriter, TryClosedFailed) {
ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
NanopbServerWriter call =
NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
TestService::TestServerStreamRpc>(
ctx.server, ctx.channel.id(), ctx.service);
// Sets ChannelOutput to always return false.
ctx.output.set_send_status(Status::Unknown());
ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));

// Call should be still alive.
ASSERT_TRUE(call.active());
}

TEST(NanopbServerWriter, TryCloseSuccessful) {
ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
NanopbServerWriter call =
NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
TestService::TestServerStreamRpc>(
ctx.server, ctx.channel.id(), ctx.service);
// Sets ChannelOutput to always return false.
ctx.output.set_send_status(Status::Unknown());
ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));

// Call should be still alive.
ASSERT_TRUE(call.active());

// Tries to close the call again, with ChannelOutput set to return ok.
ctx.output.set_send_status(OkStatus());
ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
// Call should be closed.
ASSERT_FALSE(call.active());
}

TEST(NanopbServerReader, Closed) {
ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
NanopbServerReader call = NanopbServerReader<pw_rpc_test_TestRequest,
Expand All @@ -166,6 +200,40 @@ TEST(NanopbServerReader, Closed) {
call.set_on_error([](Status) {});
}

TEST(NanopbServerReader, TryClosedFailed) {
ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
NanopbServerReader call = NanopbServerReader<pw_rpc_test_TestRequest,
pw_rpc_test_TestStreamResponse>::
Open<TestService::TestClientStreamRpc>(
ctx.server, ctx.channel.id(), ctx.service);
// Sets ChannelOutput to always return false.
ctx.output.set_send_status(Status::Unknown());
ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));

// Call should be still alive.
ASSERT_TRUE(call.active());
}

TEST(NanopbServerReader, TryCloseSuccessful) {
ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
NanopbServerReader call = NanopbServerReader<pw_rpc_test_TestRequest,
pw_rpc_test_TestStreamResponse>::
Open<TestService::TestClientStreamRpc>(
ctx.server, ctx.channel.id(), ctx.service);
// Sets ChannelOutput to always return false.
ctx.output.set_send_status(Status::Unknown());
ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));

// Call should be still alive.
ASSERT_TRUE(call.active());

// Tries to close the call again, with ChannelOutput set to return ok.
ctx.output.set_send_status(OkStatus());
ASSERT_EQ(OkStatus(), call.TryFinish({}, OkStatus()));
// Call should be closed.
ASSERT_FALSE(call.active());
}

TEST(NanopbServerReaderWriter, Closed) {
ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
NanopbServerReaderWriter call =
Expand All @@ -185,6 +253,42 @@ TEST(NanopbServerReaderWriter, Closed) {
call.set_on_error([](Status) {});
}

TEST(NanopbServerReaderWriter, TryClosedFailed) {
ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
NanopbServerReaderWriter call =
NanopbServerReaderWriter<pw_rpc_test_TestRequest,
pw_rpc_test_TestStreamResponse>::
Open<TestService::TestBidirectionalStreamRpc>(
ctx.server, ctx.channel.id(), ctx.service);
// Sets ChannelOutput to always return false.
ctx.output.set_send_status(Status::Unknown());
ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));

// Call should be still alive.
ASSERT_TRUE(call.active());
}

TEST(NanopbServerReaderWriter, TryCloseSuccessful) {
ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
NanopbServerReaderWriter call =
NanopbServerReaderWriter<pw_rpc_test_TestRequest,
pw_rpc_test_TestStreamResponse>::
Open<TestService::TestBidirectionalStreamRpc>(
ctx.server, ctx.channel.id(), ctx.service);
// Sets ChannelOutput to always return false.
ctx.output.set_send_status(Status::Unknown());
ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));

// Call should be still alive.
ASSERT_TRUE(call.active());

// Tries to close the call again, with ChannelOutput set to return ok.
ctx.output.set_send_status(OkStatus());
ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
// Call should be closed.
ASSERT_FALSE(call.active());
}

TEST(NanopbUnaryResponder, Open_ReturnsUsableResponder) {
ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
NanopbUnaryResponder responder =
Expand Down
30 changes: 30 additions & 0 deletions pw_rpc/public/pw_rpc/internal/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,31 @@ class Call : public IntrusiveList<Call>::Item, private rpc::Writer {
pwpb::PacketType::SERVER_ERROR, {}, error);
}

// Closes the Call and sends a RESPONSE packet, if the RESPONSE packet failed
// to send , keep the call alive and return error. This API allows user to
// resend RESPONSE packet when transmission failed.
Status TryCloseAndSendResponse(ConstByteSpan response, Status status)
PW_LOCKS_EXCLUDED(rpc_lock()) {
RpcLockGuard lock;
return TryCloseAndSendResponseLocked(response, status);
}

Status TryCloseAndSendResponseLocked(ConstByteSpan response, Status status)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
return TryCloseAndSendFinalPacketLocked(
pwpb::PacketType::RESPONSE, response, status);
}

Status TryCloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
return TryCloseAndSendResponse({}, status);
}

Status TryCloseAndSendServerErrorLocked(Status error)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
return TryCloseAndSendFinalPacketLocked(
pwpb::PacketType::SERVER_ERROR, {}, error);
}

// Public function that indicates that the client requests completion of the
// RPC, but is still active and listening to responses. For client streaming
// and bi-directional streaming RPCs, this also closes the client stream. If
Expand Down Expand Up @@ -539,6 +564,11 @@ class Call : public IntrusiveList<Call>::Item, private rpc::Writer {
Status status)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

Status TryCloseAndSendFinalPacketLocked(pwpb::PacketType type,
ConstByteSpan response,
Status status)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

bool CallbacksAreRunning() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
return callbacks_executing_ != 0u;
}
Expand Down
5 changes: 5 additions & 0 deletions pw_rpc/pwpb/docs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ closed or goes out of scope. The writer has a simple API to return responses:

Closes the stream and sends back the RPC's overall status to the client.

.. cpp:function:: Status PwpbServerWriter::TryFinish(Status status = OkStatus())

Closes the stream and sends back the RPC's overall status to the client only
if the final packet is successfully sent.

Once a ``ServerWriter`` has been closed, all future ``Write`` calls will fail.

.. attention::
Expand Down
34 changes: 34 additions & 0 deletions pw_rpc/pwpb/public/pw_rpc/pwpb/server_reader_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ class PwpbServerCall : public ServerCall {
return CloseAndSendResponseLocked(*buffer, status);
}

template <typename Response>
Status TrySendUnaryResponse(const Response& response,
Status status = OkStatus())
PW_LOCKS_EXCLUDED(rpc_lock()) {
RpcLockGuard lock;
if (!active_locked()) {
return Status::FailedPrecondition();
}

Result<ByteSpan> buffer =
EncodeToPayloadBuffer(response, serde_->response());
if (!buffer.ok()) {
return TryCloseAndSendServerErrorLocked(Status::Internal());
}

return TryCloseAndSendResponseLocked(*buffer, status);
}

protected:
// Derived classes allow default construction so that users can declare a
// variable into which to move server reader/writers from RPC calls.
Expand Down Expand Up @@ -251,6 +269,10 @@ class PwpbServerReaderWriter : private internal::BasePwpbServerReader<Request> {
return internal::Call::CloseAndSendResponse(status);
}

Status TryFinish(Status status = OkStatus()) {
return internal::Call::TryCloseAndSendResponse(status);
}

private:
friend class internal::PwpbMethod;
friend class Server;
Expand Down Expand Up @@ -323,6 +345,10 @@ class PwpbServerReader : private internal::BasePwpbServerReader<Request> {
return internal::PwpbServerCall::SendUnaryResponse(response, status);
}

Status TryFinish(const Response& response, Status status = OkStatus()) {
return internal::PwpbServerCall::TrySendUnaryResponse(response, status);
}

private:
friend class internal::PwpbMethod;
friend class Server;
Expand Down Expand Up @@ -395,6 +421,10 @@ class PwpbServerWriter : private internal::PwpbServerCall {
return internal::Call::CloseAndSendResponse(status);
}

Status TryFinish(Status status = OkStatus()) {
return internal::Call::TryCloseAndSendResponse(status);
}

private:
friend class internal::PwpbMethod;
friend class Server;
Expand Down Expand Up @@ -460,6 +490,10 @@ class PwpbUnaryResponder : private internal::PwpbServerCall {
return internal::PwpbServerCall::SendUnaryResponse(response, status);
}

Status TryFinish(const Response& response, Status status = OkStatus()) {
return internal::PwpbServerCall::TrySendUnaryResponse(response, status);
}

private:
friend class internal::PwpbMethod;
friend class Server;
Expand Down
Loading

0 comments on commit c382dcf

Please sign in to comment.