pw_thread: Update service to break apart responses

Requires: pigweed-internal:31501
Change-Id: I29205f00b889b2c07d85fd3f419f236f14c3e1a4
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/109256
Commit-Queue: Medha Kini <medhakini@google.com>
Reviewed-by: Armando Montanez <amontanez@google.com>
diff --git a/pw_system/py/pw_system/device.py b/pw_system/py/pw_system/device.py
index f757544..f80aa04 100644
--- a/pw_system/py/pw_system/device.py
+++ b/pw_system/py/pw_system/device.py
@@ -26,6 +26,7 @@
 from pw_rpc import callback_client, console_tools
 from pw_status import Status
 from pw_thread.thread_analyzer import ThreadSnapshotAnalyzer
+from pw_thread_protos import thread_pb2
 from pw_tokenizer import detokenize
 from pw_tokenizer.proto import decode_optionally_tokenized
 import pw_unit_test.rpc
@@ -190,8 +191,13 @@
         print_metrics(metrics, '')
         return metrics
 
-    def snapshot_peak_stack_usage(self):
-        _, rsp = self.rpcs.pw.thread.ThreadSnapshotService.GetPeakStackUsage()
-        for thread_info in rsp:
-            for line in str(ThreadSnapshotAnalyzer(thread_info)).splitlines():
-                _LOG.info('%s', line)
+    def snapshot_peak_stack_usage(self, thread_name: str = None):
+        _, rsp = self.rpcs.pw.thread.ThreadSnapshotService \
+        .GetPeakStackUsage(name=thread_name)
+
+        thread_info = thread_pb2.SnapshotThreadInfo()
+        for thread_info_block in rsp:
+            for thread in thread_info_block.threads:
+                thread_info.threads.append(thread)
+        for line in str(ThreadSnapshotAnalyzer(thread_info)).splitlines():
+            _LOG.info('%s', line)
diff --git a/pw_system/thread_snapshot_service.cc b/pw_system/thread_snapshot_service.cc
index 5b82eef..3f40492 100644
--- a/pw_system/thread_snapshot_service.cc
+++ b/pw_system/thread_snapshot_service.cc
@@ -19,11 +19,7 @@
 namespace pw::system {
 namespace {
 
-constexpr size_t kEncodeBufferSize = thread::RequiredServiceBufferSize();
-
-std::array<std::byte, kEncodeBufferSize> encode_buffer;
-
-thread::ThreadSnapshotService system_thread_snapshot_service(encode_buffer);
+thread::ThreadSnapshotServiceBuilder<> system_thread_snapshot_service;
 
 }  // namespace
 
diff --git a/pw_thread/docs.rst b/pw_thread/docs.rst
index 8403f0b..7581bbc 100644
--- a/pw_thread/docs.rst
+++ b/pw_thread/docs.rst
@@ -484,10 +484,11 @@
 =================
 To expose a ``ThreadSnapshotService`` in your application, do the following:
 
-1. Instantiate a buffer with specified size (See ``RequiredServiceBufferSize``
-below for more information on buffer size calculation).
-2. Create an instance of ``pw::thread::ThreadSnapshotService``.
-3. Register the service with your RPC server.
+1. Create an instance of ``pw::thread::ThreadSnapshotServiceBuilder``. This is a
+   template class that takes the number of threads as a template argument, or
+   defaults this value to ``PW_THREAD_MAXIMUM_THREADS`` if no argument is
+   provided.
+2. Register the service with your RPC server.
 
 For example:
 
@@ -502,14 +503,8 @@
    };
    Server server(channels);
 
-   // Calculate encode buffer size, defaults to `PW_THREAD_MAXIMUM_THREADS`
-   // if no argument is provided.
-   constexpr size_t kEncodeBufferSize =
-     pw::thread::RequiredBufferSize(/* number of threads */);
-   std::array<std::byte, kEncodeBufferSize> encode_buffer;
-
-   // Thread snapshot service instance.
-   pw::thread::ThreadSnapshotService thread_snapshot_service(encode_buffer);
+   // Thread snapshot service builder instance.
+   pw::thread::ThreadSnapshotServiceBuilder</*num threads*/> thread_snapshot_service;
 
    void RegisterServices() {
      server.RegisterService(thread_snapshot_service);
diff --git a/pw_thread/public/pw_thread/config.h b/pw_thread/public/pw_thread/config.h
index ca06bf5..7b83a7e 100644
--- a/pw_thread/public/pw_thread/config.h
+++ b/pw_thread/public/pw_thread/config.h
@@ -22,3 +22,8 @@
 #ifndef PW_THREAD_MAXIMUM_THREADS
 #define PW_THREAD_MAXIMUM_THREADS 10
 #endif  // PW_THREAD_MAXIMUM_THREADS
+
+// The max number of threads to bundle by default for thread snapshot service.
+#ifndef PW_THREAD_NUM_BUNDLED_THREADS
+#define PW_THREAD_NUM_BUNDLED_THREADS 3
+#endif  // PW_THREAD_NUM_BUNDLED_THREADS
\ No newline at end of file
diff --git a/pw_thread/public/pw_thread/thread_snapshot_service.h b/pw_thread/public/pw_thread/thread_snapshot_service.h
index 74c6f6f..b8926f9 100644
--- a/pw_thread/public/pw_thread/thread_snapshot_service.h
+++ b/pw_thread/public/pw_thread/thread_snapshot_service.h
@@ -27,8 +27,9 @@
 Status ProtoEncodeThreadInfo(SnapshotThreadInfo::MemoryEncoder& encoder,
                              const ThreadInfo& thread_info);
 
+// Calculates encoded buffer size based on code gen constants.
 constexpr size_t RequiredServiceBufferSize(
-    const size_t num_threads = PW_THREAD_MAXIMUM_THREADS) {
+    size_t num_threads = PW_THREAD_MAXIMUM_THREADS) {
   constexpr size_t kSizeOfResponse =
       SnapshotThreadInfo::kMaxEncodedSizeBytes + Thread::kMaxEncodedSizeBytes;
   return kSizeOfResponse * num_threads;
@@ -36,17 +37,49 @@
 
 // The ThreadSnapshotService will return peak stack usage across running
 // threads when requested by GetPeak().
-class ThreadSnapshotService final
+//
+// Parameter encode_buffer: buffer where thread information is encoded. Size
+// depends on RequiredServiceBufferSize().
+//
+// Parameter thread_proto_indices: array keeping track of thread boundaries in
+// the encode buffer. The service uses these indices to send response data out
+// in bundles.
+//
+// Parameter num_bundled_threads: constant describing number of threads per
+// bundle in response.
+class ThreadSnapshotService
     : public pw_rpc::raw::ThreadSnapshotService::Service<
           ThreadSnapshotService> {
  public:
-  ThreadSnapshotService(span<std::byte> encode_buffer)
-      : encode_buffer_(encode_buffer) {}
-
+  constexpr ThreadSnapshotService(
+      span<std::byte> encode_buffer,
+      Vector<size_t>& thread_proto_indices,
+      size_t num_bundled_threads = PW_THREAD_NUM_BUNDLED_THREADS)
+      : encode_buffer_(encode_buffer),
+        thread_proto_indices_(thread_proto_indices),
+        num_bundled_threads_(num_bundled_threads) {}
   void GetPeakStackUsage(ConstByteSpan request, rpc::RawServerWriter& response);
 
  private:
   span<std::byte> encode_buffer_;
+  Vector<size_t>& thread_proto_indices_;
+  size_t num_bundled_threads_;
+};
+
+// A ThreadSnapshotService that allocates required buffers based on the
+// number of running threads on a device.
+template <size_t kNumThreads = PW_THREAD_MAXIMUM_THREADS>
+class ThreadSnapshotServiceBuilder : public ThreadSnapshotService {
+ public:
+  ThreadSnapshotServiceBuilder()
+      : ThreadSnapshotService(encode_buffer_, thread_proto_indices_) {}
+
+ private:
+  std::array<std::byte, thread::RequiredServiceBufferSize(kNumThreads)>
+      encode_buffer_;
+  // + 1 is needed to account for extra index that comes with the first
+  // submessage start or the last submessage end.
+  Vector<size_t, kNumThreads + 1> thread_proto_indices_;
 };
 
 }  // namespace pw::thread
diff --git a/pw_thread/thread_snapshot_service.cc b/pw_thread/thread_snapshot_service.cc
index ea72034..df87eac 100644
--- a/pw_thread/thread_snapshot_service.cc
+++ b/pw_thread/thread_snapshot_service.cc
@@ -14,6 +14,7 @@
 
 #include "pw_thread/thread_snapshot_service.h"
 
+#include "pw_containers/vector.h"
 #include "pw_log/log.h"
 #include "pw_protobuf/decoder.h"
 #include "pw_rpc/raw/server_reader_writer.h"
@@ -99,17 +100,28 @@
     SnapshotThreadInfo::MemoryEncoder encoder;
     Status status;
     ConstByteSpan name;
+
+    // For sending out data by chunks.
+    Vector<size_t>& thread_proto_indices;
   };
 
   ConstByteSpan name_request;
   if (!request.empty()) {
-    DecodeThreadName(request, name_request);
+    Status status = DecodeThreadName(request, name_request);
+    if (!status.ok()) {
+      PW_LOG_ERROR("Service unable to decode thread name with error code %d",
+                   status.code());
+    }
   }
 
   IterationInfo iteration_info{
       SnapshotThreadInfo::MemoryEncoder(encode_buffer_),
       OkStatus(),
-      name_request};
+      name_request,
+      thread_proto_indices_};
+
+  iteration_info.thread_proto_indices.clear();
+  iteration_info.thread_proto_indices.push_back(iteration_info.encoder.size());
 
   auto cb = [&iteration_info](const ThreadInfo& thread_info) {
     if (!iteration_info.name.empty() && thread_info.thread_name().has_value()) {
@@ -118,11 +130,15 @@
                      iteration_info.name.begin())) {
         iteration_info.status.Update(
             ProtoEncodeThreadInfo(iteration_info.encoder, thread_info));
+        iteration_info.thread_proto_indices.push_back(
+            iteration_info.encoder.size());
         return false;
       }
     } else {
       iteration_info.status.Update(
           ProtoEncodeThreadInfo(iteration_info.encoder, thread_info));
+      iteration_info.thread_proto_indices.push_back(
+          iteration_info.encoder.size());
     }
     return iteration_info.status.ok();
   };
@@ -134,14 +150,35 @@
 
   Status status;
   if (iteration_info.encoder.size() && iteration_info.status.ok()) {
-    status = response_writer.Write(iteration_info.encoder);
-    if (status != OkStatus()) {
-      PW_LOG_ERROR(
-          "Failed to send response with status code %d, packet may be too "
-          "large to send",
-          status.code());
+    // Must subtract 1 because the last boundary index of thread_proto_indices
+    // is the end of the last submessage, and NOT the start of another.
+    size_t last_start_index = iteration_info.thread_proto_indices.size() - 1;
+    for (size_t i = 0; i < last_start_index; i += num_bundled_threads_) {
+      const size_t num_threads =
+          std::min(num_bundled_threads_, last_start_index - i);
+
+      // Sending out a bundle of threads at a time.
+      const size_t bundle_size =
+          iteration_info.thread_proto_indices[i + num_threads] -
+          iteration_info.thread_proto_indices[i];
+
+      ConstByteSpan thread =
+          ConstByteSpan(iteration_info.encoder.data() +
+                            iteration_info.thread_proto_indices[i],
+                        bundle_size);
+
+      if (bundle_size) {
+        status.Update(response_writer.Write(thread));
+      }
+      if (!status.ok()) {
+        PW_LOG_ERROR(
+            "Failed to send response with error code %d, packet may be too "
+            "large to send",
+            status.code());
+      }
     }
   }
+
   if (response_writer.Finish(status) != OkStatus()) {
     PW_LOG_ERROR(
         "Failed to close stream for GetPeakStackUsage() with error code %d",