pw_thread: Update service with breaking apart responses
Requires: pigweed-internal:31501
Change-Id: I29205f00b889b2c07d85fd3f419f236f14c3e1a4
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/109256
Commit-Queue: Medha Kini <medhakini@google.com>
Reviewed-by: Armando Montanez <amontanez@google.com>
diff --git a/pw_system/py/pw_system/device.py b/pw_system/py/pw_system/device.py
index f757544..f80aa04 100644
--- a/pw_system/py/pw_system/device.py
+++ b/pw_system/py/pw_system/device.py
@@ -26,6 +26,7 @@
from pw_rpc import callback_client, console_tools
from pw_status import Status
from pw_thread.thread_analyzer import ThreadSnapshotAnalyzer
+from pw_thread_protos import thread_pb2
from pw_tokenizer import detokenize
from pw_tokenizer.proto import decode_optionally_tokenized
import pw_unit_test.rpc
@@ -190,8 +191,13 @@
print_metrics(metrics, '')
return metrics
- def snapshot_peak_stack_usage(self):
- _, rsp = self.rpcs.pw.thread.ThreadSnapshotService.GetPeakStackUsage()
- for thread_info in rsp:
- for line in str(ThreadSnapshotAnalyzer(thread_info)).splitlines():
- _LOG.info('%s', line)
+ def snapshot_peak_stack_usage(self, thread_name: str = None):
+ _, rsp = self.rpcs.pw.thread.ThreadSnapshotService \
+ .GetPeakStackUsage(name=thread_name)
+
+ thread_info = thread_pb2.SnapshotThreadInfo()
+ for thread_info_block in rsp:
+ for thread in thread_info_block.threads:
+ thread_info.threads.append(thread)
+ for line in str(ThreadSnapshotAnalyzer(thread_info)).splitlines():
+ _LOG.info('%s', line)
diff --git a/pw_system/thread_snapshot_service.cc b/pw_system/thread_snapshot_service.cc
index 5b82eef..3f40492 100644
--- a/pw_system/thread_snapshot_service.cc
+++ b/pw_system/thread_snapshot_service.cc
@@ -19,11 +19,7 @@
namespace pw::system {
namespace {
-constexpr size_t kEncodeBufferSize = thread::RequiredServiceBufferSize();
-
-std::array<std::byte, kEncodeBufferSize> encode_buffer;
-
-thread::ThreadSnapshotService system_thread_snapshot_service(encode_buffer);
+thread::ThreadSnapshotServiceBuilder<> system_thread_snapshot_service;
} // namespace
diff --git a/pw_thread/docs.rst b/pw_thread/docs.rst
index 8403f0b..7581bbc 100644
--- a/pw_thread/docs.rst
+++ b/pw_thread/docs.rst
@@ -484,10 +484,11 @@
=================
To expose a ``ThreadSnapshotService`` in your application, do the following:
-1. Instantiate a buffer with specified size (See ``RequiredServiceBufferSize``
-below for more information on buffer size calculation).
-2. Create an instance of ``pw::thread::ThreadSnapshotService``.
-3. Register the service with your RPC server.
+1. Create an instance of ``pw::thread::ThreadSnapshotServiceBuilder``. This is a
+ template class that takes in the number of threads as at template, or
+ defaults this value to ``PW_THREAD_MAXIMUM_THREADS`` if no argument is
+ provided.
+2. Register the service with your RPC server.
For example:
@@ -502,14 +503,8 @@
};
Server server(channels);
- // Calculate encode buffer size, defaults to `PW_THREAD_MAXIMUM_THREADS`
- // if no argument is provided.
- constexpr size_t kEncodeBufferSize =
- pw::thread::RequiredBufferSize(/* number of threads */);
- std::array<std::byte, kEncodeBufferSize> encode_buffer;
-
- // Thread snapshot service instance.
- pw::thread::ThreadSnapshotService thread_snapshot_service(encode_buffer);
+ // Thread snapshot service builder instance.
+ thread::ThreadSnapshotServiceBuilder</*num threads*/> thread_snapshot_service;
void RegisterServices() {
server.RegisterService(thread_snapshot_service);
diff --git a/pw_thread/public/pw_thread/config.h b/pw_thread/public/pw_thread/config.h
index ca06bf5..7b83a7e 100644
--- a/pw_thread/public/pw_thread/config.h
+++ b/pw_thread/public/pw_thread/config.h
@@ -22,3 +22,8 @@
#ifndef PW_THREAD_MAXIMUM_THREADS
#define PW_THREAD_MAXIMUM_THREADS 10
#endif // PW_THREAD_MAXIMUM_THREADS
+
+// The max number of threads to bundle by default for thread snapshot service.
+#ifndef PW_THREAD_NUM_BUNDLED_THREADS
+#define PW_THREAD_NUM_BUNDLED_THREADS 3
+#endif // PW_THREAD_MAXIMUM_THREADS
\ No newline at end of file
diff --git a/pw_thread/public/pw_thread/thread_snapshot_service.h b/pw_thread/public/pw_thread/thread_snapshot_service.h
index 74c6f6f..b8926f9 100644
--- a/pw_thread/public/pw_thread/thread_snapshot_service.h
+++ b/pw_thread/public/pw_thread/thread_snapshot_service.h
@@ -27,8 +27,9 @@
Status ProtoEncodeThreadInfo(SnapshotThreadInfo::MemoryEncoder& encoder,
const ThreadInfo& thread_info);
+// Calculates encoded buffer size based on code gen constants.
constexpr size_t RequiredServiceBufferSize(
- const size_t num_threads = PW_THREAD_MAXIMUM_THREADS) {
+ size_t num_threads = PW_THREAD_MAXIMUM_THREADS) {
constexpr size_t kSizeOfResponse =
SnapshotThreadInfo::kMaxEncodedSizeBytes + Thread::kMaxEncodedSizeBytes;
return kSizeOfResponse * num_threads;
@@ -36,17 +37,49 @@
// The ThreadSnapshotService will return peak stack usage across running
// threads when requested by GetPeak().
-class ThreadSnapshotService final
+//
+// Parameter encode_buffer: buffer where thread information is encoded. Size
+// depends on RequiredBufferSize().
+//
+// Parameter thread_proto_indices: array keeping track of thread boundaries in
+// the encode buffer. The service uses these indices to send response data out
+// in bundles.
+//
+// Parameter num_bundled_threads: constant describing number of threads per
+// bundle in response.
+class ThreadSnapshotService
: public pw_rpc::raw::ThreadSnapshotService::Service<
ThreadSnapshotService> {
public:
- ThreadSnapshotService(span<std::byte> encode_buffer)
- : encode_buffer_(encode_buffer) {}
-
+ constexpr ThreadSnapshotService(
+ span<std::byte> encode_buffer,
+ Vector<size_t>& thread_proto_indices,
+ size_t num_bundled_threads = PW_THREAD_NUM_BUNDLED_THREADS)
+ : encode_buffer_(encode_buffer),
+ thread_proto_indices_(thread_proto_indices),
+ num_bundled_threads_(num_bundled_threads) {}
void GetPeakStackUsage(ConstByteSpan request, rpc::RawServerWriter& response);
private:
span<std::byte> encode_buffer_;
+ Vector<size_t>& thread_proto_indices_;
+ size_t num_bundled_threads_;
+};
+
+// A ThreadSnapshotService that allocates required buffers based on the
+// number of running threads on a device.
+template <size_t kNumThreads = PW_THREAD_MAXIMUM_THREADS>
+class ThreadSnapshotServiceBuilder : public ThreadSnapshotService {
+ public:
+ ThreadSnapshotServiceBuilder()
+ : ThreadSnapshotService(encode_buffer_, thread_proto_indices_) {}
+
+ private:
+ std::array<std::byte, thread::RequiredServiceBufferSize(kNumThreads)>
+ encode_buffer_;
+ // + 1 is needed to account for extra index that comes with the first
+ // submessage start or the last submessage end.
+ Vector<size_t, kNumThreads + 1> thread_proto_indices_;
};
} // namespace pw::thread
diff --git a/pw_thread/thread_snapshot_service.cc b/pw_thread/thread_snapshot_service.cc
index ea72034..df87eac 100644
--- a/pw_thread/thread_snapshot_service.cc
+++ b/pw_thread/thread_snapshot_service.cc
@@ -14,6 +14,7 @@
#include "pw_thread/thread_snapshot_service.h"
+#include "pw_containers/vector.h"
#include "pw_log/log.h"
#include "pw_protobuf/decoder.h"
#include "pw_rpc/raw/server_reader_writer.h"
@@ -99,17 +100,28 @@
SnapshotThreadInfo::MemoryEncoder encoder;
Status status;
ConstByteSpan name;
+
+ // For sending out data by chunks.
+ Vector<size_t>& thread_proto_indices;
};
ConstByteSpan name_request;
if (!request.empty()) {
- DecodeThreadName(request, name_request);
+ Status status = DecodeThreadName(request, name_request);
+ if (!status.ok()) {
+ PW_LOG_ERROR("Service unable to decode thread name with error code %d",
+ status.code());
+ }
}
IterationInfo iteration_info{
SnapshotThreadInfo::MemoryEncoder(encode_buffer_),
OkStatus(),
- name_request};
+ name_request,
+ thread_proto_indices_};
+
+ iteration_info.thread_proto_indices.clear();
+ iteration_info.thread_proto_indices.push_back(iteration_info.encoder.size());
auto cb = [&iteration_info](const ThreadInfo& thread_info) {
if (!iteration_info.name.empty() && thread_info.thread_name().has_value()) {
@@ -118,11 +130,15 @@
iteration_info.name.begin())) {
iteration_info.status.Update(
ProtoEncodeThreadInfo(iteration_info.encoder, thread_info));
+ iteration_info.thread_proto_indices.push_back(
+ iteration_info.encoder.size());
return false;
}
} else {
iteration_info.status.Update(
ProtoEncodeThreadInfo(iteration_info.encoder, thread_info));
+ iteration_info.thread_proto_indices.push_back(
+ iteration_info.encoder.size());
}
return iteration_info.status.ok();
};
@@ -134,14 +150,35 @@
Status status;
if (iteration_info.encoder.size() && iteration_info.status.ok()) {
- status = response_writer.Write(iteration_info.encoder);
- if (status != OkStatus()) {
- PW_LOG_ERROR(
- "Failed to send response with status code %d, packet may be too "
- "large to send",
- status.code());
+ // Must subtract 1 because the last boundary index of thread_proto_indices
+ // is the end of the last submessage, and NOT the start of another.
+ size_t last_start_index = iteration_info.thread_proto_indices.size() - 1;
+ for (size_t i = 0; i < last_start_index; i += num_bundled_threads_) {
+ const size_t num_threads =
+ std::min(num_bundled_threads_, last_start_index - i);
+
+ // Sending out a bundle of threads at a time.
+ const size_t bundle_size =
+ iteration_info.thread_proto_indices[i + num_threads] -
+ iteration_info.thread_proto_indices[i];
+
+ ConstByteSpan thread =
+ ConstByteSpan(iteration_info.encoder.data() +
+ iteration_info.thread_proto_indices[i],
+ bundle_size);
+
+ if (bundle_size) {
+ status.Update(response_writer.Write(thread));
+ }
+ if (!status.ok()) {
+ PW_LOG_ERROR(
+ "Failed to send response with error code %d, packet may be too "
+ "large to send",
+ status.code());
+ }
}
}
+
if (response_writer.Finish(status) != OkStatus()) {
PW_LOG_ERROR(
"Failed to close stream for GetPeakStackUsage() with error code %d",