pw_log & pw_log_rpc: use dropped field count

- Use the dropped field to notify RPC clients of log drops and avoid the
extra memory cost that may be added by the string builder.
- Refactor unit tests to improve parsing log entries.

Change-Id: I409995598cb4453c46ca4f04259da5ee9d3ac99c
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/67060
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
diff --git a/pw_log/log.proto b/pw_log/log.proto
index 988f34a..f329ca3 100644
--- a/pw_log/log.proto
+++ b/pw_log/log.proto
@@ -126,20 +126,15 @@
     int64 time_since_last_entry = 5;
   }
 
+  // When the log buffers are full but more logs come in, the logs are counted
+  // and a special log message is omitted with only counts for the number of
+  // messages dropped.
+  optional uint32 dropped = 6;
+
   // The following fields are planned but will not be added until they are
   // needed. Protobuf field numbers over 15 use an extra byte, so these fields
   // are left out for now to avoid reserving field numbers unnecessarily.
 
-  // When the log buffers are full but more logs come in, the logs are counted
-  // and a special log message is omitted with only counts for the number of
-  // messages dropped. The timestamp indicates the time that the "missed logs"
-  // message was inserted into the queue.
-  //
-  // As an alternative to these fields, implementations may simply send a
-  // message stating the drop count.
-  // optional uint32 dropped = ?;
-  // optional uint32 dropped_warning_or_above = ?;
-
   // Represents the device from which the log originated. The meaning of this
   // field is implementation defined
   // optional uint32 source_id = ?;
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel
index c5f411c..f1db98c 100644
--- a/pw_log_rpc/BUILD.bazel
+++ b/pw_log_rpc/BUILD.bazel
@@ -51,7 +51,6 @@
         "//pw_protobuf",
         "//pw_result",
         "//pw_status",
-        "//pw_string",
         "//pw_sync:lock_annotations",
         "//pw_sync:mutex",
     ],
@@ -88,7 +87,6 @@
         "//pw_result",
         "//pw_rpc/raw:test_method_context",
         "//pw_status",
-        "//pw_string",
         "//pw_unit_test",
     ],
 )
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index 5e15a1d..cbe304a 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -44,10 +44,6 @@
     "public/pw_log_rpc/rpc_log_drain_map.h",
   ]
   sources = [ "rpc_log_drain.cc" ]
-  deps = [
-    "$dir_pw_log",
-    "$dir_pw_string",
-  ]
   public_deps = [
     "$dir_pw_assert",
     "$dir_pw_log:protos.pwpb",
@@ -89,7 +85,6 @@
     "$dir_pw_result",
     "$dir_pw_rpc/raw:test_method_context",
     "$dir_pw_status",
-    "$dir_pw_string",
   ]
 }
 
diff --git a/pw_log_rpc/docs.rst b/pw_log_rpc/docs.rst
index b3db664..d93083c 100644
--- a/pw_log_rpc/docs.rst
+++ b/pw_log_rpc/docs.rst
@@ -118,7 +118,8 @@
 ``MultiSink`` while also accounting for the interface's Maximum Transmission
 Unit (MTU). If the ``RpcLogDrain`` finds a drop message count as it reads the
 ``MultiSink`` it will insert a message in the stream with the drop message
-count.
+count in the log proto dropped optional field. The receiving end can display the
+count with the logs if desired.
 
 RpcLogDrainMap
 ==============
diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc
index 9c04caf..e3a9259 100644
--- a/pw_log_rpc/log_service_test.cc
+++ b/pw_log_rpc/log_service_test.cc
@@ -24,12 +24,12 @@
 #include "pw_log/log.h"
 #include "pw_log/proto/log.pwpb.h"
 #include "pw_log/proto_utils.h"
+#include "pw_log_tokenized/metadata.h"
 #include "pw_protobuf/decoder.h"
 #include "pw_result/result.h"
 #include "pw_rpc/channel.h"
 #include "pw_rpc/raw/fake_channel_output.h"
 #include "pw_rpc/raw/test_method_context.h"
-#include "pw_string/string_builder.h"
 #include "pw_sync/mutex.h"
 
 namespace pw::log_rpc {
@@ -41,9 +41,9 @@
   PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen, 6, 128)
 
 constexpr size_t kMaxMessageSize = 50;
-static_assert(RpcLogDrain::kMaxDropMessageSize < kMaxMessageSize);
 constexpr size_t kMaxLogEntrySize =
     RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize;
+static_assert(RpcLogDrain::kMinEntryBufferSize < kMaxLogEntrySize);
 constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10;
 constexpr size_t kMaxDrains = 3;
 constexpr char kMessage[] = "message";
@@ -52,8 +52,13 @@
 constexpr char kLongMessage[] =
     "This is a long log message that will be dropped.";
 static_assert(sizeof(kLongMessage) < kMaxMessageSize);
-static_assert(sizeof(kLongMessage) > RpcLogDrain::kMaxDropMessageSize);
+static_assert(sizeof(kLongMessage) > RpcLogDrain::kMinEntryBufferSize);
 std::array<std::byte, 1> rpc_request_buffer;
+constexpr auto kSampleMetadata =
+    log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>();
+constexpr auto kDropMessageMetadata =
+    log_tokenized::Metadata::Set<0, 0, 0, 0>();
+constexpr int64_t kSampleTimestamp = 1000;
 
 // `LogServiceTest` sets up a logging environment for testing with a `MultiSink`
 // for log entries, and multiple `RpcLogDrain`s for consuming such log entries.
@@ -68,22 +73,26 @@
     }
   }
 
-  void AddLogEntries(size_t log_count, std::string_view message) {
+  void AddLogEntries(size_t log_count,
+                     std::string_view message,
+                     log_tokenized::Metadata metadata,
+                     int64_t timestamp) {
     for (size_t i = 0; i < log_count; ++i) {
-      AddLogEntry(message);
+      ASSERT_TRUE(AddLogEntry(message, metadata, timestamp).ok());
     }
   }
 
-  void AddLogEntry(std::string_view message) {
-    auto metadata =
-        log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, __LINE__, 0, 0>();
+  StatusWithSize AddLogEntry(std::string_view message,
+                             log_tokenized::Metadata metadata,
+                             int64_t timestamp) {
     Result<ConstByteSpan> encoded_log_result =
         log::EncodeTokenizedLog(metadata,
                                 std::as_bytes(std::span(message)),
-                                /*ticks_since_epoch=*/0,
+                                timestamp,
                                 entry_encode_buffer_);
-    EXPECT_EQ(encoded_log_result.status(), OkStatus());
+    PW_TRY_WITH_SIZE(encoded_log_result.status());
     multisink_.HandleEntry(encoded_log_result.value());
+    return StatusWithSize(encoded_log_result.value().size());
   }
 
  protected:
@@ -116,74 +125,80 @@
                   RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
   };
 };
+struct TestLogEntry {
+  log_tokenized::Metadata metadata = kSampleMetadata;
+  int64_t timestamp = 0;
+  uint32_t dropped = 0;
+  ConstByteSpan tokenized_data = {};
+};
 
 // Unpacks a `LogEntry` proto buffer and compares it with the expected data.
 void VerifyLogEntry(protobuf::Decoder& entry_decoder,
-                    log_tokenized::Metadata expected_metadata,
-                    ConstByteSpan expected_tokenized_data,
-                    const int64_t expected_timestamp) {
+                    const TestLogEntry& expected_entry) {
   ConstByteSpan tokenized_data;
-  EXPECT_TRUE(entry_decoder.Next().ok());  // message [tokenized]
-  EXPECT_EQ(1U, entry_decoder.FieldNumber());
-  EXPECT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok());
-  if (tokenized_data.size() != expected_tokenized_data.size()) {
-    PW_LOG_ERROR(
-        "actual: '%s', expected: '%s'",
-        reinterpret_cast<const char*>(tokenized_data.begin()),
-        reinterpret_cast<const char*>(expected_tokenized_data.begin()));
+  if (!expected_entry.tokenized_data.empty()) {
+    ASSERT_EQ(entry_decoder.Next(), OkStatus());
+    ASSERT_EQ(entry_decoder.FieldNumber(), 1u);  // message [tokenized]
+    ASSERT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok());
+    if (tokenized_data.size() != expected_entry.tokenized_data.size()) {
+      PW_LOG_ERROR(
+          "actual: '%s', expected: '%s'",
+          reinterpret_cast<const char*>(tokenized_data.begin()),
+          reinterpret_cast<const char*>(expected_entry.tokenized_data.begin()));
+    }
+    EXPECT_EQ(tokenized_data.size(), expected_entry.tokenized_data.size());
+    EXPECT_EQ(std::memcmp(tokenized_data.begin(),
+                          expected_entry.tokenized_data.begin(),
+                          expected_entry.tokenized_data.size()),
+              0);
   }
-  EXPECT_EQ(tokenized_data.size(), expected_tokenized_data.size());
-  EXPECT_EQ(std::memcmp(tokenized_data.begin(),
-                        expected_tokenized_data.begin(),
-                        expected_tokenized_data.size()),
-            0);
-
-  uint32_t line_level;
-  EXPECT_TRUE(entry_decoder.Next().ok());  // line_level
-  EXPECT_EQ(2U, entry_decoder.FieldNumber());
-  EXPECT_TRUE(entry_decoder.ReadUint32(&line_level).ok());
-  EXPECT_EQ(expected_metadata.level(), line_level & PW_LOG_LEVEL_BITMASK);
-  EXPECT_EQ(expected_metadata.line_number(),
-            (line_level & ~PW_LOG_LEVEL_BITMASK) >> PW_LOG_LEVEL_BITS);
-
-  if (expected_metadata.flags() != 0) {
+  if (expected_entry.metadata.level()) {
+    ASSERT_EQ(entry_decoder.Next(), OkStatus());
+    ASSERT_EQ(entry_decoder.FieldNumber(), 2u);  // line_level
+    uint32_t line_level;
+    ASSERT_TRUE(entry_decoder.ReadUint32(&line_level).ok());
+    EXPECT_EQ(expected_entry.metadata.level(),
+              line_level & PW_LOG_LEVEL_BITMASK);
+    EXPECT_EQ(expected_entry.metadata.line_number(),
+              (line_level & ~PW_LOG_LEVEL_BITMASK) >> PW_LOG_LEVEL_BITS);
+  }
+  if (expected_entry.metadata.flags()) {
+    ASSERT_EQ(entry_decoder.Next(), OkStatus());
+    ASSERT_EQ(entry_decoder.FieldNumber(), 3u);  // flags
     uint32_t flags;
-    EXPECT_TRUE(entry_decoder.Next().ok());  // flags
-    EXPECT_EQ(3U, entry_decoder.FieldNumber());
-    EXPECT_TRUE(entry_decoder.ReadUint32(&flags).ok());
-    EXPECT_EQ(expected_metadata.flags(), flags);
+    ASSERT_TRUE(entry_decoder.ReadUint32(&flags).ok());
+    EXPECT_EQ(expected_entry.metadata.flags(), flags);
   }
-
-  const bool has_timestamp = entry_decoder.Next().ok();  // timestamp
-  if (expected_timestamp == 0 && !has_timestamp) {
-    return;
+  if (expected_entry.timestamp) {
+    ASSERT_EQ(entry_decoder.Next(), OkStatus());
+    ASSERT_TRUE(entry_decoder.FieldNumber() == 4u       // timestamp
+                || entry_decoder.FieldNumber() == 5u);  // time_since_last_entry
+    int64_t timestamp;
+    ASSERT_TRUE(entry_decoder.ReadInt64(&timestamp).ok());
+    EXPECT_EQ(expected_entry.timestamp, timestamp);
   }
-  int64_t timestamp;
-  EXPECT_TRUE(has_timestamp);
-  EXPECT_EQ(4U, entry_decoder.FieldNumber());
-  EXPECT_TRUE(entry_decoder.ReadInt64(&timestamp).ok());
-  EXPECT_EQ(expected_timestamp, timestamp);
+  if (expected_entry.dropped) {
+    ASSERT_EQ(entry_decoder.Next(), OkStatus());
+    ASSERT_EQ(entry_decoder.FieldNumber(), 6u);  // dropped
+    uint32_t dropped;
+    ASSERT_TRUE(entry_decoder.ReadUint32(&dropped).ok());
+    EXPECT_EQ(expected_entry.dropped, dropped);
+  }
 }
 
 // Verifies a stream of log entries, returning the total count found.
 size_t VerifyLogEntries(protobuf::Decoder& entries_decoder,
-                        Vector<ConstByteSpan>& message_stack) {
+                        Vector<TestLogEntry>& expected_entries_stack) {
   size_t entries_found = 0;
   while (entries_decoder.Next().ok()) {
     ConstByteSpan entry;
     EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok());
     protobuf::Decoder entry_decoder(entry);
-    auto expected_metadata =
-        log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, __LINE__, 0, 0>();
-    if (message_stack.empty()) {
+    if (expected_entries_stack.empty()) {
       break;
     }
-    ConstByteSpan expected_message = message_stack.back();
-    VerifyLogEntry(entry_decoder,
-                   expected_metadata,
-                   expected_message,
-                   /*expected_timestamp=*/0);
-    message_stack.pop_back();
+    VerifyLogEntry(entry_decoder, expected_entries_stack.back());
+    expected_entries_stack.pop_back();
     ++entries_found;
   }
   return entries_found;
@@ -248,7 +263,7 @@
 
   // Add log entries.
   const size_t total_entries = 10;
-  AddLogEntries(total_entries, kMessage);
+  AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp);
   // Request logs.
   context.call(rpc_request_buffer);
   EXPECT_EQ(active_drain.Flush(), OkStatus());
@@ -263,10 +278,11 @@
   EXPECT_GE(context.responses().size(), 1u);
 
   // Verify data in responses.
-  Vector<ConstByteSpan, total_entries> message_stack;
+  Vector<TestLogEntry, total_entries> message_stack;
   for (size_t i = 0; i < total_entries; ++i) {
-    message_stack.push_back(
-        std::as_bytes(std::span(std::string_view(kMessage))));
+    message_stack.push_back({.timestamp = kSampleTimestamp,
+                             .tokenized_data = std::as_bytes(
+                                 std::span(std::string_view(kMessage)))});
   }
   size_t entries_found = 0;
   for (auto& response : context.responses()) {
@@ -285,7 +301,7 @@
   // Add log entries.
   const size_t total_entries = 5;
   const uint32_t total_drop_count = 2;
-  AddLogEntries(total_entries, kMessage);
+  AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp);
   multisink_.HandleDropped(total_drop_count);
 
   // Request logs.
@@ -297,13 +313,13 @@
   ASSERT_GE(context.responses().size(), 1u);
 
   // Add create expected messages in a stack to match the order they arrive in.
-  Vector<ConstByteSpan, total_entries + 1> message_stack;
-  StringBuffer<32> message;
-  message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
-  message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+  Vector<TestLogEntry, total_entries + 1> message_stack;
+  message_stack.push_back(
+      {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
   for (size_t i = 0; i < total_entries; ++i) {
-    message_stack.push_back(
-        std::as_bytes(std::span(std::string_view(kMessage))));
+    message_stack.push_back({.timestamp = kSampleTimestamp,
+                             .tokenized_data = std::as_bytes(
+                                 std::span(std::string_view(kMessage)))});
   }
 
   // Verify data in responses.
@@ -326,7 +342,7 @@
   // Add log entries.
   const size_t total_entries = 5;
   const uint32_t total_drop_count = total_entries;
-  AddLogEntries(total_entries, kLongMessage);
+  AddLogEntries(total_entries, kLongMessage, kSampleMetadata, kSampleTimestamp);
   // Request logs.
   context.call(rpc_request_buffer);
   EXPECT_EQ(small_buffer_drain.value()->Flush(), OkStatus());
@@ -334,10 +350,9 @@
   ASSERT_EQ(context.status(), OkStatus());
   ASSERT_GE(context.responses().size(), 1u);
 
-  Vector<ConstByteSpan, total_entries + 1> message_stack;
-  StringBuffer<32> message;
-  message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
-  message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+  Vector<TestLogEntry, total_entries + 1> message_stack;
+  message_stack.push_back(
+      {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
 
   // Verify data in responses.
   size_t entries_found = 0;
@@ -357,7 +372,7 @@
 
   // Add log entries.
   const size_t total_entries = 5;
-  AddLogEntries(total_entries, kMessage);
+  AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp);
   // Request logs.
   context.call(rpc_request_buffer);
   EXPECT_EQ(detached_drain.Close(), OkStatus());
@@ -366,23 +381,25 @@
 }
 
 TEST_F(LogServiceTest, LargeLogEntry) {
-  const auto expected_metadata =
-      log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
-                                   (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
-                                   (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
-                                   (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>();
-  ConstByteSpan expected_message = std::as_bytes(std::span(kMessage));
-  const int64_t expected_timestamp = std::numeric_limits<int64_t>::max();
+  const TestLogEntry expected_entry{
+      .metadata =
+          log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
+                                       (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
+                                       (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
+                                       (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>(),
+      .timestamp = std::numeric_limits<int64_t>::max(),
+      .tokenized_data = std::as_bytes(std::span(kMessage)),
+  };
 
   // Add entry to multisink.
   log::LogEntry::MemoryEncoder encoder(entry_encode_buffer_);
-  encoder.WriteMessage(expected_message);
+  encoder.WriteMessage(expected_entry.tokenized_data);
   encoder.WriteLineLevel(
-      (expected_metadata.level() & PW_LOG_LEVEL_BITMASK) |
-      ((expected_metadata.line_number() << PW_LOG_LEVEL_BITS) &
+      (expected_entry.metadata.level() & PW_LOG_LEVEL_BITMASK) |
+      ((expected_entry.metadata.line_number() << PW_LOG_LEVEL_BITS) &
        ~PW_LOG_LEVEL_BITMASK));
-  encoder.WriteFlags(expected_metadata.flags());
-  encoder.WriteTimestamp(expected_timestamp);
+  encoder.WriteFlags(expected_entry.metadata.flags());
+  encoder.WriteTimestamp(expected_entry.timestamp);
   ASSERT_EQ(encoder.status(), OkStatus());
   multisink_.HandleEntry(encoder);
 
@@ -403,8 +420,7 @@
   ConstByteSpan entry;
   EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok());
   protobuf::Decoder entry_decoder(entry);
-  VerifyLogEntry(
-      entry_decoder, expected_metadata, expected_message, expected_timestamp);
+  VerifyLogEntry(entry_decoder, expected_entry);
 }
 
 TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
@@ -413,20 +429,30 @@
   ASSERT_TRUE(drain.ok());
 
   LogService log_service(drain_map_);
-  const uint32_t output_buffer_size = 100;
+  const size_t output_buffer_size = 128;
+  const size_t max_packets = 10;
   rpc::RawFakeChannelOutput<10, output_buffer_size, 512> output;
   rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
   rpc::Server server(std::span(&channel, 1));
 
   // Add as many entries needed to have multiple packets send.
-  const uint32_t min_packets_sent = 4;
-  const uint32_t max_messages_per_response =
-      output_buffer_size / sizeof(kMessage);
-  const size_t total_entries = min_packets_sent * max_messages_per_response;
-  AddLogEntries(total_entries, kMessage);
+  StatusWithSize status =
+      AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
+  ASSERT_TRUE(status.ok());
+
+  // In reality less than output_buffer_size is given as a buffer, since some
+  // bytes are used for the RPC framing.
+  const uint32_t max_messages_per_response = output_buffer_size / status.size();
+  // Send less packets than the max to avoid crashes.
+  const uint32_t packets_sent = max_packets / 2;
+  const size_t total_entries = packets_sent * max_messages_per_response;
+  const size_t max_entries = 50;
+  // Check we can test all these entries.
+  ASSERT_GE(max_entries, total_entries);
+  AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp);
 
   // Interrupt log stream with an error.
-  const uint32_t successful_packets_sent = min_packets_sent - 2;
+  const uint32_t successful_packets_sent = packets_sent / 2;
   output.set_send_status(Status::Unavailable(), successful_packets_sent);
 
   // Request logs.
@@ -441,10 +467,11 @@
   ASSERT_EQ(output.payloads<Logs::Listen>().size(), successful_packets_sent);
 
   // Verify data in responses.
-  Vector<ConstByteSpan, total_entries> message_stack;
+  Vector<TestLogEntry, max_entries> message_stack;
   for (size_t i = 0; i < total_entries; ++i) {
-    message_stack.push_back(
-        std::as_bytes(std::span(std::string_view(kMessage))));
+    message_stack.push_back({.timestamp = kSampleTimestamp,
+                             .tokenized_data = std::as_bytes(
+                                 std::span(std::string_view(kMessage)))});
   }
   size_t entries_found = 0;
   for (auto& response : output.payloads<Logs::Listen>()) {
@@ -469,12 +496,11 @@
   const uint32_t total_drop_count = entries_found / successful_packets_sent;
   const uint32_t remaining_entries = total_entries - total_drop_count;
   for (size_t i = 0; i < remaining_entries; ++i) {
-    message_stack.push_back(
-        std::as_bytes(std::span(std::string_view(kMessage))));
+    message_stack.push_back({.tokenized_data = std::as_bytes(
+                                 std::span(std::string_view(kMessage)))});
   }
-  StringBuffer<32> message;
-  message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
-  message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+  message_stack.push_back(
+      {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
 
   for (auto& response : output.payloads<Logs::Listen>()) {
     protobuf::Decoder entry_decoder(response);
@@ -490,21 +516,31 @@
   ASSERT_TRUE(drain.ok());
 
   LogService log_service(drain_map_);
-  const uint32_t output_buffer_size = 100;
-  rpc::RawFakeChannelOutput<10, output_buffer_size, 768> output;
+  const size_t output_buffer_size = 50;
+  const size_t max_packets = 20;
+  rpc::RawFakeChannelOutput<max_packets, output_buffer_size, 512> output;
   rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
   rpc::Server server(std::span(&channel, 1));
 
   // Add as many entries needed to have multiple packets send.
-  const uint32_t min_packets_sent = 4;
-  const uint32_t max_messages_per_response =
-      output_buffer_size / sizeof(kMessage);
-  const size_t total_entries = min_packets_sent * max_messages_per_response;
-  AddLogEntries(total_entries, kMessage);
+  StatusWithSize status =
+      AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
+  ASSERT_TRUE(status.ok());
+
+  // In reality less than output_buffer_size is given as a buffer, since some
+  // bytes are used for the RPC framing.
+  const uint32_t max_messages_per_response = output_buffer_size / status.size();
+  // Send less packets than the max to avoid crashes.
+  const uint32_t packets_sent = 4;
+  const size_t total_entries = packets_sent * max_messages_per_response;
+  const size_t max_entries = 50;
+  // Check we can test all these entries.q
+  ASSERT_GE(max_entries, total_entries);
+  AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp);
 
   // Interrupt log stream with an error.
-  const uint32_t error_on_packet_count = min_packets_sent;
-  output.set_send_status(Status::Unavailable(), min_packets_sent);
+  const uint32_t error_on_packet_count = packets_sent / 2;
+  output.set_send_status(Status::Unavailable(), error_on_packet_count);
 
   // Request logs.
   rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
@@ -514,8 +550,8 @@
   EXPECT_EQ(drain.value()->Flush(), OkStatus());
   EXPECT_FALSE(output.done());
 
-  // Make some packets were sent.
-  ASSERT_GE(output.payloads<Logs::Listen>().size(), min_packets_sent);
+  // Make sure some packets were sent.
+  ASSERT_GE(output.payloads<Logs::Listen>().size(), packets_sent);
 
   // Verify that not all the entries were sent.
   size_t entries_found = 0;
@@ -534,18 +570,19 @@
       error_on_packet_count * total_drop_count;
   const uint32_t entry_count_after_error =
       entries_found - 1 - entry_count_before_error;
-  Vector<ConstByteSpan, total_entries> message_stack;
+  Vector<TestLogEntry, max_entries> message_stack;
   // Add messages to the stack in the reverse order they are sent.
   for (size_t i = 0; i < entry_count_after_error; ++i) {
-    message_stack.push_back(
-        std::as_bytes(std::span(std::string_view(kMessage))));
+    message_stack.push_back({.timestamp = kSampleTimestamp,
+                             .tokenized_data = std::as_bytes(
+                                 std::span(std::string_view(kMessage)))});
   }
-  StringBuffer<32> message;
-  message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
-  message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+  message_stack.push_back(
+      {.metadata = kDropMessageMetadata, .dropped = total_drop_count});
   for (size_t i = 0; i < entry_count_before_error; ++i) {
-    message_stack.push_back(
-        std::as_bytes(std::span(std::string_view(kMessage))));
+    message_stack.push_back({.timestamp = kSampleTimestamp,
+                             .tokenized_data = std::as_bytes(
+                                 std::span(std::string_view(kMessage)))});
   }
 
   for (auto& response : output.payloads<Logs::Listen>()) {
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
index cc12014..3c03579 100644
--- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
@@ -52,7 +52,8 @@
 
   // The minimum buffer size, without the message payload, needed to retrieve a
   // log::LogEntry from the attached MultiSink. The user must account for the
-  // max message size to avoid log entry drops.
+  // max message size to avoid log entry drops. The dropped field is not
+  // accounted since a dropped message has all other fields unset.
   static constexpr size_t kMinEntrySizeWithoutPayload =
       // message
       protobuf::SizeOfFieldKey(1) +
@@ -65,13 +66,15 @@
       protobuf::kMaxSizeBytesUint32
       // timestamp or time_since_last_entry
       + protobuf::SizeOfFieldKey(4) + protobuf::kMaxSizeBytesInt64;
-  // Message format to report the drop count.
-  static constexpr char kDropMessageFormatString[] = "Dropped %u";
-  // With a uint32_t number, "Dropped %u" is no more than 18 characters long.
-  static constexpr size_t kMaxDropMessageSize = 18;
-  // The smallest buffer size must be able to fit a drop message.
-  static constexpr size_t kMinEntryBufferSize =
-      kMaxDropMessageSize + kMinEntrySizeWithoutPayload;
+  // The smallest buffer size must be able to fit a typical token size: 4 bytes.
+  static constexpr size_t kMinEntryBufferSize = kMinEntrySizeWithoutPayload + 4;
+
+  // When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize
+  // bytes added to the encoded LogEntry. This constant and kMinEntryBufferSize
+  // can be used to calculate the minimum RPC ChannelOutput buffer size.
+  static constexpr size_t kLogEntryEncodeFrameSize =
+      protobuf::SizeOfFieldKey(1)  // LogEntry
+      + protobuf::kMaxSizeOfLength;
 
   // Creates a log stream with the provided open writer. Useful for streaming
   // logs without a request.
diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc
index 3759162..3cfbbff 100644
--- a/pw_log_rpc/rpc_log_drain.cc
+++ b/pw_log_rpc/rpc_log_drain.cc
@@ -17,28 +17,16 @@
 #include <mutex>
 
 #include "pw_assert/check.h"
-#include "pw_log/log.h"
-#include "pw_string/string_builder.h"
 
 namespace pw::log_rpc {
 namespace {
-// When encoding LogEntry in LogEntries, there are kLogEntryEncodeFrameSize
-// bytes added to the encoded LogEntry.
-constexpr size_t kLogEntryEncodeFrameSize =
-    protobuf::SizeOfFieldKey(1)  // LogEntry
-    + protobuf::kMaxSizeOfLength;
 
 // Creates an encoded drop message on the provided buffer.
 Result<ConstByteSpan> CreateEncodedDropMessage(
     uint32_t drop_count, ByteSpan encoded_drop_message_buffer) {
-  StringBuffer<RpcLogDrain::kMaxDropMessageSize> message;
-  message.Format(RpcLogDrain::kDropMessageFormatString,
-                 static_cast<unsigned int>(drop_count));
-
   // Encode message in protobuf.
   log::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer);
-  encoder.WriteMessage(std::as_bytes(std::span(std::string_view(message))));
-  encoder.WriteLineLevel(PW_LOG_LEVEL_WARN & PW_LOG_LEVEL_BITMASK);
+  encoder.WriteDropped(drop_count);
   PW_TRY(encoder.status());
   return ConstByteSpan(encoder);
 }