pw_log_rpc: Add RPC command for logs

Adds Logs.Get RPC to acquire the logs from log queue.

Change-Id: I5a3d93604906f4dd0471e50964fe36a61c66797c
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/21960
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Pigweed-Auto-Submit: Prashanth Swaminathan <prashanthsw@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_log_rpc/BUILD b/pw_log_rpc/BUILD
index e00d050..17c9649 100644
--- a/pw_log_rpc/BUILD
+++ b/pw_log_rpc/BUILD
@@ -23,6 +23,19 @@
 licenses(["notice"])  # Apache License 2.0
 
 pw_cc_library(
+    name = "pw_logs",
+    srcs = [ "logs_rpc.cc" ],
+    includes = [ "public" ],
+    deps = [
+        "//pw_bytes",
+        "//pw_result",
+        "//pw_ring_buffer",
+        "//pw_status",
+    ],
+    hdrs = [ "public/pw_log_rpc/logs_rpc.h" ]
+)
+
+pw_cc_library(
     name = "pw_log_queue",
     srcs = [ "log_queue.cc" ],
     includes = [ "public" ],
@@ -37,7 +50,7 @@
 )
 
 pw_cc_test(
-    name = "test",
+    name = "log_queue_test",
     srcs = [
         "log_queue_test.cc",
     ],
@@ -46,3 +59,14 @@
         "//pw_unit_test",
     ],
 )
+
+pw_cc_test(
+    name = "logs_rpc_test",
+    srcs = [
+        "logs_rpc_test.cc",
+    ],
+    deps = [
+        "//pw_preprocessor",
+        "//pw_unit_test",
+    ],
+)
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index b2eac44..2f308a0 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -24,6 +24,17 @@
   visibility = [ ":*" ]
 }
 
+pw_source_set("logs") {
+  public_configs = [ ":default_config" ]
+  public = [ "public/pw_log_rpc/logs_rpc.h" ]
+  sources = [ "logs_rpc.cc" ]
+  public_deps = [
+    ":log_queue",
+    ":protos.pwpb",
+    ":protos.raw_rpc",
+  ]
+}
+
 pw_source_set("log_queue") {
   public_configs = [ ":default_config" ]
   public = [ "public/pw_log_rpc/log_queue.h" ]
@@ -38,8 +49,16 @@
   deps = [ ":protos.pwpb" ]
 }
 
+pw_test("logs_rpc_test") {
+  deps = [
+    ":logs",
+    "$dir_pw_rpc/raw:test_method_context",
+  ]
+  sources = [ "logs_rpc_test.cc" ]
+}
+
 pw_proto_library("protos") {
-  sources = [ "pw_log_rpc_proto/log.proto" ]
+  sources = [ "pw_log_proto/log.proto" ]
 }
 
 pw_doc_group("docs") {
@@ -56,5 +75,8 @@
 }
 
 pw_test_group("tests") {
-  tests = [ ":log_queue_test" ]
+  tests = [
+    ":log_queue_test",
+    ":logs_rpc_test",
+  ]
 }
diff --git a/pw_log_rpc/log_queue.cc b/pw_log_rpc/log_queue.cc
index e23ca16..a17cedf 100644
--- a/pw_log_rpc/log_queue.cc
+++ b/pw_log_rpc/log_queue.cc
@@ -15,7 +15,7 @@
 #include "pw_log_rpc/log_queue.h"
 
 #include "pw_log/levels.h"
-#include "pw_log_rpc_proto/log.pwpb.h"
+#include "pw_log_proto/log.pwpb.h"
 #include "pw_protobuf/wire_format.h"
 #include "pw_status/try.h"
 
@@ -24,7 +24,7 @@
 
 using pw::protobuf::WireType;
 constexpr std::byte kLogKey = static_cast<std::byte>(pw::protobuf::MakeKey(
-    static_cast<uint32_t>(pw::log_rpc::Log::Fields::ENTRIES),
+    static_cast<uint32_t>(pw::log::LogEntries::Fields::ENTRIES),
     WireType::kDelimited));
 
 }  // namespace
@@ -36,7 +36,7 @@
                                       uint32_t thread,
                                       int64_t timestamp) {
   pw::protobuf::NestedEncoder nested_encoder(encode_buffer_);
-  LogEntry::Encoder encoder(&nested_encoder);
+  pw::log::LogEntry::Encoder encoder(&nested_encoder);
   Status status;
 
   encoder.WriteMessageTokenized(message);
@@ -55,10 +55,14 @@
 
   ConstByteSpan log_entry;
   status = nested_encoder.Encode(&log_entry);
-  if (!status.ok()) {
-    // When encoding failures occur, map the error to INTERNAL, as the
-    // underlying allocation of this encode buffer and the nested encoding
-    // sequencing are not the caller's responsibility.
+  if (!status.ok() || log_entry.size_bytes() > max_log_entry_size_) {
+    // If an encoding failure occurs or the constructed log entry is larger
+    // than the configured max size, map the error to INTERNAL. If the
+    // underlying allocation of this encode buffer or the nested encoding
+    // sequencing are at fault, they are not the caller's responsibility. If
+    // the log entry is larger than the max allowed size, the log is dropped
+    // intentionally, and it is expected that the caller accepts this
+    // possibility.
     status = PW_STATUS_INTERNAL;
   } else {
     // Try to push back the encoded log entry.
@@ -78,26 +82,41 @@
   return Status::Ok();
 }
 
-Result<ConstByteSpan> LogQueue::Pop(ByteSpan entry_buffer) {
+Result<LogEntries> LogQueue::Pop(LogEntriesBuffer entry_buffer) {
   size_t ring_buffer_entry_size = 0;
+  PW_TRY(pop_status_for_test_);
+  // The caller must provide a buffer that is at minimum max_log_entry_size, to
+  // ensure that the front entry of the ring buffer can be popped.
+  PW_DCHECK_UINT_GE(entry_buffer.size_bytes(), max_log_entry_size_);
   PW_TRY(ring_buffer_.PeekFrontWithPreamble(entry_buffer,
                                             &ring_buffer_entry_size));
   PW_DCHECK_OK(ring_buffer_.PopFront());
-  return ConstByteSpan(entry_buffer.first(ring_buffer_entry_size));
+
+  return LogEntries{
+      .entries = ConstByteSpan(entry_buffer.first(ring_buffer_entry_size)),
+      .entry_count = 1};
 }
 
-Result<ConstByteSpan> LogQueue::PopMultiple(ByteSpan entries_buffer) {
+LogEntries LogQueue::PopMultiple(LogEntriesBuffer entries_buffer) {
   size_t offset = 0;
+  size_t entry_count = 0;
+
+  // The caller must provide a buffer that is at minimum max_log_entry_size, to
+  // ensure that the front entry of the ring buffer can be popped.
+  PW_DCHECK_UINT_GE(entries_buffer.size_bytes(), max_log_entry_size_);
+
   while (ring_buffer_.EntryCount() > 0 &&
-         (entries_buffer.size_bytes() - offset) >
-             ring_buffer_.FrontEntryTotalSizeBytes()) {
-    const Result<ConstByteSpan> result = Pop(entries_buffer.subspan(offset));
+         (entries_buffer.size_bytes() - offset) > max_log_entry_size_) {
+    const Result<LogEntries> result = Pop(entries_buffer.subspan(offset));
     if (!result.ok()) {
       break;
     }
-    offset += result.value().size_bytes();
+    offset += result.value().entries.size_bytes();
+    entry_count += result.value().entry_count;
   }
-  return ConstByteSpan(entries_buffer.first(offset));
+
+  return LogEntries{.entries = ConstByteSpan(entries_buffer.first(offset)),
+                    .entry_count = entry_count};
 }
 
 }  // namespace pw::log_rpc
diff --git a/pw_log_rpc/log_queue_test.cc b/pw_log_rpc/log_queue_test.cc
index 2f648fd..f6a499a 100644
--- a/pw_log_rpc/log_queue_test.cc
+++ b/pw_log_rpc/log_queue_test.cc
@@ -16,7 +16,7 @@
 
 #include "gtest/gtest.h"
 #include "pw_log/levels.h"
-#include "pw_log_rpc_proto/log.pwpb.h"
+#include "pw_log_proto/log.pwpb.h"
 #include "pw_protobuf/decoder.h"
 
 namespace pw::log_rpc {
@@ -97,10 +97,11 @@
                 kTimestamp));
 
   std::byte log_entry[kEncodeBufferSize];
-  Result<ConstByteSpan> pop_result = log_queue.Pop(std::span(log_entry));
+  Result<LogEntries> pop_result = log_queue.Pop(std::span(log_entry));
   EXPECT_TRUE(pop_result.ok());
 
-  pw::protobuf::Decoder log_decoder(pop_result.value());
+  pw::protobuf::Decoder log_decoder(pop_result.value().entries);
+  EXPECT_EQ(pop_result.value().entry_count, 1U);
   VerifyLogEntry(log_decoder,
                  kTokenizedMessage,
                  kFlags,
@@ -111,12 +112,12 @@
 }
 
 TEST(LogQueue, MultiplePushPopTokenizedMessage) {
-  constexpr int kEntryCount = 3;
+  constexpr size_t kEntryCount = 3;
 
   std::byte log_buffer[1024];
   LogQueueWithEncodeBuffer<kEncodeBufferSize> log_queue(log_buffer);
 
-  for (int i = 0; i < kEntryCount; i++) {
+  for (size_t i = 0; i < kEntryCount; i++) {
     EXPECT_EQ(Status::OK,
               log_queue.PushTokenizedMessage(
                   std::as_bytes(std::span(kTokenizedMessage)),
@@ -128,11 +129,12 @@
   }
 
   std::byte log_entry[kEncodeBufferSize];
-  for (int i = 0; i < kEntryCount; i++) {
-    Result<ConstByteSpan> pop_result = log_queue.Pop(std::span(log_entry));
+  for (size_t i = 0; i < kEntryCount; i++) {
+    Result<LogEntries> pop_result = log_queue.Pop(std::span(log_entry));
     EXPECT_TRUE(pop_result.ok());
 
-    pw::protobuf::Decoder log_decoder(pop_result.value());
+    pw::protobuf::Decoder log_decoder(pop_result.value().entries);
+    EXPECT_EQ(pop_result.value().entry_count, 1U);
     VerifyLogEntry(log_decoder,
                    kTokenizedMessage,
                    kFlags,
@@ -144,12 +146,12 @@
 }
 
 TEST(LogQueue, PopMultiple) {
-  constexpr int kEntryCount = 3;
+  constexpr size_t kEntryCount = 3;
 
   std::byte log_buffer[kLogBufferSize];
   LogQueueWithEncodeBuffer<kEncodeBufferSize> log_queue(log_buffer);
 
-  for (int i = 0; i < kEntryCount; i++) {
+  for (size_t i = 0; i < kEntryCount; i++) {
     EXPECT_EQ(Status::OK,
               log_queue.PushTokenizedMessage(
                   std::as_bytes(std::span(kTokenizedMessage)),
@@ -161,11 +163,12 @@
   }
 
   std::byte log_entries[kLogBufferSize];
-  Result<ConstByteSpan> pop_result = log_queue.PopMultiple(log_entries);
+  Result<LogEntries> pop_result = log_queue.PopMultiple(log_entries);
   EXPECT_TRUE(pop_result.ok());
 
-  pw::protobuf::Decoder log_decoder(pop_result.value());
-  for (int i = 0; i < kEntryCount; i++) {
+  pw::protobuf::Decoder log_decoder(pop_result.value().entries);
+  EXPECT_EQ(pop_result.value().entry_count, kEntryCount);
+  for (size_t i = 0; i < kEntryCount; i++) {
     VerifyLogEntry(log_decoder,
                    kTokenizedMessage,
                    kFlags,
diff --git a/pw_log_rpc/logs_rpc.cc b/pw_log_rpc/logs_rpc.cc
new file mode 100644
index 0000000..ada5470
--- /dev/null
+++ b/pw_log_rpc/logs_rpc.cc
@@ -0,0 +1,75 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/logs_rpc.h"
+
+#include "pw_log/log.h"
+#include "pw_log_proto/log.pwpb.h"
+#include "pw_status/try.h"
+
+namespace pw::log_rpc {
+namespace {
+
+Result<ConstByteSpan> GenerateDroppedEntryMessage(ByteSpan encode_buffer,
+                                                  size_t dropped_entries) {
+  pw::protobuf::NestedEncoder nested_encoder(encode_buffer);
+  pw::log::LogEntry::Encoder encoder(&nested_encoder);
+  encoder.WriteDropped(dropped_entries);
+  return nested_encoder.Encode();
+}
+
+}  // namespace
+
+void Logs::Get(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer) {
+  response_writer_ = std::move(writer);
+}
+
+Status Logs::Flush() {
+  // If the response writer was not initialized or has since been closed,
+  // ignore the flush operation.
+  if (!response_writer_.open()) {
+    return Status::Ok();
+  }
+
+  // If previous calls to flush resulted in dropped entries, generate a
+  // dropped entry message and write it before further log messages.
+  if (dropped_entries_ > 0) {
+    ByteSpan payload = response_writer_.PayloadBuffer();
+    Result dropped_log = GenerateDroppedEntryMessage(payload, dropped_entries_);
+    PW_TRY(dropped_log.status());
+    PW_TRY(response_writer_.Write(dropped_log.value()));
+    dropped_entries_ = 0;
+  }
+
+  // Write logs to the response writer. An important limitation of this
+  // implementation is that if this RPC call fails, the logs are lost -
+  // a subsequent call to the RPC will produce a drop count message.
+  ByteSpan payload = response_writer_.PayloadBuffer();
+  Result possible_logs = log_queue_.PopMultiple(payload);
+  PW_TRY(possible_logs.status());
+  if (possible_logs.value().entry_count == 0) {
+    return Status::Ok();
+  }
+
+  Status status = response_writer_.Write(possible_logs.value().entries);
+  if (!status.ok()) {
+    // On a failure to send logs, track the dropped entries.
+    dropped_entries_ = possible_logs.value().entry_count;
+    return status;
+  }
+
+  return Status::Ok();
+}
+
+}  // namespace pw::log_rpc
diff --git a/pw_log_rpc/logs_rpc_test.cc b/pw_log_rpc/logs_rpc_test.cc
new file mode 100644
index 0000000..50541bb
--- /dev/null
+++ b/pw_log_rpc/logs_rpc_test.cc
@@ -0,0 +1,132 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/logs_rpc.h"
+
+#include "gtest/gtest.h"
+#include "pw_log/log.h"
+#include "pw_rpc/raw_test_method_context.h"
+
+namespace pw::log_rpc {
+namespace {
+
+#define LOGS_METHOD_CONTEXT PW_RAW_TEST_METHOD_CONTEXT(Logs, Get)
+
+constexpr size_t kEncodeBufferSize = 128;
+constexpr size_t kLogBufferSize = 4096;
+
+class LogQueueTester : public LogQueueWithEncodeBuffer<kLogBufferSize> {
+ public:
+  LogQueueTester(ByteSpan log_queue)
+      : LogQueueWithEncodeBuffer<kLogBufferSize>(log_queue) {}
+
+  void SetPopStatus(Status error_status) {
+    pop_status_for_test_ = error_status;
+  }
+};
+
+class LogsService : public ::testing::Test {
+ public:
+  LogsService() : log_queue_(log_queue_buffer_) {}
+
+ protected:
+  void AddLogs(const size_t log_count = 1) {
+    constexpr char kTokenizedMessage[] = "message";
+    for (size_t i = 0; i < log_count; i++) {
+      EXPECT_EQ(
+          Status::Ok(),
+          log_queue_.PushTokenizedMessage(
+              std::as_bytes(std::span(kTokenizedMessage)), 0, 0, 0, 0, 0));
+    }
+  }
+
+  static Logs& GetLogs(LOGS_METHOD_CONTEXT& context) {
+    return (Logs&)(context.service());
+  }
+
+  std::array<std::byte, kEncodeBufferSize> log_queue_buffer_;
+  LogQueueWithEncodeBuffer<kLogBufferSize> log_queue_;
+};
+
+TEST_F(LogsService, Get) {
+  constexpr size_t kLogEntryCount = 3;
+  std::array<std::byte, 1> rpc_buffer;
+  LOGS_METHOD_CONTEXT context(log_queue_);
+
+  context.call(rpc_buffer);
+
+  // Flush all logs from the buffer, then close the RPC.
+  AddLogs(kLogEntryCount);
+  GetLogs(context).Flush();
+  GetLogs(context).Finish();
+
+  EXPECT_TRUE(context.done());
+  EXPECT_EQ(Status::Ok(), context.status());
+
+  // Although |kLogEntryCount| messages were in the queue, they are batched
+  // before being written to the client, so there is only one response.
+  EXPECT_EQ(1U, context.total_responses());
+}
+
+TEST_F(LogsService, GetMultiple) {
+  constexpr size_t kLogEntryCount = 1;
+  constexpr size_t kFlushCount = 3;
+  std::array<std::byte, 1> rpc_buffer;
+  LOGS_METHOD_CONTEXT context(log_queue_);
+
+  context.call(rpc_buffer);
+
+  for (size_t i = 0; i < kFlushCount; i++) {
+    AddLogs(kLogEntryCount);
+    GetLogs(context).Flush();
+  }
+  GetLogs(context).Finish();
+
+  EXPECT_TRUE(context.done());
+  EXPECT_EQ(Status::Ok(), context.status());
+  EXPECT_EQ(kFlushCount, context.total_responses());
+}
+
+TEST_F(LogsService, NoEntriesOnEmptyQueue) {
+  std::array<std::byte, 1> rpc_buffer;
+  LOGS_METHOD_CONTEXT context(log_queue_);
+
+  // Invoking flush with no logs in the queue should behave like a no-op.
+  context.call(rpc_buffer);
+  GetLogs(context).Flush();
+  GetLogs(context).Finish();
+
+  EXPECT_TRUE(context.done());
+  EXPECT_EQ(Status::Ok(), context.status());
+  EXPECT_EQ(0U, context.total_responses());
+}
+
+TEST_F(LogsService, QueueError) {
+  std::array<std::byte, 1> rpc_buffer;
+  LogQueueTester log_queue_tester(log_queue_buffer_);
+  LOGS_METHOD_CONTEXT context(log_queue_tester);
+
+  // Generate failure on log queue.
+  log_queue_tester.SetPopStatus(Status::Internal());
+  context.call(rpc_buffer);
+  GetLogs(context).Flush();
+  GetLogs(context).Finish();
+
+  EXPECT_TRUE(context.done());
+  EXPECT_EQ(Status::Ok(), context.status());
+  EXPECT_EQ(0U, context.total_responses());
+}
+
+}  // namespace
+}  // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/log_queue.h b/pw_log_rpc/public/pw_log_rpc/log_queue.h
index 4c35893..450d0fa 100644
--- a/pw_log_rpc/public/pw_log_rpc/log_queue.h
+++ b/pw_log_rpc/public/pw_log_rpc/log_queue.h
@@ -22,7 +22,7 @@
 
 // LogQueue is a ring-buffer queue of log messages. LogQueue is backed by
 // a caller-provided byte array and stores its messages in the format
-// dictated by the pw_rpc_log log.proto format.
+// dictated by the pw_log log.proto format.
 //
 // Logs can be returned as a repeated proto message and the output of this
 // class can be directly fed into an RPC stream.
@@ -36,11 +36,32 @@
 // 1) For single entires, LogQueue::Pop().
 // 2) For multiple entries, LogQueue::PopMultiple().
 namespace pw::log_rpc {
+namespace {
+constexpr size_t kLogEntryMaxSize = 100;
+}  // namespace
+
+using LogEntriesBuffer = ByteSpan;
+
+struct LogEntries {
+  // A buffer containing an encoded protobuf of type pw.log.LogEntries.
+  ConstByteSpan entries;
+  size_t entry_count;
+};
 
 class LogQueue {
  public:
-  LogQueue(ByteSpan log_buffer, ByteSpan encode_buffer)
-      : encode_buffer_(encode_buffer), ring_buffer_(true) {
+  // Constructs a LogQueue. Callers can optionally supply a maximum log entry
+  // size, which limits the size of messages that can be pushed into this log
+  // queue. When such an entry arrives, the queue increments its drop counter.
+  // Calls to Pop and PopMultiple should be provided a buffer of at least the
+  // configured max size.
+  LogQueue(ByteSpan log_buffer,
+           ByteSpan encode_buffer,
+           size_t max_log_entry_size = kLogEntryMaxSize)
+      : pop_status_for_test_(Status::Ok()),
+        max_log_entry_size_(max_log_entry_size),
+        encode_buffer_(encode_buffer),
+        ring_buffer_(true) {
     ring_buffer_.SetBuffer(log_buffer);
   }
 
@@ -55,12 +76,12 @@
   //  OK - success.
   //  FAILED_PRECONDITION - Failed when encoding the proto message.
   //  RESOURCE_EXHAUSTED - Not enough space in the buffer to write the entry.
-  pw::Status PushTokenizedMessage(ConstByteSpan message,
-                                  uint32_t flags,
-                                  uint32_t level,
-                                  uint32_t line,
-                                  uint32_t thread,
-                                  int64_t timestamp);
+  Status PushTokenizedMessage(ConstByteSpan message,
+                              uint32_t flags,
+                              uint32_t level,
+                              uint32_t line,
+                              uint32_t thread,
+                              int64_t timestamp);
 
   // Pop the oldest LogEntry from the queue into the provided buffer.
   // On success, the size is the length of the entry, on failure, the size is 0.
@@ -75,15 +96,22 @@
   //  bytes than the data size of the data chunk being read.  Available
   //  destination bytes were filled, remaining bytes of the data chunk were
   //  ignored.
-  pw::Result<ConstByteSpan> Pop(ByteSpan entry_buffer);
+  Result<LogEntries> Pop(LogEntriesBuffer entry_buffer);
 
   // Pop entries from the queue into the provided buffer. The provided buffer is
   // filled until there is insufficient space for the next log entry.
+  // Returns:
   //
-  //  OK - success.
-  pw::Result<ConstByteSpan> PopMultiple(ByteSpan entries_buffer);
+  // LogEntries - contains an encoded protobuf byte span of pw.log.LogEntries.
+  LogEntries PopMultiple(LogEntriesBuffer entries_buffer);
+
+ protected:
+  friend class LogQueueTester;
+  // For testing, status to return on calls to Pop.
+  Status pop_status_for_test_;
 
  private:
+  const size_t max_log_entry_size_;
   size_t dropped_entries_;
   int64_t latest_dropped_timestamp_;
 
diff --git a/pw_log_rpc/public/pw_log_rpc/logs_rpc.h b/pw_log_rpc/public/pw_log_rpc/logs_rpc.h
new file mode 100644
index 0000000..2bd354f
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/logs_rpc.h
@@ -0,0 +1,52 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include "pw_log/log.h"
+#include "pw_log_proto/log.raw_rpc.pb.h"
+#include "pw_log_rpc/log_queue.h"
+
+namespace pw::log_rpc {
+
+// The Logs RPC service will send logs when requested by Get(). For now, Get()
+// requests result in a stream of responses, containing all log entries from
+// the attached log queue.
+//
+// The Get() method will return logs in the current queue immediately, but
+// someone else is responsible for pumping the log queue using Flush().
+class Logs final : public pw::log::generated::Logs<Logs> {
+ public:
+  Logs(LogQueue& log_queue) : log_queue_(log_queue), dropped_entries_(0) {}
+
+  // RPC API for the Logs that produces a log stream. This method will
+  // return immediately, another class must call Flush() to push logs from
+  // the queue to this stream.
+  void Get(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer);
+
+  // Interface for the owner of the service instance to flush all existing
+  // logs to the writer, if one is attached.
+  Status Flush();
+
+  // Interface for the owner of the service instance to close the RPC, if
+  // one is attached.
+  void Finish() { response_writer_.Finish(); }
+
+ private:
+  LogQueue& log_queue_;
+  rpc::RawServerWriter response_writer_;
+  size_t dropped_entries_;
+};
+
+}  // namespace pw::log_rpc
diff --git a/pw_log_rpc/pw_log_rpc_proto/log.proto b/pw_log_rpc/pw_log_proto/log.proto
similarity index 97%
rename from pw_log_rpc/pw_log_rpc_proto/log.proto
rename to pw_log_rpc/pw_log_proto/log.proto
index 052f866..2e38f42 100644
--- a/pw_log_rpc/pw_log_rpc_proto/log.proto
+++ b/pw_log_rpc/pw_log_proto/log.proto
@@ -14,15 +14,11 @@
 
 syntax = "proto2";
 
-package pw.log_rpc;
+package pw.log;
 
 option java_package = "pw.rpc.proto";
 option java_outer_classname = "Log";
 
-message Log {
-  repeated LogEntry entries = 1;
-}
-
 // A log with a tokenized message, a string message, or dropped indicator.  A
 // message can be one of three types:
 //
@@ -158,3 +154,12 @@
   optional string data_format_string = 21;
   optional bytes data = 22;
 }
+
+message LogRequest {}
+message LogEntries {
+  repeated LogEntry entries = 1;
+}
+
+service Logs {
+  rpc Get(LogRequest) returns (stream LogEntries) {}
+}