Skip to content

Commit

Permalink
Add RpcClientProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
yufengwangca committed Jun 3, 2024
1 parent 3075bcb commit d691a6b
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 221 deletions.
3 changes: 3 additions & 0 deletions examples/fabric-admin/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ config("config") {
include_dirs = [
".",
"${chip_root}/examples/common",
"${chip_root}/examples/platform/linux",
"${chip_root}/zzz_generated/app-common/app-common",
"${chip_root}/zzz_generated/chip-tool",
"${chip_root}/src/lib",
Expand Down Expand Up @@ -114,6 +115,8 @@ static_library("fabric-admin-utils") {
]

sources += [
"${chip_root}/examples/platform/linux/RpcClientProcessor.cpp",
"${chip_root}/examples/platform/linux/RpcClientProcessor.h",
"${chip_root}/examples/platform/linux/system_rpc_server.cc",
"rpc/RpcClient.cpp",
"rpc/RpcClient.h",
Expand Down
134 changes: 29 additions & 105 deletions examples/fabric-admin/rpc/RpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,149 +17,73 @@
*/

#include "RpcClient.h"
#include "RpcClientProcessor.h"

#include <string>
#include <thread>
#include <unistd.h>

#include "fabric_bridge_service/fabric_bridge_service.rpc.pb.h"
#include "pw_assert/check.h"
#include "pw_function/function.h"
#include "pw_hdlc/decoder.h"
#include "pw_hdlc/default_addresses.h"
#include "pw_hdlc/rpc_channel.h"
#include "pw_rpc/client.h"
#include "pw_stream/socket_stream.h"

namespace {

constexpr size_t kMaxTransmissionUnit = 256;
constexpr uint32_t kRpcTimeoutMs = 1000;
const char * rpcServerAddress = "127.0.0.1";

pw::stream::SocketStream rpcSocketStream;

// Set up the output channel for the pw_rpc client to use.
pw::hdlc::RpcChannelOutput hdlc_channel_output(rpcSocketStream, pw::hdlc::kDefaultRpcAddress, "HDLC channel");

// An array of RPC channels (channels) is created, each associated with an HDLC channel output.
// This sets up the communication channels for RPC calls.
pw::rpc::Channel channels[] = { pw::rpc::Channel::Create<1>(&hdlc_channel_output) };

// Initialize the RPC client with the channels.
pw::rpc::Client client(channels);
using namespace chip;

// Generated clients are namespaced with their proto library.
using FabricBridgeClient = chip::rpc::pw_rpc::nanopb::FabricBridge::Client;
namespace {

// RPC channel ID on which to make client calls. RPC calls cannot be made on
// channel 0 (Channel::kUnassignedChannelId).
// Constants
constexpr uint32_t kDefaultChannelId = 1;

// Function to process incoming packets
void ProcessPackets()
{
std::array<std::byte, kMaxTransmissionUnit> inputBuf;
pw::hdlc::Decoder decoder(inputBuf);
// Fabric Bridge Client
rpc::pw_rpc::nanopb::FabricBridge::Client fabricBridgeClient(rpc::client::GetDefaultRpcClient(), kDefaultChannelId);
pw::rpc::NanopbUnaryReceiver<::pw_protobuf_Empty> addSynchronizedDeviceCall;

while (true)
{
std::array<std::byte, kMaxTransmissionUnit> data;
auto ret = rpcSocketStream.Read(data);
if (!ret.ok())
{
if (ret.status() == pw::Status::OutOfRange())
{
// Handle remote disconnect
rpcSocketStream.Close();
return;
}
continue;
}

for (std::byte byte : ret.value())
{
auto result = decoder.Process(byte);
if (!result.ok())
{
// Wait for more bytes that form a complete packet
continue;
}
pw::hdlc::Frame & frame = result.value();
if (frame.address() != pw::hdlc::kDefaultRpcAddress)
{
// Wrong address; ignore the packet
continue;
}

client.ProcessPacket(frame.data()).IgnoreError();
}
}
}

template <typename CallType>
CHIP_ERROR WaitForResponse(CallType & call)
{
if (!call.active())
{
return CHIP_ERROR_INTERNAL;
}

// Wait for the response or timeout
uint32_t elapsedTimeMs = 0;
const uint32_t sleepTimeMs = 100;

while (call.active() && elapsedTimeMs < kRpcTimeoutMs)
{
usleep(sleepTimeMs * 1000);
elapsedTimeMs += sleepTimeMs;
}

if (elapsedTimeMs >= kRpcTimeoutMs)
{
fprintf(stderr, "RPC Response timed out!");
return CHIP_ERROR_TIMEOUT;
}

return CHIP_NO_ERROR;
}

void AddDeviceResponse(const pw_protobuf_Empty & response, pw::Status status)
// Callback function to be called when the RPC response is received
void OnAddDeviceResponseCompleted(const pw_protobuf_Empty & response, pw::Status status)
{
if (status.ok())
{
printf("RPC call succeeded\n");
ChipLogProgress(NotSpecified, "AddSynchronizedDevice RPC call succeeded!");
}
else
{
printf("RPC call failed with status: %d\n", status.code());
ChipLogProgress(NotSpecified, "AddSynchronizedDevice RPC call failed with status: %d\n", status.code());
}
}

} // namespace

CHIP_ERROR InitRpcClient(uint16_t rpcServerPort)
{
if (rpcSocketStream.Connect(rpcServerAddress, rpcServerPort) != PW_STATUS_OK)
{
return CHIP_ERROR_NOT_CONNECTED;
}

// Start a thread to process incoming packets
std::thread packet_processor(ProcessPackets);
packet_processor.detach();

return CHIP_NO_ERROR;
rpc::client::SetRpcServerPort(rpcServerPort);
return rpc::client::StartPacketProcessing();
}

CHIP_ERROR AddSynchronizedDevice(chip::NodeId nodeId)
{
ChipLogProgress(NotSpecified, "AddSynchronizedDevice");

FabricBridgeClient fabric_bridge_client(client, kDefaultChannelId);
if (addSynchronizedDeviceCall.active())
{
ChipLogError(NotSpecified, "OpenCommissioningWindow is in progress\n");
return CHIP_ERROR_BUSY;
}

chip_rpc_SynchronizedDevice device;
device.node_id = nodeId;

// The RPC will remain active as long as `call` is alive.
auto call = fabric_bridge_client.AddSynchronizedDevice(device, AddDeviceResponse);
return WaitForResponse(call);
// The RPC will remain active as long as `addSynchronizedDeviceCall` is alive.
addSynchronizedDeviceCall = fabricBridgeClient.AddSynchronizedDevice(device, OnAddDeviceResponseCompleted);

if (!addSynchronizedDeviceCall.active())
{
return CHIP_ERROR_INTERNAL;
}

return CHIP_NO_ERROR;
}
19 changes: 19 additions & 0 deletions examples/fabric-admin/rpc/RpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,25 @@

constexpr uint16_t kFabricBridgeServerPort = 33002;

/**
* @brief Initializes the RPC client with the specified server port.
*
* This function sets the RPC server port and starts packet processing for the RPC client.
*
* @param rpcServerPort The port number on which the RPC server is running.
* @return CHIP_NO_ERROR on successful initialization, or an appropriate CHIP_ERROR on failure.
*/
CHIP_ERROR InitRpcClient(uint16_t rpcServerPort);

/**
* @brief Adds a synchronized device to the RPC client.
*
* This function attempts to add a device identified by its `nodeId` to the synchronized device list.
* It logs the progress and checks if an `OpenCommissioningWindow` operation is already in progress.
* If an operation is in progress, it returns `CHIP_ERROR_BUSY`.
*
* @param nodeId The Node ID of the device to be added.
* @return CHIP_NO_ERROR on success, `CHIP_ERROR_BUSY` if an operation is already in progress,
* or `CHIP_ERROR_INTERNAL` if there is an internal error while activating the RPC call.
*/
CHIP_ERROR AddSynchronizedDevice(chip::NodeId nodeId);
2 changes: 1 addition & 1 deletion examples/fabric-admin/rpc/RpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class FabricAdmin final : public chip::rpc::FabricAdmin
pw::Status OpenCommissioningWindow(const chip_rpc_DeviceInfo & request, chip_rpc_OperationStatus & response) override
{
chip::NodeId nodeId = request.node_id;
printf("Received OpenCommissioningWindow request: 0x%lx\n", nodeId);
ChipLogProgress(NotSpecified, "Received OpenCommissioningWindow request: 0x%lx", nodeId);
response.success = false;

return pw::OkStatus();
Expand Down
9 changes: 7 additions & 2 deletions examples/fabric-bridge-app/linux/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ executable("fabric-bridge-app") {
sources = [
"${chip_root}/examples/fabric-bridge-app/fabric-bridge-common/include/CHIPProjectAppConfig.h",
"Device.cpp",
"include/Device.h",
"DeviceManager.cpp",
"include/Device.h",
"include/DeviceManager.h",
"main.cpp",
]
Expand All @@ -57,6 +57,8 @@ executable("fabric-bridge-app") {
]

sources += [
"${chip_root}/examples/platform/linux/RpcClientProcessor.cpp",
"${chip_root}/examples/platform/linux/RpcClientProcessor.h",
"${chip_root}/examples/platform/linux/system_rpc_server.cc",
"RpcClient.cpp",
"RpcServer.cpp",
Expand All @@ -81,7 +83,10 @@ executable("fabric-bridge-app") {

deps += pw_build_LINK_DEPS

include_dirs += [ "${chip_root}/examples/common" ]
include_dirs += [
"${chip_root}/examples/common",
"${chip_root}/examples/platform/linux",
]
}

output_dir = root_out_dir
Expand Down
Loading

0 comments on commit d691a6b

Please sign in to comment.