pw_log_rpc: Add unit tests, docs, & error handling

Add pw_log_rpc unit tests and documentation.
Add an enum to dictacte how to handle server writer errors in
RpcLogDrain.

Test: unit tests pass
Change-Id: Ib22dd3414cf344fddd6f179d43fd4d533ba12fd8
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/56931
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel
index e570466..27c7221 100644
--- a/pw_log_rpc/BUILD.bazel
+++ b/pw_log_rpc/BUILD.bazel
@@ -68,7 +68,33 @@
     ],
 )
 
-# TODO(cachinchilla): implement tests.
 pw_cc_test(
-    name = "logs_rpc_test",
+    name = "log_service_test",
+    srcs = [
+        "log_service_test.cc",
+    ],
+    deps = [
+        ":log_service",
+        "//pw_containers:vector",
+        "//pw_log",
+        "//pw_log:proto_utils",
+        "//pw_log:protos.pwpb",
+        "//pw_protobuf",
+        "//pw_result",
+        "//pw_rpc/raw:test_method_context",
+        "//pw_status",
+        "//pw_string",
+        "//pw_unit_test",
+    ],
+)
+
+pw_cc_test(
+    name = "rpc_log_drain_test",
+    srcs = [
+        "rpc_log_drain_test.cc",
+    ],
+    deps = [
+        ":rpc_log_drain",
+        "//pw_unit_test",
+    ],
 )
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index 10f3139..122fccb 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -73,8 +73,25 @@
   ]
 }
 
-# TODO(cachinchilla): implement tests.
-pw_test("logs_rpc_test") {
+pw_test("log_service_test") {
+  sources = [ "log_service_test.cc" ]
+  deps = [
+    ":log_service",
+    "$dir_pw_containers:vector",
+    "$dir_pw_log",
+    "$dir_pw_log:proto_utils",
+    "$dir_pw_log:protos.pwpb",
+    "$dir_pw_protobuf",
+    "$dir_pw_result",
+    "$dir_pw_rpc/raw:test_method_context",
+    "$dir_pw_status",
+    "$dir_pw_string",
+  ]
+}
+
+pw_test("rpc_log_drain_test") {
+  sources = [ "rpc_log_drain_test.cc" ]
+  deps = [ ":rpc_log_drain" ]
 }
 
 # TODO(cachinchilla): update docs.
@@ -83,5 +100,8 @@
 }
 
 pw_test_group("tests") {
-  tests = [ ":logs_rpc_test" ]
+  tests = [
+    ":log_service_test",
+    ":rpc_log_drain_test",
+  ]
 }
diff --git a/pw_log_rpc/docs.rst b/pw_log_rpc/docs.rst
index ff8b466..54916a0 100644
--- a/pw_log_rpc/docs.rst
+++ b/pw_log_rpc/docs.rst
@@ -3,5 +3,253 @@
 ----------
 pw_log_rpc
 ----------
-This is a RPC-based logging backend for Pigweed. It is not ready for use, and
-is under construction.
+An RPC-based logging backend for Pigweed.
+
+.. warning::
+  This module is under construction and might change in the future.
+
+How to use
+==========
+1. Set up RPC
+-------------
+Set up RPC for your target device. Basic deployments run RPC over a UART, with
+HDLC on top for framing. See :ref:`module-pw_rpc` for details on how to enable
+``pw_rpc``.
+
+2. Set up tokenized logging (optional)
+--------------------------------------
+Set up the :ref:`module-pw_log_tokenized` log backend.
+
+3. Connect the tokenized logging handler to the MultiSink
+---------------------------------------------------------
+Create a :ref:`MultiSink <module-pw_multisink>` instance to buffer log entries.
+Then, make the log backend handler,
+``pw_tokenizer_HandleEncodedMessageWithPayload``, encode log entries in the
+``log::LogEntry`` format, and add them to the ``MultiSink``.
+
+4. Create log drains
+--------------------
+Create an ``RpcLogDrainMap`` with one ``RpcLogDrain`` for each RPC channel used
+to stream logs. Provide this map to the ``LogService`` and register the latter
+with the application's RPC service. The ``RpcLogDrainMap`` provides a convenient
+way to access and maintain each ``RpcLogDrain``. Attach each ``RpcLogDrain`` to
+the ``MultiSink``.
+
+5. Flush the log drains in the background
+-----------------------------------------
+Depending on the product's requirements, create a thread to flush all
+``RpcLogDrain``\s or one thread per drain. The thread(s) must continuously call
+``RpcLogDrain::Flush()`` to pull entries from the ``MultiSink`` and send them to
+the log listeners.
+
+Logging over RPC diagrams
+=========================
+
+Sample RPC logs request
+-----------------------
+The log listener, e.g. a computer, requests logs via RPC. The log service
+receives the request and sets up the corresponding ``RpcLogDrain`` to start the
+log stream.
+
+.. mermaid::
+
+  graph TD
+    computer[Computer]-->pw_rpc;
+    pw_rpc-->log_service[LogService];
+    log_service-->rpc_log_drain_pc[RpcLogDrain<br>streams to<br>computer];;
+
+Sample logging over RPC
+------------------------
+Logs are streamed via RPC to a computer, and to another log listener. There can
+also be internal log readers, i.e. ``MultiSink::Drain``\s, attached to the
+``MultiSink``, such as a writer to persistent memory, for example.
+
+.. mermaid::
+
+  graph TD
+    source1[Source 1]-->log_api[pw_log API];
+    source2[Source 2]-->log_api;
+    log_api-->log_backend[Log backend];
+    log_backend-->multisink[MultiSink];
+    multisink-->drain[MultiSink::Drain];
+    multisink-->rpc_log_drain_pc[RpcLogDrain<br>streams to<br>computer];
+    multisink-->rpc_log_drain_other[RpcLogDrain<br>streams to<br>other log listener];
+    drain-->other_consumer[Other log consumer<br>e.g. persistent memory];
+    rpc_log_drain_pc-->pw_rpc;
+    rpc_log_drain_other-->pw_rpc;
+    pw_rpc-->computer[Computer];
+    pw_rpc-->other_listener[Other log<br>listener];
+
+RPC log service
+===============
+The ``LogService`` class is an RPC service that provides a way to request a log
+stream sent via RPC. Thus, it helps avoid using a different protocol for logs
+and RPCs over the same interface(s). It requires a map of ``RpcLogDrains`` to
+assign stream writers and delegate the log stream flushing to the user's
+preferred method.
+
+RpcLogDrain
+===========
+An ``RpcLogDrain`` reads from the ``MultiSink`` instance that buffers logs, then
+packs, and sends the retrieved log entries to the log listener. One
+``RpcLogDrain`` is needed for each log listener. An ``RpcLogDrain`` needs a
+thread to continuously call ``Flush()`` to maintain the log stream. A thread can
+maintain multiple log streams, but it must not be the same thread used by the
+RPC server, to avoid blocking it.
+
+Each ``RpcLogDrain`` is identified by a known RPC channel ID and requires a
+``rpc::RawServerWriter`` to write the packed multiple log entries. This writer
+is assigned by the ``LogService::Listen`` RPC. Future work will allow
+``RpcLogDrain``\s to have an open RPC writer, to constantly stream logs without
+the need to request them. This is useful in cases where the connection to the
+client is dropped silently because the log stream can continue when reconnected
+without the client requesting it.
+
+An ``RpcLogDrain`` must be attached to a ``MultiSink`` containing multiple
+``log::LogEntry``\s. When ``Flush`` is called, the drain acquires the
+``rpc::RawServerWriter`` 's write buffer, grabs one ``log::LogEntry`` from the
+multisink, encodes it into a ``log::LogEntries`` stream, and repeats the process
+until the write buffer is full. Then the drain calls
+``rpc::RawServerWriter::Write`` to flush the write buffer and repeats the
+process until all the entries in the ``MultiSink`` are read or an error is
+found.
+
+The user must provide a buffer large enough for the largest entry in the
+``MultiSink`` while also accounting for the interface's Maximum Transmission
+Unit (MTU). If the ``RpcLogDrain`` finds a drop message count as it reads the
+``MultiSink`` it will insert a message in the stream with the drop message
+count.
+
+RpcLogDrainMap
+==============
+Provides a convenient way to access all or a single ``RpcLogDrain`` by its RPC
+channel ID.
+
+RpcLogDrainThread
+=================
+The module includes a sample thread that flushes each drain sequentially. Future
+work might replace this with enqueueing the flush work on a work queue. The user
+can also choose to have different threads flushing individual ``RpcLogDrain``\s
+with different priorities.
+
+Logging example
+===============
+The following code shows a sample setup to defer the log handling to the
+``RpcLogDrainThread`` to avoid having the log streaming block at the log
+callsite.
+
+main.cc
+-------
+.. code-block:: cpp
+
+  #include "foo/foo_log.h"
+  #include "pw_log/log.h"
+  #include "pw_thread/detached_thread.h"
+  #include "pw_thread_stl/options.h"
+
+  namespace {
+
+  void RegisterServices() {
+    pw::rpc::system_server::Server().RegisterService(foo_log::log_service);
+  }
+  }  // namespace
+
+  int main() {
+    PW_LOG_INFO("Deferred logging over RPC example");
+    pw::rpc::system_server::Init();
+    RegisterServices();
+    pw::thread::DetachedThread(pw::thread::stl::Options(), foo_log::log_thread);
+    pw::rpc::system_server::Start();
+    return 0;
+  }
+
+foo_log.cc
+----------
+Example of a log backend implementation, where logs enter the ``MultiSink`` and
+log drains are set up.
+
+.. code-block:: cpp
+
+  #include "foo/foo_log.h"
+
+  #include <array>
+  #include <cstdint>
+
+  #include "pw_chrono/system_clock.h"
+  #include "pw_log/proto_utils.h"
+  #include "pw_log_rpc/log_service.h"
+  #include "pw_log_rpc/rpc_log_drain.h"
+  #include "pw_log_rpc/rpc_log_drain_map.h"
+  #include "pw_log_rpc/rpc_log_drain_thread.h"
+  #include "pw_rpc_system_server/rpc_server.h"
+  #include "pw_sync/interrupt_spin_lock.h"
+  #include "pw_sync/lock_annotations.h"
+  #include "pw_sync/mutex.h"
+  #include "pw_tokenizer/tokenize_to_global_handler_with_payload.h"
+
+  namespace foo_log {
+  namespace {
+  constexpr size_t kLogBufferSize = 5000;
+  // Tokenized logs are typically 12-24 bytes.
+  constexpr size_t kMaxMessageSize = 32;
+  // kMaxLogEntrySize should be less than the MTU of the RPC channel output used
+  // by the provided server writer.
+  constexpr size_t kMaxLogEntrySize =
+      pw::log_rpc::RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize;
+  std::array<std::byte, kLogBufferSize> multisink_buffer;
+
+  // To save RAM, share the mutex, since drains will be managed sequentially.
+  pw::sync::Mutex shared_mutex;
+  std::array<std::byte, kMaxEntrySize> client1_buffer
+      PW_GUARDED_BY(shared_mutex);
+  std::array<std::byte, kMaxEntrySize> client2_buffer
+      PW_GUARDED_BY(shared_mutex);
+  std::array<pw::log_rpc::RpcLogDrain, 2> drains = {
+      pw::log_rpc::RpcLogDrain(
+          1,
+          client1_buffer,
+          shared_mutex,
+          RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
+      pw::log_rpc::RpcLogDrain(
+          2,
+          client2_buffer,
+          shared_mutex,
+          RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
+  };
+
+  pw::sync::InterruptSpinLock log_encode_lock;
+  std::array<std::byte, kMaxLogEntrySize> log_encode_buffer
+      PW_GUARDED_BY(log_encode_lock);
+
+  extern "C" void pw_tokenizer_HandleEncodedMessageWithPayload(
+      pw_tokenizer_Payload metadata, const uint8_t message[], size_t size_bytes) {
+    int64_t timestamp =
+        pw::chrono::SystemClock::now().time_since_epoch().count();
+    std::lock_guard lock(log_encode_lock);
+    pw::Result<pw::ConstByteSpan> encoded_log_result =
+      pw::log::EncodeTokenizedLog(
+          metadata, message, size_bytes, timestamp, log_encode_buffer);
+
+    if (!encoded_log_result.ok()) {
+      GetMultiSink().HandleDropped();
+      return;
+    }
+    GetMultiSink().HandleEntry(encoded_log_result.value());
+  }
+  }  // namespace
+
+  pw::log_rpc::RpcLogDrainMap drain_map(drains);
+  pw::log_rpc::RpcLogDrainThread log_thread(GetMultiSink(), drain_map);
+  pw::log_rpc::LogService log_service(drain_map);
+
+  pw::multisink::MultiSink& GetMultiSink() {
+    static pw::multisink::MultiSink multisink(multisink_buffer);
+    return multisink;
+  }
+  }  // namespace foo_log
+
+Logging in other source files
+-----------------------------
+To defer logging, other source files must simply include ``pw_log/log.h`` and
+use the :ref:`module-pw_log` APIs, as long as the source set that includes
+``foo_log.cc`` is setup as the log backend.
diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc
new file mode 100644
index 0000000..620ea46
--- /dev/null
+++ b/pw_log_rpc/log_service_test.cc
@@ -0,0 +1,394 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/log_service.h"
+
+#include <array>
+#include <cstdint>
+#include <limits>
+
+#include "gtest/gtest.h"
+#include "pw_containers/vector.h"
+#include "pw_log/log.h"
+#include "pw_log/proto/log.pwpb.h"
+#include "pw_log/proto_utils.h"
+#include "pw_protobuf/decoder.h"
+#include "pw_result/result.h"
+#include "pw_rpc/raw/test_method_context.h"
+#include "pw_string/string_builder.h"
+#include "pw_sync/mutex.h"
+
+namespace pw::log_rpc {
+namespace {
+
+#define LOG_SERVICE_METHOD_CONTEXT \
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen)
+
+constexpr size_t kMaxMessageSize = 50;
+static_assert(RpcLogDrain::kMaxDropMessageSize < kMaxMessageSize);
+constexpr size_t kMaxLogEntrySize =
+    RpcLogDrain::kMinEntrySizeWithoutPayload + kMaxMessageSize;
+constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10;
+constexpr size_t kMaxDrains = 3;
+constexpr char kMessage[] = "message";
+// A message small enough to fit encoded in LogServiceTest::entry_encode_buffer_
+// but large enough to not fit in LogServiceTest::small_buffer_.
+constexpr char kLongMessage[] =
+    "This is a long log message that will be dropped.";
+static_assert(sizeof(kLongMessage) < kMaxMessageSize);
+static_assert(sizeof(kLongMessage) > RpcLogDrain::kMaxDropMessageSize);
+std::array<std::byte, 1> rpc_request_buffer;
+
+// `LogServiceTest` sets up a logging environment for testing with a `MultiSink`
+// for log entries, and multiple `RpcLogDrain`s for consuming such log entries.
+// It includes methods to add log entries to the `MultiSink`, and buffers for
+// encoding and retrieving log entries. Tests can choose how many entries to
+// add to the multisink, and which drain to use.
+class LogServiceTest : public ::testing::Test {
+ public:
+  LogServiceTest() : multisink_(multisink_buffer_), drain_map_(drains_) {
+    for (auto& drain : drain_map_.drains()) {
+      multisink_.AttachDrain(drain);
+    }
+  }
+
+  void AddLogEntries(size_t log_count, std::string_view message) {
+    for (size_t i = 0; i < log_count; ++i) {
+      AddLogEntry(message);
+    }
+  }
+
+  void AddLogEntry(std::string_view message) {
+    auto metadata =
+        log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, __LINE__, 0, 0>();
+    Result<ConstByteSpan> encoded_log_result =
+        log::EncodeTokenizedLog(metadata,
+                                std::as_bytes(std::span(message)),
+                                /*ticks_since_epoch=*/0,
+                                entry_encode_buffer_);
+    EXPECT_EQ(encoded_log_result.status(), OkStatus());
+    multisink_.HandleEntry(encoded_log_result.value());
+  }
+
+ protected:
+  std::array<std::byte, kMultiSinkBufferSize> multisink_buffer_;
+  multisink::MultiSink multisink_;
+  RpcLogDrainMap drain_map_;
+  std::array<std::byte, kMaxLogEntrySize> entry_encode_buffer_;
+
+  // Drain Buffers
+  std::array<std::byte, kMaxLogEntrySize> drain_buffer1_;
+  std::array<std::byte, kMaxLogEntrySize> drain_buffer2_;
+  std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_;
+  static constexpr uint32_t kSmallBufferDrainId = 3;
+  sync::Mutex shared_mutex_;
+  std::array<RpcLogDrain, kMaxDrains> drains_{
+      RpcLogDrain(1,
+                  drain_buffer1_,
+                  shared_mutex_,
+                  RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
+      RpcLogDrain(
+          2,
+          drain_buffer2_,
+          shared_mutex_,
+          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+      RpcLogDrain(kSmallBufferDrainId,
+                  small_buffer_,
+                  shared_mutex_,
+                  RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
+  };
+};
+
+// Unpacks a `LogEntry` proto buffer and compares it with the expected data.
+void VerifyLogEntry(protobuf::Decoder& entry_decoder,
+                    log_tokenized::Metadata expected_metadata,
+                    ConstByteSpan expected_tokenized_data,
+                    const int64_t expected_timestamp) {
+  ConstByteSpan tokenized_data;
+  EXPECT_TRUE(entry_decoder.Next().ok());  // message [tokenized]
+  EXPECT_EQ(1U, entry_decoder.FieldNumber());
+  EXPECT_TRUE(entry_decoder.ReadBytes(&tokenized_data).ok());
+  EXPECT_EQ(tokenized_data.size(), expected_tokenized_data.size());
+  EXPECT_EQ(std::memcmp(tokenized_data.begin(),
+                        expected_tokenized_data.begin(),
+                        expected_tokenized_data.size()),
+            0);
+
+  uint32_t line_level;
+  EXPECT_TRUE(entry_decoder.Next().ok());  // line_level
+  EXPECT_EQ(2U, entry_decoder.FieldNumber());
+  EXPECT_TRUE(entry_decoder.ReadUint32(&line_level).ok());
+  EXPECT_EQ(expected_metadata.level(), line_level & PW_LOG_LEVEL_BITMASK);
+  EXPECT_EQ(expected_metadata.line_number(),
+            (line_level & ~PW_LOG_LEVEL_BITMASK) >> PW_LOG_LEVEL_BITS);
+
+  if (expected_metadata.flags() != 0) {
+    uint32_t flags;
+    EXPECT_TRUE(entry_decoder.Next().ok());  // flags
+    EXPECT_EQ(3U, entry_decoder.FieldNumber());
+    EXPECT_TRUE(entry_decoder.ReadUint32(&flags).ok());
+    EXPECT_EQ(expected_metadata.flags(), flags);
+  }
+
+  const bool has_timestamp = entry_decoder.Next().ok();  // timestamp
+  if (expected_timestamp == 0 && !has_timestamp) {
+    return;
+  }
+  int64_t timestamp;
+  EXPECT_TRUE(has_timestamp);
+  EXPECT_EQ(4U, entry_decoder.FieldNumber());
+  EXPECT_TRUE(entry_decoder.ReadInt64(&timestamp).ok());
+  EXPECT_EQ(expected_timestamp, timestamp);
+}
+
+// Verifies a stream of log entries, returning the total count found.
+size_t VerifyLogEntries(protobuf::Decoder& entries_decoder,
+                        Vector<ConstByteSpan>& message_stack) {
+  size_t entries_found = 0;
+  while (entries_decoder.Next().ok()) {
+    ConstByteSpan entry;
+    EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok());
+    protobuf::Decoder entry_decoder(entry);
+    auto expected_metadata =
+        log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, __LINE__, 0, 0>();
+    if (message_stack.empty()) {
+      break;
+    }
+    ConstByteSpan expected_message = message_stack.back();
+    VerifyLogEntry(entry_decoder,
+                   expected_metadata,
+                   expected_message,
+                   /*expected_timestamp=*/0);
+    message_stack.pop_back();
+    ++entries_found;
+  }
+  return entries_found;
+}
+
+TEST_F(LogServiceTest, AssignWriter) {
+  // Drains don't have writers.
+  for (auto& drain : drain_map_.drains()) {
+    EXPECT_EQ(drain.Flush(), Status::Unavailable());
+  }
+
+  // Create context directed to drain with ID 1.
+  RpcLogDrain& active_drain = drains_[0];
+  const uint32_t drain_channel_id = active_drain.channel_id();
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  context.set_channel_id(drain_channel_id);
+
+  // Call RPC, which sets the drain's writer.
+  context.call(rpc_request_buffer);
+  EXPECT_EQ(active_drain.Flush(), OkStatus());
+
+  // Other drains are still missing writers.
+  for (auto& drain : drain_map_.drains()) {
+    if (drain.channel_id() != drain_channel_id) {
+      EXPECT_EQ(drain.Flush(), Status::Unavailable());
+    }
+  }
+
+  // Calling an ongoing log stream must not change the active drain's
+  // writer, and the second writer must not get any responses.
+  LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_);
+  second_call_context.set_channel_id(drain_channel_id);
+  second_call_context.call(rpc_request_buffer);
+  EXPECT_EQ(active_drain.Flush(), OkStatus());
+  ASSERT_TRUE(second_call_context.done());
+  EXPECT_EQ(second_call_context.responses().size(), 0u);
+
+  // Setting a new writer on a closed stream is allowed.
+  ASSERT_EQ(active_drain.Close(), OkStatus());
+  LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_);
+  third_call_context.set_channel_id(drain_channel_id);
+  third_call_context.call(rpc_request_buffer);
+  EXPECT_EQ(active_drain.Flush(), OkStatus());
+  ASSERT_FALSE(third_call_context.done());
+  EXPECT_EQ(third_call_context.responses().size(), 1u);
+  EXPECT_EQ(active_drain.Close(), OkStatus());
+}
+
+TEST_F(LogServiceTest, StartAndEndStream) {
+  RpcLogDrain& active_drain = drains_[2];
+  const uint32_t drain_channel_id = active_drain.channel_id();
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  context.set_channel_id(drain_channel_id);
+
+  // Add log entries.
+  const size_t total_entries = 10;
+  AddLogEntries(total_entries, kMessage);
+  // Request logs.
+  context.call(rpc_request_buffer);
+  EXPECT_EQ(active_drain.Flush(), OkStatus());
+
+  // Not done until the stream is finished.
+  ASSERT_FALSE(context.done());
+  active_drain.Close();
+  ASSERT_TRUE(context.done());
+
+  EXPECT_EQ(context.status(), OkStatus());
+  // There is at least 1 response with multiple log entries packed.
+  EXPECT_GE(context.responses().size(), 1u);
+
+  // Verify data in responses.
+  Vector<ConstByteSpan, total_entries> message_stack;
+  for (size_t i = 0; i < total_entries; ++i) {
+    message_stack.push_back(
+        std::as_bytes(std::span(std::string_view(kMessage))));
+  }
+  size_t entries_found = 0;
+  for (auto& response : context.responses()) {
+    protobuf::Decoder entry_decoder(response);
+    entries_found += VerifyLogEntries(entry_decoder, message_stack);
+  }
+  EXPECT_EQ(entries_found, total_entries);
+}
+
+TEST_F(LogServiceTest, HandleDropped) {
+  RpcLogDrain& active_drain = drains_[0];
+  const uint32_t drain_channel_id = active_drain.channel_id();
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  context.set_channel_id(drain_channel_id);
+
+  // Add log entries.
+  const size_t total_entries = 5;
+  const uint32_t total_drop_count = 2;
+  AddLogEntries(total_entries, kMessage);
+  multisink_.HandleDropped(total_drop_count);
+
+  // Request logs.
+  context.call(rpc_request_buffer);
+  EXPECT_EQ(active_drain.Flush(), OkStatus());
+  active_drain.Close();
+  ASSERT_EQ(context.status(), OkStatus());
+  // There is at least 1 response with multiple log entries packed.
+  ASSERT_GE(context.responses().size(), 1u);
+
+  // Add create expected messages in a stack to match the order they arrive in.
+  Vector<ConstByteSpan, total_entries + 1> message_stack;
+  StringBuffer<32> message;
+  message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
+  message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+  for (size_t i = 0; i < total_entries; ++i) {
+    message_stack.push_back(
+        std::as_bytes(std::span(std::string_view(kMessage))));
+  }
+
+  // Verify data in responses.
+  size_t entries_found = 0;
+  for (auto& response : context.responses()) {
+    protobuf::Decoder entry_decoder(response);
+    entries_found += VerifyLogEntries(entry_decoder, message_stack);
+  }
+  // Expect an extra message with the drop count.
+  EXPECT_EQ(entries_found, total_entries + 1);
+}
+
+TEST_F(LogServiceTest, HandleSmallBuffer) {
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  context.set_channel_id(kSmallBufferDrainId);
+  auto small_buffer_drain =
+      drain_map_.GetDrainFromChannelId(kSmallBufferDrainId);
+  ASSERT_TRUE(small_buffer_drain.ok());
+
+  // Add log entries.
+  const size_t total_entries = 5;
+  const uint32_t total_drop_count = total_entries;
+  AddLogEntries(total_entries, kLongMessage);
+  // Request logs.
+  context.call(rpc_request_buffer);
+  EXPECT_EQ(small_buffer_drain.value()->Flush(), OkStatus());
+  EXPECT_EQ(small_buffer_drain.value()->Close(), OkStatus());
+  ASSERT_EQ(context.status(), OkStatus());
+  ASSERT_GE(context.responses().size(), 1u);
+
+  Vector<ConstByteSpan, total_entries + 1> message_stack;
+  StringBuffer<32> message;
+  message.Format("Dropped %u", static_cast<unsigned int>(total_drop_count));
+  message_stack.push_back(std::as_bytes(std::span(std::string_view(message))));
+
+  // Verify data in responses.
+  size_t entries_found = 0;
+  for (auto& response : context.responses()) {
+    protobuf::Decoder entry_decoder(response);
+    entries_found += VerifyLogEntries(entry_decoder, message_stack);
+  }
+  // No messages fit the buffer, expect a drop message.
+  EXPECT_EQ(entries_found, 1u);
+}
+
+TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
+  auto& detached_drain = drains_[0];
+  multisink_.DetachDrain(detached_drain);
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  context.set_channel_id(detached_drain.channel_id());
+
+  // Add log entries.
+  const size_t total_entries = 5;
+  AddLogEntries(total_entries, kMessage);
+  // Request logs.
+  context.call(rpc_request_buffer);
+  EXPECT_EQ(detached_drain.Close(), OkStatus());
+  ASSERT_EQ(context.status(), OkStatus());
+  EXPECT_EQ(context.responses().size(), 0u);
+}
+
+TEST_F(LogServiceTest, LargeLogEntry) {
+  const auto expected_metadata =
+      log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
+                                   (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
+                                   (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
+                                   (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>();
+  ConstByteSpan expected_message = std::as_bytes(std::span(kMessage));
+  const int64_t expected_timestamp = std::numeric_limits<int64_t>::max();
+
+  // Add entry to multisink.
+  log::LogEntry::MemoryEncoder encoder(entry_encode_buffer_);
+  encoder.WriteMessage(expected_message);
+  encoder.WriteLineLevel(
+      (expected_metadata.level() & PW_LOG_LEVEL_BITMASK) |
+      ((expected_metadata.line_number() << PW_LOG_LEVEL_BITS) &
+       ~PW_LOG_LEVEL_BITMASK));
+  encoder.WriteFlags(expected_metadata.flags());
+  encoder.WriteTimestamp(expected_timestamp);
+  ASSERT_EQ(encoder.status(), OkStatus());
+  multisink_.HandleEntry(encoder);
+
+  // Start log stream.
+  RpcLogDrain& active_drain = drains_[0];
+  const uint32_t drain_channel_id = active_drain.channel_id();
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  context.set_channel_id(drain_channel_id);
+  context.call(rpc_request_buffer);
+  ASSERT_EQ(active_drain.Flush(), OkStatus());
+  active_drain.Close();
+  ASSERT_EQ(context.status(), OkStatus());
+  ASSERT_EQ(context.responses().size(), 1u);
+
+  // Verify message.
+  protobuf::Decoder entries_decoder(context.responses()[0]);
+  ASSERT_TRUE(entries_decoder.Next().ok());
+  ConstByteSpan entry;
+  EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok());
+  protobuf::Decoder entry_decoder(entry);
+  VerifyLogEntry(
+      entry_decoder, expected_metadata, expected_message, expected_timestamp);
+}
+
+// TODO(pwbug/469): add tests for an open RawServerWriter that closes or fails
+// while flushing, then on re-open the drain sends a counts. The drain mus have
+// ignore_writer_error disabled.
+
+}  // namespace
+}  // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
index 6933690..93d5358 100644
--- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
@@ -38,12 +38,18 @@
 // into a log::LogEntries message, writes the message to the provided writer,
 // then repeats the process until there are no more entries in the MultiSink or
 // the writer failed to write the outgoing package, in which case the RPC on
-// the writer is closed. When close_stream_on_writer_error is false the drain
+// the writer is closed. When error_handling is `kIgnoreWriterErrors` the drain
 // will continue to retrieve log entries out of the MultiSink and attempt to
 // send them out ignoring the writer errors. Note: this behavior might change or
 // be removed in the future.
 class RpcLogDrain : public multisink::MultiSink::Drain {
  public:
+  // Dictates how to handle server writer errors.
+  enum class LogDrainErrorHandling {
+    kIgnoreWriterErrors,
+    kCloseStreamOnWriterError,
+  };
+
   // The minimum buffer size, without the message payload, needed to retrieve a
   // log::LogEntry from the attached MultiSink. The user must account for the
   // max message size to avoid log entry drops.
@@ -77,9 +83,9 @@
               ByteSpan log_entry_buffer,
               rpc::RawServerWriter writer,
               sync::Mutex& mutex,
-              bool close_stream_on_writer_error)
+              LogDrainErrorHandling error_handling)
       : channel_id_(channel_id),
-        close_stream_on_writer_error_(close_stream_on_writer_error),
+        error_handling_(error_handling),
         server_writer_(std::move(writer)),
         log_entry_buffer_(log_entry_buffer),
         committed_entry_drop_count_(0),
@@ -96,9 +102,9 @@
   RpcLogDrain(uint32_t channel_id,
               ByteSpan log_entry_buffer,
               sync::Mutex& mutex,
-              bool close_stream_on_writer_error)
+              LogDrainErrorHandling error_handling)
       : channel_id_(channel_id),
-        close_stream_on_writer_error_(close_stream_on_writer_error),
+        error_handling_(error_handling),
         server_writer_(),
         log_entry_buffer_(log_entry_buffer),
         committed_entry_drop_count_(0),
@@ -122,15 +128,15 @@
   // Accesses log entries and sends them via the writer. Expected to be called
   // frequently to avoid log drops. If the writer fails to send a packet with
   // multiple log entries, the entries are dropped and a drop message with the
-  // count is sent. When close_stream_on_writer_error is set, the stream will
-  // automatically be closed and Flush will return the writer error.
+  // count is sent. When error_handling is kCloseStreamOnWriterError, the stream
+  // will automatically be closed and Flush will return the writer error.
   //
   // Precondition: the drain must be attached to a MultiSink.
   //
   // Return values:
   // OK - all entries were consumed.
-  // ABORTED - there was an error writing the packet, and
-  // close_stream_on_writer_error is true.
+  // ABORTED - there was an error writing the packet, and error_handling equals
+  // `kCloseStreamOnWriterError`.
   Status Flush() PW_LOCKS_EXCLUDED(mutex_);
 
   // Ends RPC log stream without flushing.
@@ -155,7 +161,7 @@
       PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
 
   const uint32_t channel_id_;
-  const bool close_stream_on_writer_error_;
+  const LogDrainErrorHandling error_handling_;
   rpc::RawServerWriter server_writer_ PW_GUARDED_BY(mutex_);
   const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_);
   uint32_t committed_entry_drop_count_ PW_GUARDED_BY(mutex_);
diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc
index 871cbe8..385b9e0 100644
--- a/pw_log_rpc/rpc_log_drain.cc
+++ b/pw_log_rpc/rpc_log_drain.cc
@@ -70,7 +70,7 @@
     log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
     if (const Status status = server_writer_.Write(encoder); !status.ok()) {
       committed_entry_drop_count_ += packed_entry_count;
-      if (close_stream_on_writer_error_) {
+      if (error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) {
         server_writer_.Finish().IgnoreError();
         return Status::Aborted();
       }
diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc
new file mode 100644
index 0000000..1eba118
--- /dev/null
+++ b/pw_log_rpc/rpc_log_drain_test.cc
@@ -0,0 +1,94 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/rpc_log_drain.h"
+
+#include <array>
+#include <cstdint>
+#include <span>
+
+#include "gtest/gtest.h"
+#include "pw_log_rpc/rpc_log_drain_map.h"
+#include "pw_multisink/multisink.h"
+#include "pw_status/status.h"
+#include "pw_sync/mutex.h"
+
+namespace pw::log_rpc {
+namespace {
+static constexpr size_t kBufferSize =
+    RpcLogDrain::kMinEntrySizeWithoutPayload + 32;
+
+TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) {
+  // Drain without a writer.
+  const uint32_t drain_id = 1;
+  std::array<std::byte, kBufferSize> buffer1;
+  sync::Mutex mutex;
+  RpcLogDrain drain(
+      drain_id,
+      buffer1,
+      mutex,
+      RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError);
+  EXPECT_EQ(drain.channel_id(), drain_id);
+
+  // Attach drain to a MultiSink.
+  std::array<std::byte, kBufferSize * 2> multisink_buffer;
+  multisink::MultiSink multisink(multisink_buffer);
+  multisink.AttachDrain(drain);
+  EXPECT_EQ(drain.Flush(), Status::Unavailable());
+
+  rpc::RawServerWriter writer;
+  ASSERT_FALSE(writer.open());
+  EXPECT_EQ(drain.Open(writer), Status::FailedPrecondition());
+  EXPECT_EQ(drain.Flush(), Status::Unavailable());
+}
+
+TEST(RpcLogDrainMap, GetDrainsByIdFromDrainMap) {
+  static constexpr size_t kMaxDrains = 3;
+  sync::Mutex mutex;
+  std::array<std::array<std::byte, kBufferSize>, kMaxDrains> buffers;
+  std::array<RpcLogDrain, kMaxDrains> drains{
+      RpcLogDrain(
+          0,
+          buffers[0],
+          mutex,
+          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+      RpcLogDrain(
+          1,
+          buffers[1],
+          mutex,
+          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+      RpcLogDrain(2,
+                  buffers[2],
+                  mutex,
+                  RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
+  };
+
+  RpcLogDrainMap drain_map(drains);
+  for (uint32_t channel_id = 0; channel_id < kMaxDrains; ++channel_id) {
+    auto drain_result = drain_map.GetDrainFromChannelId(channel_id);
+    ASSERT_TRUE(drain_result.ok());
+    EXPECT_EQ(drain_result.value(), &drains[channel_id]);
+  }
+  const std::span<RpcLogDrain> mapped_drains = drain_map.drains();
+  ASSERT_EQ(mapped_drains.size(), kMaxDrains);
+  for (uint32_t channel_id = 0; channel_id < kMaxDrains; ++channel_id) {
+    EXPECT_EQ(&mapped_drains[channel_id], &drains[channel_id]);
+  }
+}
+
+// TODO(cachinchilla): add tests for passing an open RawServerWriter when there
+// is a way to create an one manually.
+
+}  // namespace
+}  // namespace pw::log_rpc