Skip to content

Commit

Permalink
pw_thread: Update service with breaking apart responses
Browse files Browse the repository at this point in the history
Requires: pigweed-internal:31501
Change-Id: I29205f00b889b2c07d85fd3f419f236f14c3e1a4
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/109256
Commit-Queue: Medha Kini <[email protected]>
Reviewed-by: Armando Montanez <[email protected]>
  • Loading branch information
Medha Kini authored and CQ Bot Account committed Sep 9, 2022
1 parent 302b1b6 commit 413cc71
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 35 deletions.
16 changes: 11 additions & 5 deletions pw_system/py/pw_system/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pw_rpc import callback_client, console_tools
from pw_status import Status
from pw_thread.thread_analyzer import ThreadSnapshotAnalyzer
from pw_thread_protos import thread_pb2
from pw_tokenizer import detokenize
from pw_tokenizer.proto import decode_optionally_tokenized
import pw_unit_test.rpc
Expand Down Expand Up @@ -190,8 +191,13 @@ def print_metrics(metrics, path):
print_metrics(metrics, '')
return metrics

def snapshot_peak_stack_usage(self):
_, rsp = self.rpcs.pw.thread.ThreadSnapshotService.GetPeakStackUsage()
for thread_info in rsp:
for line in str(ThreadSnapshotAnalyzer(thread_info)).splitlines():
_LOG.info('%s', line)
def snapshot_peak_stack_usage(self, thread_name: str = None):
_, rsp = self.rpcs.pw.thread.ThreadSnapshotService \
.GetPeakStackUsage(name=thread_name)

thread_info = thread_pb2.SnapshotThreadInfo()
for thread_info_block in rsp:
for thread in thread_info_block.threads:
thread_info.threads.append(thread)
for line in str(ThreadSnapshotAnalyzer(thread_info)).splitlines():
_LOG.info('%s', line)
6 changes: 1 addition & 5 deletions pw_system/thread_snapshot_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
namespace pw::system {
namespace {

constexpr size_t kEncodeBufferSize = thread::RequiredServiceBufferSize();

std::array<std::byte, kEncodeBufferSize> encode_buffer;

thread::ThreadSnapshotService system_thread_snapshot_service(encode_buffer);
thread::ThreadSnapshotServiceBuilder<> system_thread_snapshot_service;

} // namespace

Expand Down
19 changes: 7 additions & 12 deletions pw_thread/docs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -484,10 +484,11 @@ RPC service setup
=================
To expose a ``ThreadSnapshotService`` in your application, do the following:

1. Instantiate a buffer with specified size (See ``RequiredServiceBufferSize``
below for more information on buffer size calculation).
2. Create an instance of ``pw::thread::ThreadSnapshotService``.
3. Register the service with your RPC server.
1. Create an instance of ``pw::thread::ThreadSnapshotServiceBuilder``. This is a
template class that takes in the number of threads as at template, or
defaults this value to ``PW_THREAD_MAXIMUM_THREADS`` if no argument is
provided.
2. Register the service with your RPC server.

For example:

Expand All @@ -502,14 +503,8 @@ For example:
};
Server server(channels);
// Calculate encode buffer size, defaults to `PW_THREAD_MAXIMUM_THREADS`
// if no argument is provided.
constexpr size_t kEncodeBufferSize =
pw::thread::RequiredBufferSize(/* number of threads */);
std::array<std::byte, kEncodeBufferSize> encode_buffer;
// Thread snapshot service instance.
pw::thread::ThreadSnapshotService thread_snapshot_service(encode_buffer);
// Thread snapshot service builder instance.
thread::ThreadSnapshotServiceBuilder</*num threads*/> thread_snapshot_service;
void RegisterServices() {
server.RegisterService(thread_snapshot_service);
Expand Down
5 changes: 5 additions & 0 deletions pw_thread/public/pw_thread/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,8 @@
#ifndef PW_THREAD_MAXIMUM_THREADS
#define PW_THREAD_MAXIMUM_THREADS 10
#endif // PW_THREAD_MAXIMUM_THREADS

// The max number of threads to bundle by default for thread snapshot service.
#ifndef PW_THREAD_NUM_BUNDLED_THREADS
#define PW_THREAD_NUM_BUNDLED_THREADS 3
#endif // PW_THREAD_MAXIMUM_THREADS
43 changes: 38 additions & 5 deletions pw_thread/public/pw_thread/thread_snapshot_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,59 @@ namespace pw::thread {
Status ProtoEncodeThreadInfo(SnapshotThreadInfo::MemoryEncoder& encoder,
const ThreadInfo& thread_info);

// Calculates encoded buffer size based on code gen constants.
constexpr size_t RequiredServiceBufferSize(
const size_t num_threads = PW_THREAD_MAXIMUM_THREADS) {
size_t num_threads = PW_THREAD_MAXIMUM_THREADS) {
constexpr size_t kSizeOfResponse =
SnapshotThreadInfo::kMaxEncodedSizeBytes + Thread::kMaxEncodedSizeBytes;
return kSizeOfResponse * num_threads;
}

// The ThreadSnapshotService will return peak stack usage across running
// threads when requested by GetPeak().
class ThreadSnapshotService final
//
// Parameter encode_buffer: buffer where thread information is encoded. Size
// depends on RequiredBufferSize().
//
// Parameter thread_proto_indices: array keeping track of thread boundaries in
// the encode buffer. The service uses these indices to send response data out
// in bundles.
//
// Parameter num_bundled_threads: constant describing number of threads per
// bundle in response.
class ThreadSnapshotService
: public pw_rpc::raw::ThreadSnapshotService::Service<
ThreadSnapshotService> {
public:
ThreadSnapshotService(span<std::byte> encode_buffer)
: encode_buffer_(encode_buffer) {}

constexpr ThreadSnapshotService(
span<std::byte> encode_buffer,
Vector<size_t>& thread_proto_indices,
size_t num_bundled_threads = PW_THREAD_NUM_BUNDLED_THREADS)
: encode_buffer_(encode_buffer),
thread_proto_indices_(thread_proto_indices),
num_bundled_threads_(num_bundled_threads) {}
void GetPeakStackUsage(ConstByteSpan request, rpc::RawServerWriter& response);

private:
span<std::byte> encode_buffer_;
Vector<size_t>& thread_proto_indices_;
size_t num_bundled_threads_;
};

// A ThreadSnapshotService that allocates required buffers based on the
// number of running threads on a device.
template <size_t kNumThreads = PW_THREAD_MAXIMUM_THREADS>
class ThreadSnapshotServiceBuilder : public ThreadSnapshotService {
public:
ThreadSnapshotServiceBuilder()
: ThreadSnapshotService(encode_buffer_, thread_proto_indices_) {}

private:
std::array<std::byte, thread::RequiredServiceBufferSize(kNumThreads)>
encode_buffer_;
// + 1 is needed to account for extra index that comes with the first
// submessage start or the last submessage end.
Vector<size_t, kNumThreads + 1> thread_proto_indices_;
};

} // namespace pw::thread
53 changes: 45 additions & 8 deletions pw_thread/thread_snapshot_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "pw_thread/thread_snapshot_service.h"

#include "pw_containers/vector.h"
#include "pw_log/log.h"
#include "pw_protobuf/decoder.h"
#include "pw_rpc/raw/server_reader_writer.h"
Expand Down Expand Up @@ -99,17 +100,28 @@ void ThreadSnapshotService::GetPeakStackUsage(
SnapshotThreadInfo::MemoryEncoder encoder;
Status status;
ConstByteSpan name;

// For sending out data by chunks.
Vector<size_t>& thread_proto_indices;
};

ConstByteSpan name_request;
if (!request.empty()) {
DecodeThreadName(request, name_request);
Status status = DecodeThreadName(request, name_request);
if (!status.ok()) {
PW_LOG_ERROR("Service unable to decode thread name with error code %d",
status.code());
}
}

IterationInfo iteration_info{
SnapshotThreadInfo::MemoryEncoder(encode_buffer_),
OkStatus(),
name_request};
name_request,
thread_proto_indices_};

iteration_info.thread_proto_indices.clear();
iteration_info.thread_proto_indices.push_back(iteration_info.encoder.size());

auto cb = [&iteration_info](const ThreadInfo& thread_info) {
if (!iteration_info.name.empty() && thread_info.thread_name().has_value()) {
Expand All @@ -118,11 +130,15 @@ void ThreadSnapshotService::GetPeakStackUsage(
iteration_info.name.begin())) {
iteration_info.status.Update(
ProtoEncodeThreadInfo(iteration_info.encoder, thread_info));
iteration_info.thread_proto_indices.push_back(
iteration_info.encoder.size());
return false;
}
} else {
iteration_info.status.Update(
ProtoEncodeThreadInfo(iteration_info.encoder, thread_info));
iteration_info.thread_proto_indices.push_back(
iteration_info.encoder.size());
}
return iteration_info.status.ok();
};
Expand All @@ -134,14 +150,35 @@ void ThreadSnapshotService::GetPeakStackUsage(

Status status;
if (iteration_info.encoder.size() && iteration_info.status.ok()) {
status = response_writer.Write(iteration_info.encoder);
if (status != OkStatus()) {
PW_LOG_ERROR(
"Failed to send response with status code %d, packet may be too "
"large to send",
status.code());
// Must subtract 1 because the last boundary index of thread_proto_indices
// is the end of the last submessage, and NOT the start of another.
size_t last_start_index = iteration_info.thread_proto_indices.size() - 1;
for (size_t i = 0; i < last_start_index; i += num_bundled_threads_) {
const size_t num_threads =
std::min(num_bundled_threads_, last_start_index - i);

// Sending out a bundle of threads at a time.
const size_t bundle_size =
iteration_info.thread_proto_indices[i + num_threads] -
iteration_info.thread_proto_indices[i];

ConstByteSpan thread =
ConstByteSpan(iteration_info.encoder.data() +
iteration_info.thread_proto_indices[i],
bundle_size);

if (bundle_size) {
status.Update(response_writer.Write(thread));
}
if (!status.ok()) {
PW_LOG_ERROR(
"Failed to send response with error code %d, packet may be too "
"large to send",
status.code());
}
}
}

if (response_writer.Finish(status) != OkStatus()) {
PW_LOG_ERROR(
"Failed to close stream for GetPeakStackUsage() with error code %d",
Expand Down

0 comments on commit 413cc71

Please sign in to comment.