pw_log_rpc: Add unit tests with open server writer

Test RPC log streams with provided open server writer.

No-Docs-Update-Reason: added unit tests.
Change-Id: I81acdd59361b1883f25e34fde4e83d5a089f70af
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/61422
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel
index 27c7221..29048a9 100644
--- a/pw_log_rpc/BUILD.bazel
+++ b/pw_log_rpc/BUILD.bazel
@@ -94,7 +94,9 @@
         "rpc_log_drain_test.cc",
     ],
     deps = [
+        ":log_service",
         ":rpc_log_drain",
+        "//pw_rpc/raw:test_method_context",
         "//pw_unit_test",
     ],
 )
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index 122fccb..4e4189d 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -91,7 +91,11 @@
 
 pw_test("rpc_log_drain_test") {
   sources = [ "rpc_log_drain_test.cc" ]
-  deps = [ ":rpc_log_drain" ]
+  deps = [
+    ":log_service",
+    ":rpc_log_drain",
+    "$dir_pw_rpc/raw:test_method_context",
+  ]
 }
 
 # TODO(cachinchilla): update docs.
diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc
index 620ea46..5737c44 100644
--- a/pw_log_rpc/log_service_test.cc
+++ b/pw_log_rpc/log_service_test.cc
@@ -19,12 +19,15 @@
 #include <limits>
 
 #include "gtest/gtest.h"
+#include "pw_assert/check.h"
 #include "pw_containers/vector.h"
 #include "pw_log/log.h"
 #include "pw_log/proto/log.pwpb.h"
 #include "pw_log/proto_utils.h"
 #include "pw_protobuf/decoder.h"
 #include "pw_result/result.h"
+#include "pw_rpc/channel.h"
+#include "pw_rpc/raw/fake_channel_output.h"
 #include "pw_rpc/raw/test_method_context.h"
 #include "pw_string/string_builder.h"
 #include "pw_sync/mutex.h"
@@ -91,15 +94,17 @@
   std::array<std::byte, kMaxLogEntrySize> drain_buffer1_;
   std::array<std::byte, kMaxLogEntrySize> drain_buffer2_;
   std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_;
+  static constexpr uint32_t kIgnoreWriterErrorsDrainId = 1;
+  static constexpr uint32_t kCloseWriterOnErrorDrainId = 2;
   static constexpr uint32_t kSmallBufferDrainId = 3;
   sync::Mutex shared_mutex_;
   std::array<RpcLogDrain, kMaxDrains> drains_{
-      RpcLogDrain(1,
+      RpcLogDrain(kIgnoreWriterErrorsDrainId,
                   drain_buffer1_,
                   shared_mutex_,
                   RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
       RpcLogDrain(
-          2,
+          kCloseWriterOnErrorDrainId,
           drain_buffer2_,
           shared_mutex_,
           RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
@@ -119,6 +124,12 @@
   EXPECT_TRUE(entry_decoder.Next().ok());  // message [tokenized]
   EXPECT_EQ(1U, entry_decoder.FieldNumber());
   EXPECT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok());
+  if (tokenized_data.size() != expected_tokenized_data.size()) {
+    PW_LOG_ERROR(
+        "actual: '%s', expected: '%s'",
+        reinterpret_cast<const char*>(tokenized_data.begin()),
+        reinterpret_cast<const char*>(expected_tokenized_data.begin()));
+  }
   EXPECT_EQ(tokenized_data.size(), expected_tokenized_data.size());
   EXPECT_EQ(std::memcmp(tokenized_data.begin(),
                         expected_tokenized_data.begin(),
@@ -176,6 +187,14 @@
   return entries_found;
 }
 
+size_t CountLogEntries(protobuf::Decoder& entries_decoder) {
+  size_t entries_found = 0;
+  while (entries_decoder.Next().ok()) {
+    ++entries_found;
+  }
+  return entries_found;
+}
+
 TEST_F(LogServiceTest, AssignWriter) {
   // Drains don't have writers.
   for (auto& drain : drain_map_.drains()) {
@@ -386,9 +405,167 @@
       entry_decoder, expected_metadata, expected_message, expected_timestamp);
 }
 
-// TODO(pwbug/469): add tests for an open RawServerWriter that closes or fails
-// while flushing, then on re-open the drain sends a counts. The drain mus have
-// ignore_writer_error disabled.
+TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
+  const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
+  auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
+  ASSERT_TRUE(drain.ok());
+
+  LogService log_service(drain_map_);
+  const uint32_t output_buffer_size = 100;
+  rpc::RawFakeChannelOutput<output_buffer_size, 10> output(
+      rpc::MethodType::kServerStreaming);
+  rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
+  rpc::Server server(std::span(&channel, 1));
+
+  // Add as many entries needed to have multiple packets send.
+  const uint32_t min_packets_sent = 4;
+  const uint32_t max_messages_per_response =
+      output_buffer_size / sizeof(kMessage);
+  const size_t total_entries = min_packets_sent * max_messages_per_response;
+  AddLogEntries(total_entries, kMessage);
+
+  // Interrupt log stream with an error.
+  const uint32_t successful_packets_sent = min_packets_sent - 2;
+  output.set_send_status(Status::Unavailable(), successful_packets_sent);
+
+  // Request logs.
+  rpc::RawServerWriter writer =
+      rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+          server, drain_channel_id, log_service);
+  EXPECT_EQ(drain.value()->Open(writer), OkStatus());
+  // This drain closes on errors.
+  EXPECT_EQ(drain.value()->Flush(), Status::Aborted());
+  EXPECT_TRUE(output.done());
+
+  // Make sure not all packets were sent.
+  ASSERT_EQ(output.total_stream_packets(), successful_packets_sent);
+
+  // Verify data in responses.
+  Vector<ConstByteSpan, total_entries> message_stack;
+  for (size_t i = 0; i < total_entries; ++i) {
+    message_stack.push_back(
+        std::as_bytes(std::span(std::string_view(kMessage))));
+  }
+  size_t entries_found = 0;
+  for (auto& response : output.responses()) {
+    protobuf::Decoder entry_decoder(response);
+    entries_found += VerifyLogEntries(entry_decoder, message_stack);
+  }
+
+  // Verify that not all the entries were sent.
+  EXPECT_LE(entries_found, total_entries);
+
+  // Reset channel output and resume log stream with a new writer.
+  output.clear();
+  writer = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+      server, drain_channel_id, log_service);
+  EXPECT_EQ(drain.value()->Open(writer), OkStatus());
+  EXPECT_EQ(drain.value()->Flush(), OkStatus());
+
+  // Add expected messages to the stack in the reverse order they are received.
+  message_stack.clear();
+  // One full packet was dropped. Since all messages are the same length, there
+  // are entries_found / successful_packets_sent per packet.
+  const uint32_t total_drop_count = entries_found / successful_packets_sent;
+  const uint32_t remaining_entries = total_entries - total_drop_count;
+  for (size_t i = 0; i < remaining_entries; ++i) {
+    message_stack.push_back(
+        std::as_bytes(std::span(std::string_view(kMessage))));
+  }
+  StringBuffer<32> message;
+  message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
+  message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+
+  for (auto& response : output.responses()) {
+    protobuf::Decoder entry_decoder(response);
+    entries_found += VerifyLogEntries(entry_decoder, message_stack);
+  }
+  // All entries are accounted for, including the drop message.
+  EXPECT_EQ(entries_found, remaining_entries + 1);
+}
+
+TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
+  const uint32_t drain_channel_id = kIgnoreWriterErrorsDrainId;
+  auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
+  ASSERT_TRUE(drain.ok());
+
+  LogService log_service(drain_map_);
+  const uint32_t output_buffer_size = 100;
+  rpc::RawFakeChannelOutput<output_buffer_size, 10> output(
+      rpc::MethodType::kServerStreaming);
+  rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
+  rpc::Server server(std::span(&channel, 1));
+
+  // Add as many entries needed to have multiple packets send.
+  const uint32_t min_packets_sent = 4;
+  const uint32_t max_messages_per_response =
+      output_buffer_size / sizeof(kMessage);
+  const size_t total_entries = min_packets_sent * max_messages_per_response;
+  AddLogEntries(total_entries, kMessage);
+
+  // Interrupt log stream with an error.
+  const uint32_t error_on_packet_count = min_packets_sent;
+  output.set_send_status(Status::Unavailable(), min_packets_sent);
+
+  // Request logs.
+  rpc::RawServerWriter writer =
+      rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+          server, drain_channel_id, log_service);
+  EXPECT_EQ(drain.value()->Open(writer), OkStatus());
+  // This drain ignores errors.
+  EXPECT_EQ(drain.value()->Flush(), OkStatus());
+  EXPECT_FALSE(output.done());
+
+  // Make some packets were sent.
+  ASSERT_GE(output.total_stream_packets(), min_packets_sent);
+
+  // Verify that not all the entries were sent.
+  size_t entries_found = 0;
+  for (auto& response : output.responses()) {
+    protobuf::Decoder entry_decoder(response);
+    entries_found += CountLogEntries(entry_decoder);
+  }
+  EXPECT_LE(entries_found, total_entries);
+
+  // Verify that a drop message count is found.
+  // Don't account the drop count message in the total drop count.
+  const uint32_t total_drop_count = total_entries - entries_found + 1;
+  // Since all messages are the same, the is a constant `total_drop_count`
+  // number of entries per packet.
+  const uint32_t entry_count_before_error =
+      error_on_packet_count * total_drop_count;
+  const uint32_t entry_count_after_error =
+      entries_found - 1 - entry_count_before_error;
+  Vector<ConstByteSpan, total_entries> message_stack;
+  // Add messages to the stack in the reverse order they are sent.
+  for (size_t i = 0; i < entry_count_after_error; ++i) {
+    message_stack.push_back(
+        std::as_bytes(std::span(std::string_view(kMessage))));
+  }
+  StringBuffer<32> message;
+  message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
+  message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+  for (size_t i = 0; i < entry_count_before_error; ++i) {
+    message_stack.push_back(
+        std::as_bytes(std::span(std::string_view(kMessage))));
+  }
+
+  for (auto& response : output.responses()) {
+    protobuf::Decoder entry_decoder(response);
+    VerifyLogEntries(entry_decoder, message_stack);
+  }
+
+  // More calls to flush with errors will not affect this stubborn drain.
+  const size_t previous_stream_packet_count = output.total_stream_packets();
+  output.set_send_status(Status::Unavailable());
+  EXPECT_EQ(drain.value()->Flush(), OkStatus());
+  EXPECT_FALSE(output.done());
+  ASSERT_EQ(output.total_stream_packets(), previous_stream_packet_count);
+
+  output.clear();
+  EXPECT_EQ(drain.value()->Close(), OkStatus());
+  EXPECT_TRUE(output.done());
+}
 
 }  // namespace
 }  // namespace pw::log_rpc
diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc
index 1eba118..5913171 100644
--- a/pw_log_rpc/rpc_log_drain_test.cc
+++ b/pw_log_rpc/rpc_log_drain_test.cc
@@ -19,8 +19,12 @@
 #include <span>
 
 #include "gtest/gtest.h"
+#include "pw_log_rpc/log_service.h"
 #include "pw_log_rpc/rpc_log_drain_map.h"
 #include "pw_multisink/multisink.h"
+#include "pw_rpc/channel.h"
+#include "pw_rpc/raw/fake_channel_output.h"
+#include "pw_rpc/raw/server_reader_writer.h"
 #include "pw_status/status.h"
 #include "pw_sync/mutex.h"
 
@@ -32,11 +36,11 @@
 TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) {
   // Drain without a writer.
   const uint32_t drain_id = 1;
-  std::array<std::byte, kBufferSize> buffer1;
+  std::array<std::byte, kBufferSize> buffer;
   sync::Mutex mutex;
   RpcLogDrain drain(
       drain_id,
-      buffer1,
+      buffer,
       mutex,
       RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError);
   EXPECT_EQ(drain.channel_id(), drain_id);
@@ -87,8 +91,76 @@
   }
 }
 
-// TODO(cachinchilla): add tests for passing an open RawServerWriter when there
-// is a way to create an one manually.
+TEST(RpcLogDrain, FlushingDrainWithOpenWriter) {
+  const uint32_t drain_id = 1;
+  std::array<std::byte, kBufferSize> buffer;
+  sync::Mutex mutex;
+  std::array<RpcLogDrain, 1> drains{
+      RpcLogDrain(
+          drain_id,
+          buffer,
+          mutex,
+          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+  };
+  RpcLogDrainMap drain_map(drains);
+  LogService log_service(drain_map);
+
+  rpc::RawFakeChannelOutput<128, 1> output(rpc::MethodType::kServerStreaming);
+  rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
+  rpc::Server server(std::span(&channel, 1));
+
+  // Attach drain to a MultiSink.
+  RpcLogDrain& drain = drains[0];
+  std::array<std::byte, kBufferSize * 2> multisink_buffer;
+  multisink::MultiSink multisink(multisink_buffer);
+  multisink.AttachDrain(drain);
+  EXPECT_EQ(drain.Flush(), Status::Unavailable());
+
+  rpc::RawServerWriter writer =
+      rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+          server, drain_id, log_service);
+  ASSERT_TRUE(writer.open());
+  EXPECT_EQ(drain.Open(writer), OkStatus());
+  EXPECT_EQ(drain.Flush(), OkStatus());
+  // Can call multliple times until closed on error.
+  EXPECT_EQ(drain.Flush(), OkStatus());
+  EXPECT_EQ(drain.Close(), OkStatus());
+  rpc::RawServerWriter& writer_ref = writer;
+  ASSERT_FALSE(writer_ref.open());
+  EXPECT_EQ(drain.Flush(), Status::Unavailable());
+}
+
+TEST(RpcLogDrain, TryReopenOpenedDrain) {
+  const uint32_t drain_id = 1;
+  std::array<std::byte, kBufferSize> buffer;
+  sync::Mutex mutex;
+  std::array<RpcLogDrain, 1> drains{
+      RpcLogDrain(
+          drain_id,
+          buffer,
+          mutex,
+          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+  };
+  RpcLogDrainMap drain_map(drains);
+  LogService log_service(drain_map);
+
+  rpc::RawFakeChannelOutput<128, 1> output(rpc::MethodType::kServerStreaming);
+  rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
+  rpc::Server server(std::span(&channel, 1));
+
+  // Open Drain and try to open with a new writer.
+  rpc::RawServerWriter writer =
+      rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+          server, drain_id, log_service);
+  ASSERT_TRUE(writer.open());
+  RpcLogDrain& drain = drains[0];
+  EXPECT_EQ(drain.Open(writer), OkStatus());
+  rpc::RawServerWriter second_writer =
+      rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
+          server, drain_id, log_service);
+  ASSERT_TRUE(second_writer.open());
+  EXPECT_EQ(drain.Open(second_writer), Status::AlreadyExists());
+}
 
 }  // namespace
 }  // namespace pw::log_rpc