From 383768380dcbdf9068551bc2866d2a75730035cd Mon Sep 17 00:00:00 2001 From: Yufeng Wang Date: Fri, 12 Jul 2024 01:28:13 -0700 Subject: [PATCH] Make RPC call synchronous in Fabric Sync (#34187) * Make RPC call synchronous in Fabric Sync * Address review comments --- examples/fabric-admin/rpc/RpcClient.cpp | 42 +++++++++++++++++-- examples/fabric-admin/rpc/RpcClient.h | 4 +- .../fabric-bridge-app/linux/RpcClient.cpp | 36 +++++++++++++++- .../linux/include/RpcClient.h | 2 +- 4 files changed, 76 insertions(+), 8 deletions(-) diff --git a/examples/fabric-admin/rpc/RpcClient.cpp b/examples/fabric-admin/rpc/RpcClient.cpp index f03c4f3f699838..40d2e8ba09aa2e 100644 --- a/examples/fabric-admin/rpc/RpcClient.cpp +++ b/examples/fabric-admin/rpc/RpcClient.cpp @@ -19,9 +19,11 @@ #include "RpcClient.h" #include "RpcClientProcessor.h" +#include +#include +#include #include #include -#include #include "fabric_bridge_service/fabric_bridge_service.rpc.pb.h" #include "pw_assert/check.h" @@ -36,6 +38,7 @@ using namespace chip; namespace { // Constants +constexpr uint32_t kRpcTimeoutMs = 1000; constexpr uint32_t kDefaultChannelId = 1; // Fabric Bridge Client @@ -43,9 +46,37 @@ rpc::pw_rpc::nanopb::FabricBridge::Client fabricBridgeClient(rpc::client::GetDef pw::rpc::NanopbUnaryReceiver<::pw_protobuf_Empty> addSynchronizedDeviceCall; pw::rpc::NanopbUnaryReceiver<::pw_protobuf_Empty> removeSynchronizedDeviceCall; +std::mutex responseMutex; +std::condition_variable responseCv; +bool responseReceived = false; +CHIP_ERROR responseError = CHIP_NO_ERROR; + +template +CHIP_ERROR WaitForResponse(CallType & call) +{ + std::unique_lock lock(responseMutex); + responseReceived = false; + responseError = CHIP_NO_ERROR; + + if (responseCv.wait_for(lock, std::chrono::milliseconds(kRpcTimeoutMs), [] { return responseReceived; })) + { + return responseError; + } + else + { + fprintf(stderr, "RPC Response timed out!"); + return CHIP_ERROR_TIMEOUT; + } +} + // Callback function to be called when the RPC response is received void OnAddDeviceResponseCompleted(const pw_protobuf_Empty & response, pw::Status status) { + std::lock_guard lock(responseMutex); + responseReceived = true; + responseError = status.ok() ? CHIP_NO_ERROR : CHIP_ERROR_INTERNAL; + responseCv.notify_one(); + if (status.ok()) { ChipLogProgress(NotSpecified, "AddSynchronizedDevice RPC call succeeded!"); @@ -59,6 +90,11 @@ void OnAddDeviceResponseCompleted(const pw_protobuf_Empty & response, pw::Status // Callback function to be called when the RPC response is received void OnRemoveDeviceResponseCompleted(const pw_protobuf_Empty & response, pw::Status status) { + std::lock_guard lock(responseMutex); + responseReceived = true; + responseError = status.ok() ? CHIP_NO_ERROR : CHIP_ERROR_INTERNAL; + responseCv.notify_one(); + if (status.ok()) { ChipLogProgress(NotSpecified, "RemoveSynchronizedDevice RPC call succeeded!"); @@ -101,7 +137,7 @@ CHIP_ERROR AddSynchronizedDevice(chip::NodeId nodeId) return CHIP_ERROR_INTERNAL; } - return CHIP_NO_ERROR; + return WaitForResponse(addSynchronizedDeviceCall); } CHIP_ERROR RemoveSynchronizedDevice(chip::NodeId nodeId) @@ -128,5 +164,5 @@ CHIP_ERROR RemoveSynchronizedDevice(chip::NodeId nodeId) return CHIP_ERROR_INTERNAL; } - return CHIP_NO_ERROR; + return WaitForResponse(removeSynchronizedDeviceCall); } diff --git a/examples/fabric-admin/rpc/RpcClient.h b/examples/fabric-admin/rpc/RpcClient.h index eb28c19bee7d3e..a1754b3846d508 100644 --- a/examples/fabric-admin/rpc/RpcClient.h +++ b/examples/fabric-admin/rpc/RpcClient.h @@ -41,7 +41,7 @@ CHIP_ERROR InitRpcClient(uint16_t rpcServerPort); * * @param nodeId The Node ID of the device to be added. * @return CHIP_ERROR An error code indicating the success or failure of the operation. - * - CHIP_NO_ERROR: The RPC command was successfully sent. + * - CHIP_NO_ERROR: The RPC command was successfully processed. * - CHIP_ERROR_BUSY: Another operation is currently in progress. * - CHIP_ERROR_INTERNAL: An internal error occurred while activating the RPC call. */ @@ -56,7 +56,7 @@ CHIP_ERROR AddSynchronizedDevice(chip::NodeId nodeId); * * @param nodeId The Node ID of the device to be removed. * @return CHIP_ERROR An error code indicating the success or failure of the operation. - * - CHIP_NO_ERROR: The RPC command was successfully sent. + * - CHIP_NO_ERROR: The RPC command was successfully processed. * - CHIP_ERROR_BUSY: Another operation is currently in progress. * - CHIP_ERROR_INTERNAL: An internal error occurred while activating the RPC call. */ diff --git a/examples/fabric-bridge-app/linux/RpcClient.cpp b/examples/fabric-bridge-app/linux/RpcClient.cpp index 815250f12bc328..c26d2b8f0aad31 100644 --- a/examples/fabric-bridge-app/linux/RpcClient.cpp +++ b/examples/fabric-bridge-app/linux/RpcClient.cpp @@ -19,9 +19,11 @@ #include "RpcClient.h" #include "RpcClientProcessor.h" +#include +#include +#include #include #include -#include #include "fabric_admin_service/fabric_admin_service.rpc.pb.h" #include "pw_assert/check.h" @@ -36,15 +38,44 @@ using namespace chip; namespace { // Constants +constexpr uint32_t kRpcTimeoutMs = 1000; constexpr uint32_t kDefaultChannelId = 1; // Fabric Admin Client rpc::pw_rpc::nanopb::FabricAdmin::Client fabricAdminClient(rpc::client::GetDefaultRpcClient(), kDefaultChannelId); pw::rpc::NanopbUnaryReceiver<::chip_rpc_OperationStatus> openCommissioningWindowCall; +std::mutex responseMutex; +std::condition_variable responseCv; +bool responseReceived = false; +CHIP_ERROR responseError = CHIP_NO_ERROR; + +template +CHIP_ERROR WaitForResponse(CallType & call) +{ + std::unique_lock lock(responseMutex); + responseReceived = false; + responseError = CHIP_NO_ERROR; + + if (responseCv.wait_for(lock, std::chrono::milliseconds(kRpcTimeoutMs), [] { return responseReceived; })) + { + return responseError; + } + else + { + ChipLogError(NotSpecified, "RPC Response timed out!"); + return CHIP_ERROR_TIMEOUT; + } +} + // Callback function to be called when the RPC response is received void OnOpenCommissioningWindowCompleted(const chip_rpc_OperationStatus & response, pw::Status status) { + std::lock_guard lock(responseMutex); + responseReceived = true; + responseError = status.ok() ? CHIP_NO_ERROR : CHIP_ERROR_INTERNAL; + responseCv.notify_one(); + if (status.ok()) { ChipLogProgress(NotSpecified, "OpenCommissioningWindow received operation status: %d", response.success); @@ -81,8 +112,9 @@ CHIP_ERROR OpenCommissioningWindow(NodeId nodeId) if (!openCommissioningWindowCall.active()) { + // The RPC call was not sent. This could occur due to, for example, an invalid channel ID. Handle if necessary. return CHIP_ERROR_INTERNAL; } - return CHIP_NO_ERROR; + return WaitForResponse(openCommissioningWindowCall); } diff --git a/examples/fabric-bridge-app/linux/include/RpcClient.h b/examples/fabric-bridge-app/linux/include/RpcClient.h index bd424e9d275910..34fa5c19de9349 100644 --- a/examples/fabric-bridge-app/linux/include/RpcClient.h +++ b/examples/fabric-bridge-app/linux/include/RpcClient.h @@ -37,7 +37,7 @@ CHIP_ERROR InitRpcClient(uint16_t rpcServerPort); * * @param nodeId The identifier of the node for which the commissioning window should be opened. * @return CHIP_ERROR An error code indicating the success or failure of the operation. - * - CHIP_NO_ERROR: The RPC command was successfully sent. + * - CHIP_NO_ERROR: The RPC command was successfully processed. * - CHIP_ERROR_BUSY: Another commissioning window is currently in progress. * - CHIP_ERROR_INTERNAL: An internal error occurred. */