pw_log_rpc: Add RPC command for logs
Adds Logs.Get RPC to acquire the logs from log queue.
Change-Id: I5a3d93604906f4dd0471e50964fe36a61c66797c
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/21960
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Pigweed-Auto-Submit: Prashanth Swaminathan <prashanthsw@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_log_rpc/BUILD b/pw_log_rpc/BUILD
index e00d050..17c9649 100644
--- a/pw_log_rpc/BUILD
+++ b/pw_log_rpc/BUILD
@@ -23,6 +23,19 @@
licenses(["notice"]) # Apache License 2.0
pw_cc_library(
+ name = "pw_logs",
+ srcs = [ "logs_rpc.cc" ],
+ includes = [ "public" ],
+ deps = [
+ "//pw_bytes",
+ "//pw_result",
+ "//pw_ring_buffer",
+ "//pw_status",
+ ],
+ hdrs = [ "public/pw_log_rpc/logs_rpc.h" ]
+)
+
+pw_cc_library(
name = "pw_log_queue",
srcs = [ "log_queue.cc" ],
includes = [ "public" ],
@@ -37,7 +50,7 @@
)
pw_cc_test(
- name = "test",
+ name = "log_queue_test",
srcs = [
"log_queue_test.cc",
],
@@ -46,3 +59,14 @@
"//pw_unit_test",
],
)
+
+pw_cc_test(
+ name = "logs_rpc_test",
+ srcs = [
+ "logs_rpc_test.cc",
+ ],
+ deps = [
+ "//pw_preprocessor",
+ "//pw_unit_test",
+ ],
+)
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index b2eac44..2f308a0 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -24,6 +24,17 @@
visibility = [ ":*" ]
}
+pw_source_set("logs") {
+ public_configs = [ ":default_config" ]
+ public = [ "public/pw_log_rpc/logs_rpc.h" ]
+ sources = [ "logs_rpc.cc" ]
+ public_deps = [
+ ":log_queue",
+ ":protos.pwpb",
+ ":protos.raw_rpc",
+ ]
+}
+
pw_source_set("log_queue") {
public_configs = [ ":default_config" ]
public = [ "public/pw_log_rpc/log_queue.h" ]
@@ -38,8 +49,16 @@
deps = [ ":protos.pwpb" ]
}
+pw_test("logs_rpc_test") {
+ deps = [
+ ":logs",
+ "$dir_pw_rpc/raw:test_method_context",
+ ]
+ sources = [ "logs_rpc_test.cc" ]
+}
+
pw_proto_library("protos") {
- sources = [ "pw_log_rpc_proto/log.proto" ]
+ sources = [ "pw_log_proto/log.proto" ]
}
pw_doc_group("docs") {
@@ -56,5 +75,8 @@
}
pw_test_group("tests") {
- tests = [ ":log_queue_test" ]
+ tests = [
+ ":log_queue_test",
+ ":logs_rpc_test",
+ ]
}
diff --git a/pw_log_rpc/log_queue.cc b/pw_log_rpc/log_queue.cc
index e23ca16..a17cedf 100644
--- a/pw_log_rpc/log_queue.cc
+++ b/pw_log_rpc/log_queue.cc
@@ -15,7 +15,7 @@
#include "pw_log_rpc/log_queue.h"
#include "pw_log/levels.h"
-#include "pw_log_rpc_proto/log.pwpb.h"
+#include "pw_log_proto/log.pwpb.h"
#include "pw_protobuf/wire_format.h"
#include "pw_status/try.h"
@@ -24,7 +24,7 @@
using pw::protobuf::WireType;
constexpr std::byte kLogKey = static_cast<std::byte>(pw::protobuf::MakeKey(
- static_cast<uint32_t>(pw::log_rpc::Log::Fields::ENTRIES),
+ static_cast<uint32_t>(pw::log::LogEntries::Fields::ENTRIES),
WireType::kDelimited));
} // namespace
@@ -36,7 +36,7 @@
uint32_t thread,
int64_t timestamp) {
pw::protobuf::NestedEncoder nested_encoder(encode_buffer_);
- LogEntry::Encoder encoder(&nested_encoder);
+ pw::log::LogEntry::Encoder encoder(&nested_encoder);
Status status;
encoder.WriteMessageTokenized(message);
@@ -55,10 +55,14 @@
ConstByteSpan log_entry;
status = nested_encoder.Encode(&log_entry);
- if (!status.ok()) {
- // When encoding failures occur, map the error to INTERNAL, as the
- // underlying allocation of this encode buffer and the nested encoding
- // sequencing are not the caller's responsibility.
+ if (!status.ok() || log_entry.size_bytes() > max_log_entry_size_) {
+ // If an encoding failure occurs or the constructed log entry is larger
+ // than the configured max size, map the error to INTERNAL. If the
+ // underlying allocation of this encode buffer or the nested encoding
+ // sequencing are at fault, they are not the caller's responsibility. If
+ // the log entry is larger than the max allowed size, the log is dropped
+ // intentionally, and it is expected that the caller accepts this
+ // possibility.
status = PW_STATUS_INTERNAL;
} else {
// Try to push back the encoded log entry.
@@ -78,26 +82,41 @@
return Status::Ok();
}
-Result<ConstByteSpan> LogQueue::Pop(ByteSpan entry_buffer) {
+Result<LogEntries> LogQueue::Pop(LogEntriesBuffer entry_buffer) {
size_t ring_buffer_entry_size = 0;
+ PW_TRY(pop_status_for_test_);
+ // The caller must provide a buffer that is at minimum max_log_entry_size, to
+ // ensure that the front entry of the ring buffer can be popped.
+ PW_DCHECK_UINT_GE(entry_buffer.size_bytes(), max_log_entry_size_);
PW_TRY(ring_buffer_.PeekFrontWithPreamble(entry_buffer,
&ring_buffer_entry_size));
PW_DCHECK_OK(ring_buffer_.PopFront());
- return ConstByteSpan(entry_buffer.first(ring_buffer_entry_size));
+
+ return LogEntries{
+ .entries = ConstByteSpan(entry_buffer.first(ring_buffer_entry_size)),
+ .entry_count = 1};
}
-Result<ConstByteSpan> LogQueue::PopMultiple(ByteSpan entries_buffer) {
+LogEntries LogQueue::PopMultiple(LogEntriesBuffer entries_buffer) {
size_t offset = 0;
+ size_t entry_count = 0;
+
+ // The caller must provide a buffer that is at minimum max_log_entry_size, to
+ // ensure that the front entry of the ring buffer can be popped.
+ PW_DCHECK_UINT_GE(entries_buffer.size_bytes(), max_log_entry_size_);
+
while (ring_buffer_.EntryCount() > 0 &&
- (entries_buffer.size_bytes() - offset) >
- ring_buffer_.FrontEntryTotalSizeBytes()) {
- const Result<ConstByteSpan> result = Pop(entries_buffer.subspan(offset));
+ (entries_buffer.size_bytes() - offset) > max_log_entry_size_) {
+ const Result<LogEntries> result = Pop(entries_buffer.subspan(offset));
if (!result.ok()) {
break;
}
- offset += result.value().size_bytes();
+ offset += result.value().entries.size_bytes();
+ entry_count += result.value().entry_count;
}
- return ConstByteSpan(entries_buffer.first(offset));
+
+ return LogEntries{.entries = ConstByteSpan(entries_buffer.first(offset)),
+ .entry_count = entry_count};
}
} // namespace pw::log_rpc
diff --git a/pw_log_rpc/log_queue_test.cc b/pw_log_rpc/log_queue_test.cc
index 2f648fd..f6a499a 100644
--- a/pw_log_rpc/log_queue_test.cc
+++ b/pw_log_rpc/log_queue_test.cc
@@ -16,7 +16,7 @@
#include "gtest/gtest.h"
#include "pw_log/levels.h"
-#include "pw_log_rpc_proto/log.pwpb.h"
+#include "pw_log_proto/log.pwpb.h"
#include "pw_protobuf/decoder.h"
namespace pw::log_rpc {
@@ -97,10 +97,11 @@
kTimestamp));
std::byte log_entry[kEncodeBufferSize];
- Result<ConstByteSpan> pop_result = log_queue.Pop(std::span(log_entry));
+ Result<LogEntries> pop_result = log_queue.Pop(std::span(log_entry));
EXPECT_TRUE(pop_result.ok());
- pw::protobuf::Decoder log_decoder(pop_result.value());
+ pw::protobuf::Decoder log_decoder(pop_result.value().entries);
+ EXPECT_EQ(pop_result.value().entry_count, 1U);
VerifyLogEntry(log_decoder,
kTokenizedMessage,
kFlags,
@@ -111,12 +112,12 @@
}
TEST(LogQueue, MultiplePushPopTokenizedMessage) {
- constexpr int kEntryCount = 3;
+ constexpr size_t kEntryCount = 3;
std::byte log_buffer[1024];
LogQueueWithEncodeBuffer<kEncodeBufferSize> log_queue(log_buffer);
- for (int i = 0; i < kEntryCount; i++) {
+ for (size_t i = 0; i < kEntryCount; i++) {
EXPECT_EQ(Status::OK,
log_queue.PushTokenizedMessage(
std::as_bytes(std::span(kTokenizedMessage)),
@@ -128,11 +129,12 @@
}
std::byte log_entry[kEncodeBufferSize];
- for (int i = 0; i < kEntryCount; i++) {
- Result<ConstByteSpan> pop_result = log_queue.Pop(std::span(log_entry));
+ for (size_t i = 0; i < kEntryCount; i++) {
+ Result<LogEntries> pop_result = log_queue.Pop(std::span(log_entry));
EXPECT_TRUE(pop_result.ok());
- pw::protobuf::Decoder log_decoder(pop_result.value());
+ pw::protobuf::Decoder log_decoder(pop_result.value().entries);
+ EXPECT_EQ(pop_result.value().entry_count, 1U);
VerifyLogEntry(log_decoder,
kTokenizedMessage,
kFlags,
@@ -144,12 +146,12 @@
}
TEST(LogQueue, PopMultiple) {
- constexpr int kEntryCount = 3;
+ constexpr size_t kEntryCount = 3;
std::byte log_buffer[kLogBufferSize];
LogQueueWithEncodeBuffer<kEncodeBufferSize> log_queue(log_buffer);
- for (int i = 0; i < kEntryCount; i++) {
+ for (size_t i = 0; i < kEntryCount; i++) {
EXPECT_EQ(Status::OK,
log_queue.PushTokenizedMessage(
std::as_bytes(std::span(kTokenizedMessage)),
@@ -161,11 +163,12 @@
}
std::byte log_entries[kLogBufferSize];
- Result<ConstByteSpan> pop_result = log_queue.PopMultiple(log_entries);
+ Result<LogEntries> pop_result = log_queue.PopMultiple(log_entries);
EXPECT_TRUE(pop_result.ok());
- pw::protobuf::Decoder log_decoder(pop_result.value());
- for (int i = 0; i < kEntryCount; i++) {
+ pw::protobuf::Decoder log_decoder(pop_result.value().entries);
+ EXPECT_EQ(pop_result.value().entry_count, kEntryCount);
+ for (size_t i = 0; i < kEntryCount; i++) {
VerifyLogEntry(log_decoder,
kTokenizedMessage,
kFlags,
diff --git a/pw_log_rpc/logs_rpc.cc b/pw_log_rpc/logs_rpc.cc
new file mode 100644
index 0000000..ada5470
--- /dev/null
+++ b/pw_log_rpc/logs_rpc.cc
@@ -0,0 +1,75 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/logs_rpc.h"
+
+#include "pw_log/log.h"
+#include "pw_log_proto/log.pwpb.h"
+#include "pw_status/try.h"
+
+namespace pw::log_rpc {
+namespace {
+
+Result<ConstByteSpan> GenerateDroppedEntryMessage(ByteSpan encode_buffer,
+ size_t dropped_entries) {
+ pw::protobuf::NestedEncoder nested_encoder(encode_buffer);
+ pw::log::LogEntry::Encoder encoder(&nested_encoder);
+ encoder.WriteDropped(dropped_entries);
+ return nested_encoder.Encode();
+}
+
+} // namespace
+
+void Logs::Get(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer) {
+ response_writer_ = std::move(writer);
+}
+
+Status Logs::Flush() {
+ // If the response writer was not initialized or has since been closed,
+ // ignore the flush operation.
+ if (!response_writer_.open()) {
+ return Status::Ok();
+ }
+
+ // If previous calls to flush resulted in dropped entries, generate a
+ // dropped entry message and write it before further log messages.
+ if (dropped_entries_ > 0) {
+ ByteSpan payload = response_writer_.PayloadBuffer();
+ Result dropped_log = GenerateDroppedEntryMessage(payload, dropped_entries_);
+ PW_TRY(dropped_log.status());
+ PW_TRY(response_writer_.Write(dropped_log.value()));
+ dropped_entries_ = 0;
+ }
+
+ // Write logs to the response writer. An important limitation of this
+ // implementation is that if this RPC call fails, the logs are lost -
+ // a subsequent call to the RPC will produce a drop count message.
+ ByteSpan payload = response_writer_.PayloadBuffer();
+ Result possible_logs = log_queue_.PopMultiple(payload);
+ PW_TRY(possible_logs.status());
+ if (possible_logs.value().entry_count == 0) {
+ return Status::Ok();
+ }
+
+ Status status = response_writer_.Write(possible_logs.value().entries);
+ if (!status.ok()) {
+ // On a failure to send logs, track the dropped entries.
+ dropped_entries_ = possible_logs.value().entry_count;
+ return status;
+ }
+
+ return Status::Ok();
+}
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/logs_rpc_test.cc b/pw_log_rpc/logs_rpc_test.cc
new file mode 100644
index 0000000..50541bb
--- /dev/null
+++ b/pw_log_rpc/logs_rpc_test.cc
@@ -0,0 +1,132 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/logs_rpc.h"
+
+#include "gtest/gtest.h"
+#include "pw_log/log.h"
+#include "pw_rpc/raw_test_method_context.h"
+
+namespace pw::log_rpc {
+namespace {
+
+#define LOGS_METHOD_CONTEXT PW_RAW_TEST_METHOD_CONTEXT(Logs, Get)
+
+constexpr size_t kEncodeBufferSize = 128;
+constexpr size_t kLogBufferSize = 4096;
+
+class LogQueueTester : public LogQueueWithEncodeBuffer<kLogBufferSize> {
+ public:
+ LogQueueTester(ByteSpan log_queue)
+ : LogQueueWithEncodeBuffer<kLogBufferSize>(log_queue) {}
+
+ void SetPopStatus(Status error_status) {
+ pop_status_for_test_ = error_status;
+ }
+};
+
+class LogsService : public ::testing::Test {
+ public:
+ LogsService() : log_queue_(log_queue_buffer_) {}
+
+ protected:
+ void AddLogs(const size_t log_count = 1) {
+ constexpr char kTokenizedMessage[] = "message";
+ for (size_t i = 0; i < log_count; i++) {
+ EXPECT_EQ(
+ Status::Ok(),
+ log_queue_.PushTokenizedMessage(
+ std::as_bytes(std::span(kTokenizedMessage)), 0, 0, 0, 0, 0));
+ }
+ }
+
+ static Logs& GetLogs(LOGS_METHOD_CONTEXT& context) {
+ return (Logs&)(context.service());
+ }
+
+ std::array<std::byte, kEncodeBufferSize> log_queue_buffer_;
+ LogQueueWithEncodeBuffer<kLogBufferSize> log_queue_;
+};
+
+TEST_F(LogsService, Get) {
+ constexpr size_t kLogEntryCount = 3;
+ std::array<std::byte, 1> rpc_buffer;
+ LOGS_METHOD_CONTEXT context(log_queue_);
+
+ context.call(rpc_buffer);
+
+ // Flush all logs from the buffer, then close the RPC.
+ AddLogs(kLogEntryCount);
+ GetLogs(context).Flush();
+ GetLogs(context).Finish();
+
+ EXPECT_TRUE(context.done());
+ EXPECT_EQ(Status::Ok(), context.status());
+
+ // Although |kLogEntryCount| messages were in the queue, they are batched
+ // before being written to the client, so there is only one response.
+ EXPECT_EQ(1U, context.total_responses());
+}
+
+TEST_F(LogsService, GetMultiple) {
+ constexpr size_t kLogEntryCount = 1;
+ constexpr size_t kFlushCount = 3;
+ std::array<std::byte, 1> rpc_buffer;
+ LOGS_METHOD_CONTEXT context(log_queue_);
+
+ context.call(rpc_buffer);
+
+ for (size_t i = 0; i < kFlushCount; i++) {
+ AddLogs(kLogEntryCount);
+ GetLogs(context).Flush();
+ }
+ GetLogs(context).Finish();
+
+ EXPECT_TRUE(context.done());
+ EXPECT_EQ(Status::Ok(), context.status());
+ EXPECT_EQ(kFlushCount, context.total_responses());
+}
+
+TEST_F(LogsService, NoEntriesOnEmptyQueue) {
+ std::array<std::byte, 1> rpc_buffer;
+ LOGS_METHOD_CONTEXT context(log_queue_);
+
+ // Invoking flush with no logs in the queue should behave like a no-op.
+ context.call(rpc_buffer);
+ GetLogs(context).Flush();
+ GetLogs(context).Finish();
+
+ EXPECT_TRUE(context.done());
+ EXPECT_EQ(Status::Ok(), context.status());
+ EXPECT_EQ(0U, context.total_responses());
+}
+
+TEST_F(LogsService, QueueError) {
+ std::array<std::byte, 1> rpc_buffer;
+ LogQueueTester log_queue_tester(log_queue_buffer_);
+ LOGS_METHOD_CONTEXT context(log_queue_tester);
+
+ // Generate failure on log queue.
+ log_queue_tester.SetPopStatus(Status::Internal());
+ context.call(rpc_buffer);
+ GetLogs(context).Flush();
+ GetLogs(context).Finish();
+
+ EXPECT_TRUE(context.done());
+ EXPECT_EQ(Status::Ok(), context.status());
+ EXPECT_EQ(0U, context.total_responses());
+}
+
+} // namespace
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/log_queue.h b/pw_log_rpc/public/pw_log_rpc/log_queue.h
index 4c35893..450d0fa 100644
--- a/pw_log_rpc/public/pw_log_rpc/log_queue.h
+++ b/pw_log_rpc/public/pw_log_rpc/log_queue.h
@@ -22,7 +22,7 @@
// LogQueue is a ring-buffer queue of log messages. LogQueue is backed by
// a caller-provided byte array and stores its messages in the format
-// dictated by the pw_rpc_log log.proto format.
+// dictated by the pw_log log.proto format.
//
// Logs can be returned as a repeated proto message and the output of this
// class can be directly fed into an RPC stream.
@@ -36,11 +36,32 @@
// 1) For single entires, LogQueue::Pop().
// 2) For multiple entries, LogQueue::PopMultiple().
namespace pw::log_rpc {
+namespace {
+constexpr size_t kLogEntryMaxSize = 100;
+} // namespace
+
+using LogEntriesBuffer = ByteSpan;
+
+struct LogEntries {
+ // A buffer containing an encoded protobuf of type pw.log.LogEntries.
+ ConstByteSpan entries;
+ size_t entry_count;
+};
class LogQueue {
public:
- LogQueue(ByteSpan log_buffer, ByteSpan encode_buffer)
- : encode_buffer_(encode_buffer), ring_buffer_(true) {
+ // Constructs a LogQueue. Callers can optionally supply a maximum log entry
+ // size, which limits the size of messages that can be pushed into this log
+ // queue. When such an entry arrives, the queue increments its drop counter.
+ // Calls to Pop and PopMultiple should be provided a buffer of at least the
+ // configured max size.
+ LogQueue(ByteSpan log_buffer,
+ ByteSpan encode_buffer,
+ size_t max_log_entry_size = kLogEntryMaxSize)
+ : pop_status_for_test_(Status::Ok()),
+ max_log_entry_size_(max_log_entry_size),
+ encode_buffer_(encode_buffer),
+ ring_buffer_(true) {
ring_buffer_.SetBuffer(log_buffer);
}
@@ -55,12 +76,12 @@
// OK - success.
// FAILED_PRECONDITION - Failed when encoding the proto message.
// RESOURCE_EXHAUSTED - Not enough space in the buffer to write the entry.
- pw::Status PushTokenizedMessage(ConstByteSpan message,
- uint32_t flags,
- uint32_t level,
- uint32_t line,
- uint32_t thread,
- int64_t timestamp);
+ Status PushTokenizedMessage(ConstByteSpan message,
+ uint32_t flags,
+ uint32_t level,
+ uint32_t line,
+ uint32_t thread,
+ int64_t timestamp);
// Pop the oldest LogEntry from the queue into the provided buffer.
// On success, the size is the length of the entry, on failure, the size is 0.
@@ -75,15 +96,22 @@
// bytes than the data size of the data chunk being read. Available
// destination bytes were filled, remaining bytes of the data chunk were
// ignored.
- pw::Result<ConstByteSpan> Pop(ByteSpan entry_buffer);
+ Result<LogEntries> Pop(LogEntriesBuffer entry_buffer);
// Pop entries from the queue into the provided buffer. The provided buffer is
// filled until there is insufficient space for the next log entry.
+ // Returns:
//
- // OK - success.
- pw::Result<ConstByteSpan> PopMultiple(ByteSpan entries_buffer);
+ // LogEntries - contains an encoded protobuf byte span of pw.log.LogEntries.
+ LogEntries PopMultiple(LogEntriesBuffer entries_buffer);
+
+ protected:
+ friend class LogQueueTester;
+ // For testing, status to return on calls to Pop.
+ Status pop_status_for_test_;
private:
+ const size_t max_log_entry_size_;
size_t dropped_entries_;
int64_t latest_dropped_timestamp_;
diff --git a/pw_log_rpc/public/pw_log_rpc/logs_rpc.h b/pw_log_rpc/public/pw_log_rpc/logs_rpc.h
new file mode 100644
index 0000000..2bd354f
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/logs_rpc.h
@@ -0,0 +1,52 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include "pw_log/log.h"
+#include "pw_log_proto/log.raw_rpc.pb.h"
+#include "pw_log_rpc/log_queue.h"
+
+namespace pw::log_rpc {
+
+// The Logs RPC service will send logs when requested by Get(). For now, Get()
+// requests result in a stream of responses, containing all log entries from
+// the attached log queue.
+//
+// The Get() method will return logs in the current queue immediately, but
+// someone else is responsible for pumping the log queue using Flush().
+class Logs final : public pw::log::generated::Logs<Logs> {
+ public:
+ Logs(LogQueue& log_queue) : log_queue_(log_queue), dropped_entries_(0) {}
+
+ // RPC API for the Logs that produces a log stream. This method will
+ // return immediately, another class must call Flush() to push logs from
+ // the queue to this stream.
+ void Get(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer);
+
+ // Interface for the owner of the service instance to flush all existing
+ // logs to the writer, if one is attached.
+ Status Flush();
+
+ // Interface for the owner of the service instance to close the RPC, if
+ // one is attached.
+ void Finish() { response_writer_.Finish(); }
+
+ private:
+ LogQueue& log_queue_;
+ rpc::RawServerWriter response_writer_;
+ size_t dropped_entries_;
+};
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/pw_log_rpc_proto/log.proto b/pw_log_rpc/pw_log_proto/log.proto
similarity index 97%
rename from pw_log_rpc/pw_log_rpc_proto/log.proto
rename to pw_log_rpc/pw_log_proto/log.proto
index 052f866..2e38f42 100644
--- a/pw_log_rpc/pw_log_rpc_proto/log.proto
+++ b/pw_log_rpc/pw_log_proto/log.proto
@@ -14,15 +14,11 @@
syntax = "proto2";
-package pw.log_rpc;
+package pw.log;
option java_package = "pw.rpc.proto";
option java_outer_classname = "Log";
-message Log {
- repeated LogEntry entries = 1;
-}
-
// A log with a tokenized message, a string message, or dropped indicator. A
// message can be one of three types:
//
@@ -158,3 +154,12 @@
optional string data_format_string = 21;
optional bytes data = 22;
}
+
+message LogRequest {}
+message LogEntries {
+ repeated LogEntry entries = 1;
+}
+
+service Logs {
+ rpc Get(LogRequest) returns (stream LogEntries) {}
+}