pw_{log, log_rpc}: Add log filters
- Add log filter proto message that contains rules for filtering logs
based on level, flags, and module information. Each filter has an
identifiable byte array, which can be a token or human readable string,
and is used to configure new filter rules.
- Add LogService methods to get and modify log filters.
- RpcLogDrain can get an optional filter to check if a log should be
dropped.
- Remove unused RpcLogDrain constructor with open server writer.
- Avoid sending empty packets in RpcLogEntry if all messages are
filtered out.
Requires: pigweed-internal:18160
Change-Id: I7fd299e6f281432f99c6a1bd8ede4223ca8e0a3d
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/65000
Reviewed-by: Ewout van Bekkum <ewout@google.com>
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
diff --git a/pw_log/BUILD.bazel b/pw_log/BUILD.bazel
index 6bb0a87..adc8f9e 100644
--- a/pw_log/BUILD.bazel
+++ b/pw_log/BUILD.bazel
@@ -71,7 +71,10 @@
],
import_prefix = "pw_log/proto",
strip_import_prefix = "//pw_log",
- deps = ["//pw_tokenizer:tokenizer_proto"],
+ deps = [
+ "//pw_protobuf:common_protos",
+ "//pw_tokenizer:tokenizer_proto",
+ ],
)
pw_proto_library(
diff --git a/pw_log/BUILD.gn b/pw_log/BUILD.gn
index 9b64284..0da366b 100644
--- a/pw_log/BUILD.gn
+++ b/pw_log/BUILD.gn
@@ -123,7 +123,10 @@
pw_proto_library("protos") {
sources = [ "log.proto" ]
prefix = "pw_log/proto"
- deps = [ "$dir_pw_tokenizer:proto" ]
+ deps = [
+ "$dir_pw_protobuf:common_protos",
+ "$dir_pw_tokenizer:proto",
+ ]
}
pw_doc_group("docs") {
diff --git a/pw_log/log.proto b/pw_log/log.proto
index 6226d2b..046041f 100644
--- a/pw_log/log.proto
+++ b/pw_log/log.proto
@@ -16,6 +16,7 @@
package pw.log;
+import "pw_protobuf_protos/common.proto";
import "pw_tokenizer/proto/options.proto";
option java_outer_classname = "Log";
@@ -167,7 +168,62 @@
repeated LogEntry entries = 1;
}
+message FilterRule {
+ // Log level values match pw_log/levels.h. Enum names avoid collissions with
+ // possible macros.
+ enum Level {
+ ANY_LEVEL = 0;
+ DEBUG_LEVEL = 1;
+ INFO_LEVEL = 2;
+ WARN_LEVEL = 3;
+ ERROR_LEVEL = 4;
+ CRITICAL_LEVEL = 5;
+ FATAL_LEVEL = 7;
+ };
+ // Condition 1: log.level >= level_greater_than_or_equal.
+ Level level_greater_than_or_equal = 1;
+
+ // Condition 2: (module_equals.size() == 0) || (log.module == module_equals);
+ bytes module_equals = 2 [(tokenizer.format) = TOKENIZATION_OPTIONAL];
+
+ // Condition 3: (any_flags_set == 0) || (log.flags & any_flags_set) != 0))
+ uint32 any_flags_set = 3;
+
+ // Action to take if all conditions are met and rule is not inactive.
+ enum Action {
+ INACTIVE = 0; // Ignore the rule entirely.
+ KEEP = 1; // Keep the log entry if all conditions are met.
+ DROP = 2; // Drop the log entry if all conditions are met
+ };
+ Action action = 4;
+}
+
+// A filter is a series of rules. First matching rule wins.
+message Filter {
+ repeated FilterRule rule = 1;
+}
+
+message SetFilterRequest {
+ // A filter can be identified by a human readable string, token, or number.
+ bytes filter_id = 1 [(tokenizer.format) = TOKENIZATION_OPTIONAL];
+
+ Filter filter = 2;
+}
+
+message GetFilterRequest {
+ bytes filter_id = 1 [(tokenizer.format) = TOKENIZATION_OPTIONAL];
+}
+
+message FilterIdListRequest {}
+
+message FilterIdListResponse {
+ repeated bytes filter_id = 1 [(tokenizer.format) = TOKENIZATION_OPTIONAL];
+}
+
// RPC service for accessing logs.
service Logs {
rpc Listen(LogRequest) returns (stream LogEntries);
+ rpc SetFilter(SetFilterRequest) returns (pw.protobuf.Empty);
+ rpc GetFilter(GetFilterRequest) returns (Filter);
+ rpc ListFilterIds(FilterIdListRequest) returns (FilterIdListResponse);
}
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel
index 517b457..64dce44 100644
--- a/pw_log_rpc/BUILD.bazel
+++ b/pw_log_rpc/BUILD.bazel
@@ -28,10 +28,34 @@
hdrs = ["public/pw_log_rpc/log_service.h"],
includes = ["public"],
deps = [
+ ":log_filter",
":rpc_log_drain",
"//pw_log",
"//pw_log:log_pwpb",
"//pw_log:protos.raw_rpc",
+ "//pw_protobuf",
+ "//pw_protobuf:bytes_utils",
+ ],
+)
+
+pw_cc_library(
+ name = "log_filter",
+ srcs = ["log_filter.cc"],
+ hdrs = [
+ "public/pw_log_rpc/log_filter.h",
+ "public/pw_log_rpc/log_filter_map.h",
+ ],
+ includes = ["public"],
+ deps = [
+ "public/pw_log_rpc/internal/config.h",
+ "//pw_assert",
+ "//pw_bytes",
+ "//pw_containers:vector",
+ "//pw_log",
+ "//pw_log:log_pwpb",
+ "//pw_log:protos.pwpb",
+ "//pw_protobuf",
+ "//pw_status",
],
)
@@ -44,6 +68,7 @@
],
includes = ["public"],
deps = [
+ ":log_filter",
"//pw_assert",
"//pw_log:log_pwpb",
"//pw_log:protos.raw_rpc",
@@ -74,15 +99,15 @@
pw_cc_test(
name = "log_service_test",
- srcs = [
- "log_service_test.cc",
- ],
+ srcs = ["log_service_test.cc"],
deps = [
+ ":log_filter",
":log_service",
"//pw_containers:vector",
"//pw_log",
"//pw_log:log_pwpb",
"//pw_log:proto_utils",
+ "//pw_log_tokenized:metadata",
"//pw_protobuf",
"//pw_protobuf:bytes_utils",
"//pw_result",
@@ -93,10 +118,22 @@
)
pw_cc_test(
- name = "rpc_log_drain_test",
- srcs = [
- "rpc_log_drain_test.cc",
+ name = "log_filter_test",
+ srcs = ["log_filter_test.cc"],
+ deps = [
+ ":log_filter",
+ "//pw_log:log_pwpb",
+ "//pw_log:proto_utils",
+ "//pw_log_tokenized:metadata",
+ "//pw_result",
+ "//pw_status",
+ "//pw_unit_test",
],
+)
+
+pw_cc_test(
+ name = "rpc_log_drain_test",
+ srcs = ["rpc_log_drain_test.cc"],
deps = [
":log_service",
":rpc_log_drain",
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index 1d05ed5..d974183 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -18,33 +18,66 @@
import("$dir_pw_docgen/docs.gni")
import("$dir_pw_unit_test/test.gni")
-config("default_config") {
+config("public_include_path") {
include_dirs = [ "public" ]
visibility = [ ":*" ]
}
+pw_source_set("config") {
+ sources = [ "public/pw_log_rpc/internal/config.h" ]
+ public_configs = [ ":public_include_path" ]
+ visibility = [ "./*" ]
+ friend = [ "./*" ]
+}
+
pw_source_set("log_service") {
- public_configs = [ ":default_config" ]
+ public_configs = [ ":public_include_path" ]
public = [ "public/pw_log_rpc/log_service.h" ]
sources = [ "log_service.cc" ]
deps = [
"$dir_pw_log",
"$dir_pw_log:protos.pwpb",
+ "$dir_pw_protobuf",
]
public_deps = [
+ ":log_filter",
":rpc_log_drain",
"$dir_pw_log:protos.raw_rpc",
+ "$dir_pw_protobuf:bytes_utils",
+ ]
+}
+
+pw_source_set("log_filter") {
+ public_configs = [ ":public_include_path" ]
+ public = [
+ "public/pw_log_rpc/log_filter.h",
+ "public/pw_log_rpc/log_filter_map.h",
+ ]
+ sources = [ "log_filter.cc" ]
+ deps = [
+ "$dir_pw_log",
+ "$dir_pw_protobuf",
+ ]
+ public_deps = [
+ ":config",
+ "$dir_pw_assert",
+ "$dir_pw_bytes",
+ "$dir_pw_containers:vector",
+ "$dir_pw_log:protos.pwpb",
+ "$dir_pw_protobuf",
+ "$dir_pw_status",
]
}
pw_source_set("rpc_log_drain") {
- public_configs = [ ":default_config" ]
+ public_configs = [ ":public_include_path" ]
public = [
"public/pw_log_rpc/rpc_log_drain.h",
"public/pw_log_rpc/rpc_log_drain_map.h",
]
sources = [ "rpc_log_drain.cc" ]
public_deps = [
+ ":log_filter",
"$dir_pw_assert",
"$dir_pw_log:protos.pwpb",
"$dir_pw_log:protos.raw_rpc",
@@ -58,7 +91,7 @@
}
pw_source_set("rpc_log_drain_thread") {
- public_configs = [ ":default_config" ]
+ public_configs = [ ":public_include_path" ]
public = [ "public/pw_log_rpc/rpc_log_drain_thread.h" ]
sources = []
public_deps = [
@@ -76,11 +109,13 @@
pw_test("log_service_test") {
sources = [ "log_service_test.cc" ]
deps = [
+ ":log_filter",
":log_service",
"$dir_pw_containers:vector",
"$dir_pw_log",
"$dir_pw_log:proto_utils",
"$dir_pw_log:protos.pwpb",
+ "$dir_pw_log_tokenized:metadata",
"$dir_pw_protobuf",
"$dir_pw_protobuf:bytes_utils",
"$dir_pw_result",
@@ -89,6 +124,18 @@
]
}
+pw_test("log_filter_test") {
+ sources = [ "log_filter_test.cc" ]
+ deps = [
+ ":log_filter",
+ "$dir_pw_log:proto_utils",
+ "$dir_pw_log:protos.pwpb",
+ "$dir_pw_log_tokenized:metadata",
+ "$dir_pw_result",
+ "$dir_pw_status",
+ ]
+}
+
pw_test("rpc_log_drain_test") {
sources = [ "rpc_log_drain_test.cc" ]
deps = [
@@ -105,6 +152,7 @@
pw_test_group("tests") {
tests = [
+ ":log_filter_test",
":log_service_test",
":rpc_log_drain_test",
]
diff --git a/pw_log_rpc/docs.rst b/pw_log_rpc/docs.rst
index af866a9..d0dfd91 100644
--- a/pw_log_rpc/docs.rst
+++ b/pw_log_rpc/docs.rst
@@ -1,14 +1,15 @@
.. _module-pw_log_rpc:
-----------
+==========
pw_log_rpc
-----------
-An RPC-based logging backend for Pigweed.
+==========
+An RPC-based logging solution for Pigweed with log filtering and log drops
+reporting -- coming soon!
.. warning::
This module is under construction and might change in the future.
-How to use
+How to Use
==========
1. Set up RPC
-------------
@@ -27,13 +28,15 @@
``pw_tokenizer_HandleEncodedMessageWithPayload``, encode log entries in the
``log::LogEntry`` format, and add them to the ``MultiSink``.
-4. Create log drains
---------------------
+4. Create log drains and filters
+--------------------------------
Create an ``RpcLogDrainMap`` with one ``RpcLogDrain`` for each RPC channel used
-to stream logs. Provide this map to the ``LogService`` and register the latter
+to stream logs. Optionally, create a ``FilterMap`` with ``Filter`` objects with
+different IDs. Provide these map to the ``LogService`` and register the latter
with the application's RPC service. The ``RpcLogDrainMap`` provides a convenient
way to access and maintain each ``RpcLogDrain``. Attach each ``RpcLogDrain`` to
-the ``MultiSink``.
+the ``MultiSink``. Optionally, set the ``RpcLogDrain`` callback to decide if a
+log should be kept or dropped. This callback can be ``Filter::ShouldDropLog``.
5. Flush the log drains in the background
-----------------------------------------
@@ -80,16 +83,19 @@
pw_rpc-->computer[Computer];
pw_rpc-->other_listener[Other log<br>listener];
+Components Overview
+===================
RPC log service
-===============
+---------------
The ``LogService`` class is an RPC service that provides a way to request a log
-stream sent via RPC. Thus, it helps avoid using a different protocol for logs
-and RPCs over the same interface(s). It requires a map of ``RpcLogDrains`` to
-assign stream writers and delegate the log stream flushing to the user's
-preferred method.
+stream sent via RPC and configure log filters. Thus, it helps avoid using a
+different protocol for logs and RPCs over the same interface(s).
+It requires a ``RpcLogDrainMap`` to assign stream writers and delegate the
+log stream flushing to the user's preferred method, as well as a ``FilterMap``
+to retrieve and modify filters.
RpcLogDrain
-===========
+-----------
An ``RpcLogDrain`` reads from the ``MultiSink`` instance that buffers logs, then
packs, and sends the retrieved log entries to the log listener. One
``RpcLogDrain`` is needed for each log listener. An ``RpcLogDrain`` needs a
@@ -124,12 +130,12 @@
count with the logs if desired.
RpcLogDrainMap
-==============
+--------------
Provides a convenient way to access all or a single ``RpcLogDrain`` by its RPC
channel ID.
RpcLogDrainThread
-=================
+-----------------
The module includes a sample thread that flushes each drain sequentially. Future
work might replace this with enqueueing the flush work on a work queue. The user
can also choose to have different threads flushing individual ``RpcLogDrain``\s
@@ -138,6 +144,32 @@
Calling ``OpenUnrequestedLogStream()`` is a convenient way to set up a log
stream that is started without the need to receive an RCP request for logs.
+Filter::Rule
+------------
+Contains a set of values that are compared against a log when set. All
+conditions must be met for the rule to be met.
+
+- ``action``: drops or keeps the log if the other conditions match.
+ The rule is ignored when inactive.
+
+- ``any_flags_set``: the condition is met if this value is 0 or the log has any
+ of these flags set.
+
+- ``level_greater_than_or_equal``: the condition is met when the log level is
+ greater than or equal to this value.
+
+- ``module_equals``: the condition is met if this byte array is empty, or the
+ log module equals the contents of this byte array.
+
+Filter
+------
+``Filter`` encapsulates a collection of zero or more ``Filter::Rule``\s and has
+an ID used to modify or retrieve its contents.
+
+FilterMap
+---------
+Provides a convenient way to retrieve register filters by ID.
+
Logging example
===============
The following code shows a sample setup to defer the log handling to the
diff --git a/pw_log_rpc/log_filter.cc b/pw_log_rpc/log_filter.cc
new file mode 100644
index 0000000..5385be4
--- /dev/null
+++ b/pw_log_rpc/log_filter.cc
@@ -0,0 +1,131 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/log_filter.h"
+
+#include "pw_log/levels.h"
+#include "pw_protobuf/decoder.h"
+#include "pw_status/try.h"
+
+namespace pw::log_rpc {
+namespace {
+
+// Returns true if the provided log parameters match the given filter rule.
+bool IsRuleMet(const Filter::Rule& rule,
+ uint32_t level,
+ ConstByteSpan module,
+ uint32_t flags) {
+ if (level < static_cast<uint32_t>(rule.level_greater_than_or_equal)) {
+ return false;
+ }
+ if ((rule.any_flags_set != 0) && ((flags & rule.any_flags_set) == 0)) {
+ return false;
+ }
+ if (!rule.module_equals.empty() && !std::equal(module.begin(),
+ module.end(),
+ rule.module_equals.begin(),
+ rule.module_equals.end())) {
+ return false;
+ }
+ return true;
+}
+
+} // namespace
+
+Status Filter::UpdateRulesFromProto(ConstByteSpan buffer) {
+ if (rules_.empty()) {
+ return Status::FailedPrecondition();
+ }
+
+ // Reset rules.
+ for (auto& rule : rules_) {
+ rule = {};
+ }
+
+ protobuf::Decoder decoder(buffer);
+ Status status;
+ for (size_t i = 0; (i < rules_.size()) && (status = decoder.Next()).ok();
+ ++i) {
+ ConstByteSpan rule_buffer;
+ PW_TRY(decoder.ReadBytes(&rule_buffer));
+ protobuf::Decoder rule_decoder(rule_buffer);
+ while ((status = rule_decoder.Next()).ok()) {
+ switch (
+ static_cast<log::FilterRule::Fields>(rule_decoder.FieldNumber())) {
+ case log::FilterRule::Fields::LEVEL_GREATER_THAN_OR_EQUAL:
+ PW_TRY(rule_decoder.ReadUint32(reinterpret_cast<uint32_t*>(
+ &rules_[i].level_greater_than_or_equal)));
+ break;
+ case log::FilterRule::Fields::MODULE_EQUALS: {
+ ConstByteSpan module;
+ PW_TRY(rule_decoder.ReadBytes(&module));
+ if (module.size() > rules_[i].module_equals.max_size()) {
+ return Status::InvalidArgument();
+ }
+ rules_[i].module_equals.assign(module.begin(), module.end());
+ } break;
+ case log::FilterRule::Fields::ANY_FLAGS_SET:
+ PW_TRY(rule_decoder.ReadUint32(&rules_[i].any_flags_set));
+ break;
+ case log::FilterRule::Fields::ACTION:
+ PW_TRY(rule_decoder.ReadUint32(
+ reinterpret_cast<uint32_t*>(&rules_[i].action)));
+ break;
+ }
+ }
+ }
+ return status.IsOutOfRange() ? OkStatus() : status;
+}
+
+bool Filter::ShouldDropLog(ConstByteSpan entry) const {
+ if (rules_.empty()) {
+ return false;
+ }
+
+ uint32_t log_level = 0;
+ ConstByteSpan log_module;
+ uint32_t log_flags = 0;
+ protobuf::Decoder decoder(entry);
+ while (decoder.Next().ok()) {
+ switch (static_cast<log::LogEntry::Fields>(decoder.FieldNumber())) {
+ case log::LogEntry::Fields::LINE_LEVEL:
+ if (decoder.ReadUint32(&log_level).ok()) {
+ log_level &= PW_LOG_LEVEL_BITMASK;
+ }
+ break;
+ case log::LogEntry::Fields::MODULE:
+ decoder.ReadBytes(&log_module).IgnoreError();
+ break;
+ case log::LogEntry::Fields::FLAGS:
+ decoder.ReadUint32(&log_flags).IgnoreError();
+ break;
+ default:
+ break;
+ }
+ }
+
+ // Follow the action of the first rule whose condition is met.
+ for (const auto& rule : rules_) {
+ if (rule.action == Filter::Rule::Action::kInactive) {
+ continue;
+ }
+ if (IsRuleMet(rule, log_level, log_module, log_flags)) {
+ return rule.action == Filter::Rule::Action::kDrop;
+ }
+ }
+
+ return false;
+}
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/log_filter_test.cc b/pw_log_rpc/log_filter_test.cc
new file mode 100644
index 0000000..fa41215
--- /dev/null
+++ b/pw_log_rpc/log_filter_test.cc
@@ -0,0 +1,514 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/log_filter.h"
+
+#include <array>
+#include <cstddef>
+#include <cstring>
+
+#include "gtest/gtest.h"
+#include "pw_bytes/endian.h"
+#include "pw_log/levels.h"
+#include "pw_log/log.h"
+#include "pw_log/proto/log.pwpb.h"
+#include "pw_log/proto_utils.h"
+#include "pw_log_rpc/log_filter_map.h"
+#include "pw_log_tokenized/metadata.h"
+#include "pw_result/result.h"
+#include "pw_status/status.h"
+#include "pw_status/try.h"
+
+namespace pw::log_rpc {
+namespace {
+
+constexpr uint32_t kSampleModule = 0x1234;
+constexpr uint32_t kSampleFlags = 0x3;
+constexpr char kSampleMessage[] = "message";
+constexpr auto kSampleModuleLittleEndian =
+ bytes::CopyInOrder<uint32_t>(std::endian::little, kSampleModule);
+
+// Creates and encodes a log entry in the provided buffer.
+template <uintptr_t log_level, uintptr_t module, uintptr_t flags>
+Result<ConstByteSpan> EncodeLogEntry(std::string_view message,
+ ByteSpan buffer) {
+ auto metadata = log_tokenized::Metadata::Set<log_level, module, flags, 0>();
+ return log::EncodeTokenizedLog(metadata,
+ std::as_bytes(std::span(message)),
+ /*ticks_since_epoch=*/0,
+ buffer);
+}
+
+Status EncodeFilterRule(const Filter::Rule& rule,
+ log::FilterRule::StreamEncoder& encoder) {
+ PW_TRY(
+ encoder.WriteLevelGreaterThanOrEqual(rule.level_greater_than_or_equal));
+ PW_TRY(encoder.WriteModuleEquals(rule.module_equals));
+ PW_TRY(encoder.WriteAnyFlagsSet(rule.any_flags_set));
+ return encoder.WriteAction(static_cast<log::FilterRule::Action>(rule.action));
+}
+
+Result<ConstByteSpan> EncodeFilter(const Filter& filter, ByteSpan buffer) {
+ log::Filter::MemoryEncoder encoder(buffer);
+ for (auto& rule : filter.rules()) {
+ log::FilterRule::StreamEncoder rule_encoder = encoder.GetRuleEncoder();
+ PW_TRY(EncodeFilterRule(rule, rule_encoder));
+ }
+ return ConstByteSpan(encoder);
+}
+
+void VerifyRule(const Filter::Rule& rule, const Filter::Rule& expected_rule) {
+ EXPECT_EQ(rule.level_greater_than_or_equal,
+ expected_rule.level_greater_than_or_equal);
+ EXPECT_EQ(rule.module_equals, expected_rule.module_equals);
+ EXPECT_EQ(rule.any_flags_set, expected_rule.any_flags_set);
+ EXPECT_EQ(rule.action, expected_rule.action);
+}
+
+TEST(FilterMap, RetrieveFiltersById) {
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1{
+ std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2{
+ std::byte(0xca), std::byte(0xfe), std::byte(0xc0), std::byte(0xc0)};
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id3{
+ std::byte(0xfa), std::byte(0xfe), std::byte(0xf1), std::byte(0xf0)};
+ std::array<Filter, 3> filters = {
+ Filter(filter_id1, {}),
+ Filter(filter_id2, {}),
+ Filter(filter_id3, {}),
+ };
+
+ FilterMap filter_map(filters);
+
+ // Check that each filters() element points to the same object provided.
+ std::span<const Filter> filter_list = filter_map.filters();
+ ASSERT_EQ(filter_list.size(), filters.size());
+ size_t i = 0;
+ for (auto& filter : filter_list) {
+ EXPECT_EQ(&filter, &filters[i++]);
+ }
+
+ auto filter_result = filter_map.GetFilterFromId(filter_id3);
+ ASSERT_TRUE(filter_result.ok());
+ EXPECT_EQ(filter_result.value(), &filters[2]);
+
+ filter_result = filter_map.GetFilterFromId(filter_id2);
+ ASSERT_TRUE(filter_result.ok());
+ EXPECT_EQ(filter_result.value(), &filters[1]);
+
+ filter_result = filter_map.GetFilterFromId(filter_id1);
+ ASSERT_TRUE(filter_result.ok());
+ EXPECT_EQ(filter_result.value(), &filters[0]);
+
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> invalid_id{
+ std::byte(0xd0), std::byte(0x1c), std::byte(0xe7), std::byte(0xea)};
+ filter_result = filter_map.GetFilterFromId(invalid_id);
+ ASSERT_EQ(filter_result.status(), Status::NotFound());
+}
+
+TEST(Filter, UpdateFilterRules) {
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id{
+ std::byte(0xba), std::byte(0x1d), std::byte(0xba), std::byte(0xb1)};
+ std::array<Filter::Rule, 4> rules;
+ const std::array<Filter::Rule, 4> new_rules{{
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL,
+ .any_flags_set = 0x0f,
+ .module_equals{std::byte(123)},
+ },
+ {
+ .action = Filter::Rule::Action::kInactive,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0xef,
+ .module_equals{},
+ },
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = 0x1234,
+ .module_equals{std::byte(99)},
+ },
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0,
+ .module_equals{std::byte(4)},
+ },
+ }};
+
+ Filter filter(filter_id, rules);
+ const Filter new_filter(filter_id,
+ const_cast<std::array<Filter::Rule, 4>&>(new_rules));
+ std::byte buffer[256];
+ auto encode_result = EncodeFilter(new_filter, buffer);
+ ASSERT_EQ(encode_result.status(), OkStatus());
+ EXPECT_EQ(filter.UpdateRulesFromProto(encode_result.value()), OkStatus());
+
+ size_t i = 0;
+ for (const auto& rule : filter.rules()) {
+ VerifyRule(rule, new_rules[i++]);
+ }
+
+ // A new filter with no rules should clear filter.
+ const Filter empty_filter(filter_id, {});
+ std::memset(buffer, 0, sizeof(buffer));
+ encode_result = EncodeFilter(empty_filter, buffer);
+ ASSERT_EQ(encode_result.status(), OkStatus());
+ EXPECT_EQ(filter.UpdateRulesFromProto(encode_result.value()), OkStatus());
+ const Filter::Rule empty_rule{
+ .action = Filter::Rule::Action::kInactive,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0,
+ .module_equals{},
+ };
+ for (const auto& rule : filter.rules()) {
+ VerifyRule(rule, empty_rule);
+ }
+ EXPECT_TRUE(empty_filter.rules().empty());
+
+ // Passing a new filter with less rules.
+ const std::array<Filter::Rule, 2> few_rules{{
+ {
+ .action = Filter::Rule::Action::kInactive,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0xef,
+ .module_equals{},
+ },
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = 0x1234,
+ .module_equals{std::byte(99)},
+ },
+ }};
+ const Filter filter_few_rules(
+ filter_id, const_cast<std::array<Filter::Rule, 2>&>(few_rules));
+ std::memset(buffer, 0, sizeof(buffer));
+ encode_result = EncodeFilter(filter_few_rules, buffer);
+ ASSERT_EQ(encode_result.status(), OkStatus());
+ EXPECT_EQ(filter.UpdateRulesFromProto(encode_result.value()), OkStatus());
+ i = 0;
+ for (const auto& rule : filter.rules()) {
+ if (i >= few_rules.size()) {
+ VerifyRule(rule, empty_rule);
+ } else {
+ VerifyRule(rule, few_rules[i++]);
+ }
+ }
+
+ // Passing a new filter with extra rules.
+ const std::array<Filter::Rule, 6> extra_rules{{
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL,
+ .any_flags_set = 0x0f,
+ .module_equals{std::byte(123)},
+ },
+ {
+ .action = Filter::Rule::Action::kInactive,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0xef,
+ .module_equals{},
+ },
+ {
+ .action = Filter::Rule::Action::kInactive,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0xef,
+ .module_equals{},
+ },
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = 0x1234,
+ .module_equals{std::byte(99)},
+ },
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0,
+ .module_equals{std::byte(4)},
+ },
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = 0x1234,
+ .module_equals{std::byte(99)},
+ },
+ }};
+ const Filter filter_extra_rules(
+ filter_id, const_cast<std::array<Filter::Rule, 6>&>(extra_rules));
+ std::memset(buffer, 0, sizeof(buffer));
+ encode_result = EncodeFilter(filter_extra_rules, buffer);
+ ASSERT_EQ(encode_result.status(), OkStatus());
+ EXPECT_EQ(filter.UpdateRulesFromProto(encode_result.value()), OkStatus());
+ i = 0;
+ for (const auto& rule : filter.rules()) {
+ VerifyRule(rule, extra_rules[i++]);
+ }
+
+ // A filter with no rules buffer cannot get rules updated.
+ Filter filter_no_rules(filter_id, {});
+ EXPECT_EQ(filter_no_rules.UpdateRulesFromProto(encode_result.value()),
+ Status::FailedPrecondition());
+}
+
+TEST(FilterTest, FilterLogsRuleDefaultDrop) {
+ const std::array<Filter::Rule, 2> rules{{
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = kSampleFlags,
+ .module_equals{kSampleModuleLittleEndian.begin(),
+ kSampleModuleLittleEndian.end()},
+ },
+ // This rule catches all logs.
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0,
+ .module_equals = {},
+ },
+ }};
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id{
+ std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+ const Filter filter(filter_id,
+ const_cast<std::array<Filter::Rule, 2>&>(rules));
+
+ std::array<std::byte, 50> buffer;
+ const Result<ConstByteSpan> log_entry_info =
+ EncodeLogEntry<PW_LOG_LEVEL_INFO, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_info.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_info.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_debug =
+ EncodeLogEntry<PW_LOG_LEVEL_DEBUG, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_debug.status(), OkStatus());
+ EXPECT_TRUE(filter.ShouldDropLog(log_entry_debug.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_warn =
+ EncodeLogEntry<PW_LOG_LEVEL_WARN, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_warn.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_warn.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_error =
+ EncodeLogEntry<PW_LOG_LEVEL_ERROR, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_error.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_error.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_info_different =
+ EncodeLogEntry<PW_LOG_LEVEL_INFO, 0, 0>(kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_info_different.status(), OkStatus());
+ EXPECT_TRUE(filter.ShouldDropLog(log_entry_info_different.value()));
+ // Because the last rule catches all logs, the filter default action is not
+ // applied.
+ const Filter filter_default_drop(
+ filter_id, const_cast<std::array<Filter::Rule, 2>&>(rules));
+ EXPECT_TRUE(
+ filter_default_drop.ShouldDropLog(log_entry_info_different.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_same_flags =
+ EncodeLogEntry<0, 0, kSampleFlags>(kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_same_flags.status(), OkStatus());
+ EXPECT_TRUE(filter.ShouldDropLog(log_entry_same_flags.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_same_module =
+ EncodeLogEntry<0, kSampleModule, 0>(kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_same_module.status(), OkStatus());
+ EXPECT_TRUE(filter.ShouldDropLog(log_entry_same_module.value()));
+}
+
+TEST(FilterTest, FilterLogsKeepLogsWhenNoRuleMatches) {
+ // There is no rule that catches all logs.
+ const std::array<Filter::Rule, 1> rules{{
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = kSampleFlags,
+ .module_equals = {kSampleModuleLittleEndian.begin(),
+ kSampleModuleLittleEndian.end()},
+ },
+ }};
+
+ // Filters should not share rules if they are mutable, to avoid race
+ // conditions.
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id{
+ std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+ const Filter filter(filter_id,
+ const_cast<std::array<Filter::Rule, 1>&>(rules));
+
+ std::array<std::byte, 50> buffer;
+ const Result<ConstByteSpan> log_entry_info =
+ EncodeLogEntry<PW_LOG_LEVEL_INFO, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_info.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_info.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_debug =
+ EncodeLogEntry<PW_LOG_LEVEL_DEBUG, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_debug.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_debug.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_warn =
+ EncodeLogEntry<PW_LOG_LEVEL_WARN, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_warn.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_warn.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_error =
+ EncodeLogEntry<PW_LOG_LEVEL_ERROR, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_error.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_error.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_info_different =
+ EncodeLogEntry<PW_LOG_LEVEL_INFO, 0, 0>(kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_info_different.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_info_different.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_same_flags =
+ EncodeLogEntry<0, 0, kSampleFlags>(kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_same_flags.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_same_flags.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_same_module =
+ EncodeLogEntry<0, kSampleModule, 0>(kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_same_module.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_same_module.value()));
+}
+
+TEST(FilterTest, FilterLogsKeepLogsWhenRulesEmpty) {
+ // Filters should not share rules if they are mutable, to avoid race
+ // conditions.
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id{
+ std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+ const Filter filter(filter_id, {});
+
+ std::array<std::byte, 50> buffer;
+ const Result<ConstByteSpan> log_entry_info =
+ EncodeLogEntry<PW_LOG_LEVEL_INFO, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_info.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_info.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_debug =
+ EncodeLogEntry<PW_LOG_LEVEL_DEBUG, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_debug.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_debug.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_warn =
+ EncodeLogEntry<PW_LOG_LEVEL_WARN, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_warn.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_warn.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_error =
+ EncodeLogEntry<PW_LOG_LEVEL_ERROR, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_error.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_error.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_info_different =
+ EncodeLogEntry<PW_LOG_LEVEL_INFO, 0, 0>(kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_info_different.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_info_different.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_same_flags =
+ EncodeLogEntry<0, 0, kSampleFlags>(kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_same_flags.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_same_flags.value()));
+
+ buffer.fill(std::byte(0));
+ const Result<ConstByteSpan> log_entry_same_module =
+ EncodeLogEntry<0, kSampleModule, 0>(kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_same_module.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_same_module.value()));
+}
+
+TEST(FilterTest, FilterLogsFirstRuleWins) {
+ const std::array<Filter::Rule, 2> rules{{
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = kSampleFlags,
+ .module_equals = {kSampleModuleLittleEndian.begin(),
+ kSampleModuleLittleEndian.end()},
+ },
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = kSampleFlags,
+ .module_equals = {kSampleModuleLittleEndian.begin(),
+ kSampleModuleLittleEndian.end()},
+ },
+ }};
+ const std::array<Filter::Rule, 2> rules_reversed{{
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = kSampleFlags,
+ .module_equals = {kSampleModuleLittleEndian.begin(),
+ kSampleModuleLittleEndian.end()},
+ },
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = kSampleFlags,
+ .module_equals = {kSampleModuleLittleEndian.begin(),
+ kSampleModuleLittleEndian.end()},
+ },
+ }};
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1{
+ std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+ const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2{
+ std::byte(0), std::byte(0), std::byte(0), std::byte(2)};
+ const Filter filter(filter_id1,
+ const_cast<std::array<Filter::Rule, 2>&>(rules));
+ const Filter filter_reverse_rules(
+ filter_id2, const_cast<std::array<Filter::Rule, 2>&>(rules_reversed));
+
+ std::array<std::byte, 50> buffer;
+ const Result<ConstByteSpan> log_entry_info =
+ EncodeLogEntry<PW_LOG_LEVEL_INFO, kSampleModule, kSampleFlags>(
+ kSampleMessage, buffer);
+ ASSERT_EQ(log_entry_info.status(), OkStatus());
+ EXPECT_FALSE(filter.ShouldDropLog(log_entry_info.value()));
+ EXPECT_TRUE(filter_reverse_rules.ShouldDropLog(log_entry_info.value()));
+}
+
+} // namespace
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/log_service.cc b/pw_log_rpc/log_service.cc
index 1da5471..eef904c 100644
--- a/pw_log_rpc/log_service.cc
+++ b/pw_log_rpc/log_service.cc
@@ -16,6 +16,8 @@
#include "pw_log/log.h"
#include "pw_log/proto/log.pwpb.h"
+#include "pw_log_rpc/log_filter.h"
+#include "pw_protobuf/decoder.h"
namespace pw::log_rpc {
@@ -34,4 +36,83 @@
}
}
+StatusWithSize LogService::SetFilter(ServerContext&,
+ ConstByteSpan request,
+ ByteSpan) {
+ if (filters_ == nullptr) {
+ return StatusWithSize::NotFound();
+ }
+
+ protobuf::Decoder decoder(request);
+ PW_TRY_WITH_SIZE(decoder.Next());
+ if (static_cast<log::SetFilterRequest::Fields>(decoder.FieldNumber()) !=
+ log::SetFilterRequest::Fields::FILTER_ID) {
+ return StatusWithSize::InvalidArgument();
+ }
+ ConstByteSpan filter_id;
+ PW_TRY_WITH_SIZE(decoder.ReadBytes(&filter_id));
+ Result<Filter*> filter = filters_->GetFilterFromId(filter_id);
+ if (!filter.ok()) {
+ return StatusWithSize::NotFound();
+ }
+
+ PW_TRY_WITH_SIZE(decoder.Next());
+ ConstByteSpan filter_buffer;
+ if (static_cast<log::SetFilterRequest::Fields>(decoder.FieldNumber()) !=
+ log::SetFilterRequest::Fields::FILTER) {
+ return StatusWithSize::InvalidArgument();
+ }
+ PW_TRY_WITH_SIZE(decoder.ReadBytes(&filter_buffer));
+ PW_TRY_WITH_SIZE(filter.value()->UpdateRulesFromProto(filter_buffer));
+ return StatusWithSize();
+}
+
+StatusWithSize LogService::GetFilter(ServerContext&,
+ ConstByteSpan request,
+ ByteSpan response) {
+ if (filters_ == nullptr) {
+ return StatusWithSize::NotFound();
+ }
+ protobuf::Decoder decoder(request);
+ PW_TRY_WITH_SIZE(decoder.Next());
+ if (static_cast<log::GetFilterRequest::Fields>(decoder.FieldNumber()) !=
+ log::GetFilterRequest::Fields::FILTER_ID) {
+ return StatusWithSize::InvalidArgument();
+ }
+ ConstByteSpan filter_id;
+ PW_TRY_WITH_SIZE(decoder.ReadBytes(&filter_id));
+ Result<Filter*> filter = filters_->GetFilterFromId(filter_id);
+ if (!filter.ok()) {
+ return StatusWithSize::NotFound();
+ }
+
+ log::Filter::MemoryEncoder encoder(response);
+ for (auto& rule : (*filter)->rules()) {
+ log::FilterRule::StreamEncoder rule_encoder = encoder.GetRuleEncoder();
+ rule_encoder.WriteLevelGreaterThanOrEqual(rule.level_greater_than_or_equal)
+ .IgnoreError();
+ rule_encoder.WriteModuleEquals(rule.module_equals).IgnoreError();
+ rule_encoder.WriteAnyFlagsSet(rule.any_flags_set).IgnoreError();
+ rule_encoder.WriteAction(static_cast<log::FilterRule::Action>(rule.action))
+ .IgnoreError();
+ PW_TRY_WITH_SIZE(rule_encoder.status());
+ }
+ PW_TRY_WITH_SIZE(encoder.status());
+
+ return StatusWithSize(encoder.size());
+}
+
+StatusWithSize LogService::ListFilterIds(ServerContext&,
+ ConstByteSpan,
+ ByteSpan response) {
+ if (filters_ == nullptr) {
+ return StatusWithSize::NotFound();
+ }
+ log::FilterIdListResponse::MemoryEncoder encoder(response);
+ for (auto& filter : filters_->filters()) {
+ PW_TRY_WITH_SIZE(encoder.WriteFilterId(filter.id()));
+ }
+ return StatusWithSize(encoder.size());
+}
+
} // namespace pw::log_rpc
diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc
index 992ee4d..e2e7db9 100644
--- a/pw_log_rpc/log_service_test.cc
+++ b/pw_log_rpc/log_service_test.cc
@@ -25,6 +25,7 @@
#include "pw_log/log.h"
#include "pw_log/proto/log.pwpb.h"
#include "pw_log/proto_utils.h"
+#include "pw_log_rpc/log_filter.h"
#include "pw_log_tokenized/metadata.h"
#include "pw_protobuf/bytes_utils.h"
#include "pw_protobuf/decoder.h"
@@ -69,7 +70,10 @@
// add to the multisink, and which drain to use.
class LogServiceTest : public ::testing::Test {
public:
- LogServiceTest() : multisink_(multisink_buffer_), drain_map_(drains_) {
+ LogServiceTest()
+ : multisink_(multisink_buffer_),
+ drain_map_(drains_),
+ filter_map_(filters_) {
for (auto& drain : drain_map_.drains()) {
multisink_.AttachDrain(drain);
}
@@ -102,6 +106,22 @@
multisink::MultiSink multisink_;
RpcLogDrainMap drain_map_;
std::array<std::byte, kMaxLogEntrySize> entry_encode_buffer_;
+ FilterMap filter_map_;
+ static constexpr size_t kMaxFilterRules = 4;
+ std::array<Filter::Rule, kMaxFilterRules> rules1_;
+ std::array<Filter::Rule, kMaxFilterRules> rules2_;
+ std::array<Filter::Rule, kMaxFilterRules> rules3_;
+ static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1_{
+ std::byte(65), std::byte(66), std::byte(67), std::byte(0)};
+ static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2_{
+ std::byte(68), std::byte(69), std::byte(70), std::byte(0)};
+ static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id3_{
+ std::byte(71), std::byte(72), std::byte(73), std::byte(0)};
+ std::array<Filter, kMaxDrains> filters_ = {
+ Filter(filter_id1_, rules1_),
+ Filter(filter_id2_, rules2_),
+ Filter(filter_id3_, rules3_),
+ };
// Drain Buffers
std::array<std::byte, kMaxLogEntrySize> drain_buffer1_;
@@ -115,16 +135,18 @@
RpcLogDrain(kIgnoreWriterErrorsDrainId,
drain_buffer1_,
shared_mutex_,
- RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
- RpcLogDrain(
- kCloseWriterOnErrorDrainId,
- drain_buffer2_,
- shared_mutex_,
- RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+ RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
+ &filters_[0]),
+ RpcLogDrain(kCloseWriterOnErrorDrainId,
+ drain_buffer2_,
+ shared_mutex_,
+ RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+ &filters_[1]),
RpcLogDrain(kSmallBufferDrainId,
small_buffer_,
shared_mutex_,
- RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
+ RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
+ &filters_[2]),
};
};
struct TestLogEntry {
@@ -231,7 +253,7 @@
// Create context directed to drain with ID 1.
RpcLogDrain& active_drain = drains_[0];
const uint32_t drain_channel_id = active_drain.channel_id();
- LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+ LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
context.set_channel_id(drain_channel_id);
// Call RPC, which sets the drain's writer.
@@ -247,7 +269,7 @@
// Calling an ongoing log stream must not change the active drain's
// writer, and the second writer must not get any responses.
- LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_);
+ LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_, &filter_map_);
second_call_context.set_channel_id(drain_channel_id);
second_call_context.call(rpc_request_buffer);
EXPECT_EQ(active_drain.Flush(), OkStatus());
@@ -256,19 +278,19 @@
// Setting a new writer on a closed stream is allowed.
ASSERT_EQ(active_drain.Close(), OkStatus());
- LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_);
+ LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_, &filter_map_);
third_call_context.set_channel_id(drain_channel_id);
third_call_context.call(rpc_request_buffer);
EXPECT_EQ(active_drain.Flush(), OkStatus());
ASSERT_FALSE(third_call_context.done());
- EXPECT_EQ(third_call_context.responses().size(), 1u);
+ EXPECT_EQ(third_call_context.responses().size(), 0u);
EXPECT_EQ(active_drain.Close(), OkStatus());
}
TEST_F(LogServiceTest, StartAndEndStream) {
RpcLogDrain& active_drain = drains_[2];
const uint32_t drain_channel_id = active_drain.channel_id();
- LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+ LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
context.set_channel_id(drain_channel_id);
// Add log entries.
@@ -306,7 +328,7 @@
TEST_F(LogServiceTest, HandleDropped) {
RpcLogDrain& active_drain = drains_[0];
const uint32_t drain_channel_id = active_drain.channel_id();
- LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+ LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
context.set_channel_id(drain_channel_id);
// Add log entries.
@@ -344,7 +366,7 @@
}
TEST_F(LogServiceTest, HandleSmallBuffer) {
- LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+ LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
context.set_channel_id(kSmallBufferDrainId);
auto small_buffer_drain =
drain_map_.GetDrainFromChannelId(kSmallBufferDrainId);
@@ -378,7 +400,7 @@
TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
auto& detached_drain = drains_[0];
multisink_.DetachDrain(detached_drain);
- LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+ LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
context.set_channel_id(detached_drain.channel_id());
// Add log entries.
@@ -423,7 +445,7 @@
// Start log stream.
RpcLogDrain& active_drain = drains_[0];
const uint32_t drain_channel_id = active_drain.channel_id();
- LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+ LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
context.set_channel_id(drain_channel_id);
context.call(rpc_request_buffer);
ASSERT_EQ(active_drain.Flush(), OkStatus());
@@ -445,7 +467,7 @@
auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
ASSERT_TRUE(drain.ok());
- LogService log_service(drain_map_);
+ LogService log_service(drain_map_, &filter_map_);
const size_t output_buffer_size = 128;
const size_t max_packets = 10;
rpc::RawFakeChannelOutput<10, output_buffer_size, 512> output;
@@ -533,7 +555,7 @@
auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
ASSERT_TRUE(drain.ok());
- LogService log_service(drain_map_);
+ LogService log_service(drain_map_, &filter_map_);
const size_t output_buffer_size = 128;
const size_t max_packets = 20;
rpc::RawFakeChannelOutput<max_packets, output_buffer_size, 512> output;
@@ -607,5 +629,414 @@
EXPECT_TRUE(output.done());
}
+TEST_F(LogServiceTest, GetFilterIds) {
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, ListFilterIds, 1, 128)
+ context(drain_map_, &filter_map_);
+ context.call({});
+ ASSERT_TRUE(context.done());
+ ASSERT_EQ(context.responses().size(), 1u);
+ protobuf::Decoder decoder(context.responses()[0]);
+
+ for (const auto& filter : filter_map_.filters()) {
+ ASSERT_EQ(decoder.Next(), OkStatus());
+ ASSERT_EQ(decoder.FieldNumber(), 1u); // filter_id
+ ConstByteSpan filter_id;
+ ASSERT_EQ(decoder.ReadBytes(&filter_id), OkStatus());
+ ASSERT_EQ(filter_id.size(), filter.id().size());
+ EXPECT_EQ(
+ std::memcmp(filter_id.data(), filter.id().data(), filter_id.size()), 0);
+ }
+ EXPECT_FALSE(decoder.Next().ok());
+
+ // No IDs reported when none registered in the filter map.
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, ListFilterIds, 1, 128)
+ no_filter_context(drain_map_, nullptr);
+ no_filter_context.call({});
+ ASSERT_TRUE(no_filter_context.done());
+ ASSERT_EQ(no_filter_context.responses().size(), 1u);
+ protobuf::Decoder no_filter_decoder(no_filter_context.responses()[0]);
+ uint32_t filter_count = 0;
+ while (no_filter_decoder.Next().ok()) {
+ EXPECT_EQ(no_filter_decoder.FieldNumber(), 1u); // filter_id
+ ++filter_count;
+ }
+ EXPECT_EQ(filter_count, 0u);
+}
+
+Status EncodeFilterRule(const Filter::Rule& rule,
+ log::FilterRule::StreamEncoder& encoder) {
+ PW_TRY(
+ encoder.WriteLevelGreaterThanOrEqual(rule.level_greater_than_or_equal));
+ PW_TRY(encoder.WriteModuleEquals(rule.module_equals));
+ PW_TRY(encoder.WriteAnyFlagsSet(rule.any_flags_set));
+ return encoder.WriteAction(static_cast<log::FilterRule::Action>(rule.action));
+}
+
+Status EncodeFilter(const Filter& filter, log::Filter::StreamEncoder& encoder) {
+ for (auto& rule : filter.rules()) {
+ log::FilterRule::StreamEncoder rule_encoder = encoder.GetRuleEncoder();
+ PW_TRY(EncodeFilterRule(rule, rule_encoder));
+ }
+ return OkStatus();
+}
+
+Result<ConstByteSpan> EncodeFilterRequest(const Filter& filter,
+ ByteSpan buffer) {
+ stream::MemoryWriter writer(buffer);
+ std::byte encode_buffer[256];
+ protobuf::StreamEncoder encoder(writer, encode_buffer);
+ PW_TRY(encoder.WriteBytes(
+ static_cast<uint32_t>(log::SetFilterRequest::Fields::FILTER_ID),
+ filter.id()));
+ {
+ log::Filter::StreamEncoder filter_encoder = encoder.GetNestedEncoder(
+ static_cast<uint32_t>(log::SetFilterRequest::Fields::FILTER));
+ PW_TRY(EncodeFilter(filter, filter_encoder));
+ } // Let the StreamEncoder destructor finalize the data.
+ return ConstByteSpan(writer.data(), writer.bytes_written());
+}
+
+void VerifyRule(const Filter::Rule& rule, const Filter::Rule& expected_rule) {
+ EXPECT_EQ(rule.level_greater_than_or_equal,
+ expected_rule.level_greater_than_or_equal);
+ EXPECT_EQ(rule.module_equals, expected_rule.module_equals);
+ EXPECT_EQ(rule.any_flags_set, expected_rule.any_flags_set);
+ EXPECT_EQ(rule.action, expected_rule.action);
+}
+
+TEST_F(LogServiceTest, SetFilterRules) {
+ const std::array<Filter::Rule, 4> new_rules{{
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL,
+ .any_flags_set = 0x0f,
+ .module_equals{std::byte(123)},
+ },
+ {
+ .action = Filter::Rule::Action::kInactive,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0xef,
+ .module_equals{},
+ },
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = 0x1234,
+ .module_equals{std::byte(99)},
+ },
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0,
+ .module_equals{std::byte(4)},
+ },
+ }};
+ const Filter new_filter(filters_[0].id(),
+ const_cast<std::array<Filter::Rule, 4>&>(new_rules));
+
+ std::byte request_buffer[512];
+ const auto request = EncodeFilterRequest(new_filter, request_buffer);
+ ASSERT_EQ(request.status(), OkStatus());
+
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+ context(drain_map_, &filter_map_);
+ context.call(request.value());
+
+ size_t i = 0;
+ for (const auto& rule : filters_[0].rules()) {
+ VerifyRule(rule, new_rules[i++]);
+ }
+}
+
+TEST_F(LogServiceTest, SetFilterRulesWhenUsedByDrain) {
+ const std::array<Filter::Rule, 4> new_filter_rules{{
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::CRITICAL_LEVEL,
+ .any_flags_set = 0xfd,
+ .module_equals{std::byte(543)},
+ },
+ {
+ .action = Filter::Rule::Action::kInactive,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0xca,
+ .module_equals{},
+ },
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = 0xabcd,
+ .module_equals{std::byte(9000)},
+ },
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0,
+ .module_equals{std::byte(123)},
+ },
+ }};
+ Filter& filter = filters_[0];
+ const Filter new_filter(
+ filter.id(), const_cast<std::array<Filter::Rule, 4>&>(new_filter_rules));
+
+ // Add callback to drain.
+ RpcLogDrain& drain = drains_[0];
+
+ std::byte request_buffer[256];
+ const auto request = EncodeFilterRequest(new_filter, request_buffer);
+ ASSERT_EQ(request.status(), OkStatus());
+
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+ context(drain_map_, &filter_map_);
+ context.set_channel_id(drain.channel_id());
+ context.call(request.value());
+
+ size_t i = 0;
+ for (const auto& rule : filter.rules()) {
+ VerifyRule(rule, new_filter_rules[i++]);
+ }
+
+ // A request for logs without a filter should not modify the filter.
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+ context_no_filter(drain_map_, &filter_map_);
+ context_no_filter.set_channel_id(drain.channel_id());
+ context_no_filter.call({});
+ i = 0;
+ for (const auto& rule : filter.rules()) {
+ VerifyRule(rule, new_filter_rules[i++]);
+ }
+
+ // A new request for logs with a new filter updates filter.
+ const std::array<Filter::Rule, 4> second_filter_rules{{
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL,
+ .any_flags_set = 0xab,
+ .module_equals{},
+ },
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0x11,
+ .module_equals{std::byte(34)},
+ },
+ {
+ .action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0xef,
+ .module_equals{std::byte(23)},
+ },
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0x0f,
+ .module_equals{},
+ },
+ }};
+ const Filter second_filter(
+ filter.id(),
+ const_cast<std::array<Filter::Rule, 4>&>(second_filter_rules));
+
+ std::memset(request_buffer, 0, sizeof(request_buffer));
+ const auto second_filter_request =
+ EncodeFilterRequest(second_filter, request_buffer);
+ ASSERT_EQ(second_filter_request.status(), OkStatus());
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+ context_new_filter(drain_map_, &filter_map_);
+ context_new_filter.set_channel_id(drain.channel_id());
+ context_new_filter.call(second_filter_request.value());
+
+ i = 0;
+ for (const auto& rule : filter.rules()) {
+ VerifyRule(rule, second_filter_rules[i++]);
+ }
+}
+
+TEST_F(LogServiceTest, FilterLogs) {
+ // Add a variety of logs.
+ const uint32_t module = 0xcafe;
+ const uint32_t flags = 0x02;
+ const uint32_t line_number = 100;
+ const auto debug_metadata = log_tokenized::Metadata::
+ Set<PW_LOG_LEVEL_DEBUG, module, flags, line_number>();
+ ASSERT_TRUE(AddLogEntry(kMessage, debug_metadata, kSampleTimestamp).ok());
+ const auto info_metadata = log_tokenized::Metadata::
+ Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();
+ ASSERT_TRUE(AddLogEntry(kMessage, info_metadata, kSampleTimestamp).ok());
+ const auto warn_metadata = log_tokenized::Metadata::
+ Set<PW_LOG_LEVEL_WARN, module, flags, line_number>();
+ ASSERT_TRUE(AddLogEntry(kMessage, warn_metadata, kSampleTimestamp).ok());
+ const auto error_metadata = log_tokenized::Metadata::
+ Set<PW_LOG_LEVEL_ERROR, module, flags, line_number>();
+ ASSERT_TRUE(AddLogEntry(kMessage, error_metadata, kSampleTimestamp).ok());
+ const auto different_flags_metadata = log_tokenized::Metadata::
+ Set<PW_LOG_LEVEL_ERROR, module, 0x01, line_number>();
+ ASSERT_TRUE(
+ AddLogEntry(kMessage, different_flags_metadata, kSampleTimestamp).ok());
+ const auto different_module_metadata = log_tokenized::Metadata::
+ Set<PW_LOG_LEVEL_ERROR, 0xabcd, flags, line_number>();
+ ASSERT_TRUE(
+ AddLogEntry(kMessage, different_module_metadata, kSampleTimestamp).ok());
+
+ // Add messages to the stack in the reverse order they are sent.
+ Vector<TestLogEntry, 6> message_stack;
+ message_stack.push_back(
+ {.metadata = error_metadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
+ message_stack.push_back(
+ {.metadata = warn_metadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
+ message_stack.push_back(
+ {.metadata = info_metadata,
+ .timestamp = kSampleTimestamp,
+ .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
+
+ // Create request with filter.
+ const auto module_little_endian =
+ bytes::CopyInOrder<uint32_t>(std::endian::little, module);
+ const std::array<Filter::Rule, 2> rules{{
+ {.action = Filter::Rule::Action::kKeep,
+ .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+ .any_flags_set = flags,
+ .module_equals{module_little_endian.begin(),
+ module_little_endian.end()}},
+ {
+ .action = Filter::Rule::Action::kDrop,
+ .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+ .any_flags_set = 0,
+ .module_equals{},
+ },
+ }};
+
+ RpcLogDrain& drain = drains_[1];
+ Filter& filter = filters_[1];
+ const Filter new_filter(filter.id(),
+ const_cast<std::array<Filter::Rule, 2>&>(rules));
+
+ // Set filter.
+ std::byte request_buffer[256];
+ const auto request = EncodeFilterRequest(new_filter, request_buffer);
+ ASSERT_EQ(request.status(), OkStatus());
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+ set_filter_context(drain_map_, &filter_map_);
+ set_filter_context.set_channel_id(drain.channel_id());
+ set_filter_context.call(request.value());
+
+ // Request logs.
+ LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
+ context.set_channel_id(drain.channel_id());
+ context.call(request.value());
+ ASSERT_EQ(drain.Flush(), OkStatus());
+
+ size_t entries_found = 0;
+ for (auto& response : context.responses()) {
+ protobuf::Decoder entry_decoder(response);
+ entries_found += VerifyLogEntries(entry_decoder, message_stack);
+ }
+ EXPECT_EQ(entries_found, 3u);
+}
+
+void VerifyFilterRule(protobuf::Decoder& decoder,
+ const Filter::Rule& expected_rule) {
+ ASSERT_TRUE(decoder.Next().ok());
+ ASSERT_EQ(decoder.FieldNumber(), 1u); // level_greater_than_or_equal
+ log::FilterRule::Level level_greater_than_or_equal;
+ ASSERT_EQ(decoder.ReadUint32(
+ reinterpret_cast<uint32_t*>(&level_greater_than_or_equal)),
+ OkStatus());
+ EXPECT_EQ(level_greater_than_or_equal,
+ expected_rule.level_greater_than_or_equal);
+
+ ASSERT_TRUE(decoder.Next().ok());
+ ASSERT_EQ(decoder.FieldNumber(), 2u); // module_equals
+ ConstByteSpan module_equals;
+ ASSERT_EQ(decoder.ReadBytes(&module_equals), OkStatus());
+ ASSERT_EQ(module_equals.size(), expected_rule.module_equals.size());
+ EXPECT_EQ(std::memcmp(module_equals.data(),
+ expected_rule.module_equals.data(),
+ module_equals.size()),
+ 0);
+
+ ASSERT_TRUE(decoder.Next().ok());
+ ASSERT_EQ(decoder.FieldNumber(), 3u); // any_flags_set
+ uint32_t any_flags_set;
+ ASSERT_EQ(decoder.ReadUint32(&any_flags_set), OkStatus());
+ EXPECT_EQ(any_flags_set, expected_rule.any_flags_set);
+
+ ASSERT_TRUE(decoder.Next().ok());
+ ASSERT_EQ(decoder.FieldNumber(), 4u); // action
+ Filter::Rule::Action action;
+ ASSERT_EQ(decoder.ReadUint32(reinterpret_cast<uint32_t*>(&action)),
+ OkStatus());
+ EXPECT_EQ(action, expected_rule.action);
+}
+
+void VerifyFilterRules(protobuf::Decoder& decoder,
+ std::span<const Filter::Rule> expected_rules) {
+ size_t rules_found = 0;
+ while (decoder.Next().ok()) {
+ ConstByteSpan rule;
+ EXPECT_TRUE(decoder.ReadBytes(&rule).ok());
+ protobuf::Decoder rule_decoder(rule);
+ if (rules_found >= expected_rules.size()) {
+ break;
+ }
+ VerifyFilterRule(rule_decoder, expected_rules[rules_found]);
+ ++rules_found;
+ }
+ EXPECT_EQ(rules_found, expected_rules.size());
+}
+
+TEST_F(LogServiceTest, GetFilterRules) {
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, GetFilter, 1, 128)
+ context(drain_map_, &filter_map_);
+
+ std::byte request_buffer[64];
+ log::GetFilterRequest::MemoryEncoder encoder(request_buffer);
+ encoder.WriteFilterId(filter_id1_);
+ const auto request = ConstByteSpan(encoder);
+ context.call(request);
+ ASSERT_TRUE(context.done());
+ ASSERT_EQ(context.responses().size(), 1u);
+
+ // Verify against empty rules.
+ protobuf::Decoder decoder(context.responses()[0]);
+ VerifyFilterRules(decoder, rules1_);
+
+ // Partially populate rules.
+ rules1_[0].action = Filter::Rule::Action::kKeep;
+ rules1_[0].level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL;
+ rules1_[0].any_flags_set = 0xab;
+ const std::array<std::byte, 2> module1{std::byte(123), std::byte(0xab)};
+ rules1_[0].module_equals.assign(module1.begin(), module1.end());
+ rules1_[1].action = Filter::Rule::Action::kDrop;
+ rules1_[1].level_greater_than_or_equal = log::FilterRule::Level::ERROR_LEVEL;
+ rules1_[1].any_flags_set = 0;
+
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, GetFilter, 1, 128)
+ context2(drain_map_, &filter_map_);
+ context2.call(request);
+ ASSERT_EQ(context2.responses().size(), 1u);
+ protobuf::Decoder decoder2(context2.responses()[0]);
+ VerifyFilterRules(decoder2, rules1_);
+
+ // Modify the rest of the filter rules.
+ rules1_[2].action = Filter::Rule::Action::kKeep;
+ rules1_[2].level_greater_than_or_equal = log::FilterRule::Level::FATAL_LEVEL;
+ rules1_[2].any_flags_set = 0xcd;
+ const std::array<std::byte, 2> module2{std::byte(1), std::byte(2)};
+ rules1_[2].module_equals.assign(module2.begin(), module2.end());
+ rules1_[3].action = Filter::Rule::Action::kInactive;
+
+ PW_RAW_TEST_METHOD_CONTEXT(LogService, GetFilter, 1, 128)
+ context3(drain_map_, &filter_map_);
+ context3.call(request);
+ ASSERT_EQ(context3.responses().size(), 1u);
+ protobuf::Decoder decoder3(context3.responses()[0]);
+ VerifyFilterRules(decoder3, rules1_);
+}
+
} // namespace
} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/internal/config.h b/pw_log_rpc/public/pw_log_rpc/internal/config.h
new file mode 100644
index 0000000..938ebc2
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/internal/config.h
@@ -0,0 +1,40 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+// Configuration macros for the pw_rpc module.
+#pragma once
+
+// Log filter modules are optionally tokenized, and thus their backing on-device
+// container can have different sizes. A token may be represented by a 32-bit
+// integer (though it is usually 2 bytes). Default the max module name size to
+// 4 bytes.
+#ifndef PW_LOG_RPC_CONFIG_MAX_FILTER_RULE_MODULE_NAME_SIZE
+#define PW_LOG_RPC_CONFIG_MAX_FILTER_RULE_MODULE_NAME_SIZE 4
+#endif // PW_LOG_RPC_CONFIG_MAX_FILTER_RULE_MODULE_NAME_SIZE
+
+// Log filter IDs are optionally tokenized, and thus their backing on-device
+// container can have different sizes. A token may be represented by a 32-bit
+// integer (though it is usually 2 bytes). Default the max module name size to
+// 4 bytes.
+#ifndef PW_LOG_RPC_CONFIG_MAX_FILTER_ID_SIZE
+#define PW_LOG_RPC_CONFIG_MAX_FILTER_ID_SIZE 4
+#endif // PW_LOG_RPC_CONFIG_MAX_FILTER_ID_SIZE
+
+namespace pw::log_rpc::cfg {
+inline constexpr size_t kMaxModuleNameBytes =
+ PW_LOG_RPC_CONFIG_MAX_FILTER_RULE_MODULE_NAME_SIZE;
+
+inline constexpr size_t kMaxFilterIdBytes =
+ PW_LOG_RPC_CONFIG_MAX_FILTER_ID_SIZE;
+} // namespace pw::log_rpc::cfg
diff --git a/pw_log_rpc/public/pw_log_rpc/log_filter.h b/pw_log_rpc/public/pw_log_rpc/log_filter.h
new file mode 100644
index 0000000..56e01f7
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/log_filter.h
@@ -0,0 +1,88 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstring>
+#include <span>
+
+#include "pw_assert/assert.h"
+#include "pw_bytes/span.h"
+#include "pw_containers/vector.h"
+#include "pw_log/proto/log.pwpb.h"
+#include "pw_log_rpc/internal/config.h"
+#include "pw_status/status.h"
+
+namespace pw::log_rpc {
+
+// A Filter is a collection of rules used to check if a log entry can be kept
+// or dropped wherever the filter is placed in the log path.
+class Filter {
+ public:
+ struct Rule {
+ // Action to perform if the rule is met.
+ enum class Action {
+ kInactive = 0, // Ignore this rule.
+ kKeep = 1,
+ kDrop = 2,
+ };
+ Action action = Action::kInactive;
+
+ // Checks if the log level is greater or equal to this value when it
+ // does not equal NOT_SET.
+ log::FilterRule::Level level_greater_than_or_equal =
+ log::FilterRule::Level::ANY_LEVEL;
+
+ // Checks if the log entry has any flag is set when it doesn't equal 0.
+ uint32_t any_flags_set = 0;
+
+ // Checks if the log entry module equals this value when not empty.
+ Vector<std::byte, cfg::kMaxModuleNameBytes> module_equals{};
+ };
+
+ Filter(std::span<const std::byte> id, std::span<Rule> rules) : rules_(rules) {
+ PW_ASSERT(!id.empty());
+ id_.assign(id.begin(), id.end());
+ }
+
+ // Not copyable.
+ Filter(const Filter&) = delete;
+ Filter& operator=(const Filter&) = delete;
+
+ ConstByteSpan id() const { return ConstByteSpan(id_.data(), id_.size()); }
+ std::span<const Rule> rules() const { return rules_; }
+
+ // Verifies a log entry against the filter's rules in the order they were
+ // provided, stopping at the first rule that matches.
+ // Returns true when the log should be dropped, false otherwise. Defaults to
+ // false if there are no rules, or no rules were matched.
+ bool ShouldDropLog(ConstByteSpan entry) const;
+
+ // Decodes and updates the filter's rules given a buffer with a proto-encoded
+ // log::Filter message. If there are more rules than this filter can hold, the
+ // extra rules are discarded.
+ //
+ // Return values:
+ // OK - rules were updated successfully.
+ // FAILED_PRECONDITION - the provided buffer is empty.
+ // Forwarded errors from protobuff::decoder.
+ Status UpdateRulesFromProto(ConstByteSpan buffer);
+
+ private:
+ Vector<std::byte, cfg::kMaxFilterIdBytes> id_;
+ std::span<Rule> rules_;
+};
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/log_filter_map.h b/pw_log_rpc/public/pw_log_rpc/log_filter_map.h
new file mode 100644
index 0000000..5a1be6e
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/log_filter_map.h
@@ -0,0 +1,53 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include <cstring>
+#include <span>
+
+#include "pw_bytes/span.h"
+#include "pw_log_rpc/log_filter.h"
+#include "pw_result/result.h"
+
+namespace pw::log_rpc {
+
+// Holds an inmutable Filter map, ordered by filter ID to facilitate set up.
+class FilterMap {
+ public:
+ explicit constexpr FilterMap(std::span<Filter> filters) : filters_(filters) {}
+
+ // Not copyable nor movable.
+ FilterMap(FilterMap const&) = delete;
+ FilterMap& operator=(FilterMap const&) = delete;
+ FilterMap(FilterMap&&) = delete;
+ FilterMap& operator=(FilterMap&&) = delete;
+
+ Result<Filter*> GetFilterFromId(ConstByteSpan id) const {
+ for (auto& filter : filters_) {
+ if (id.size() == filter.id().size() &&
+ std::memcmp(id.data(), filter.id().data(), id.size()) == 0) {
+ return &filter;
+ }
+ }
+ return Status::NotFound();
+ }
+
+ const std::span<Filter>& filters() const { return filters_; }
+
+ protected:
+ const std::span<Filter> filters_;
+};
+
+} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/log_service.h b/pw_log_rpc/public/pw_log_rpc/log_service.h
index bd341d1..50555a7 100644
--- a/pw_log_rpc/public/pw_log_rpc/log_service.h
+++ b/pw_log_rpc/public/pw_log_rpc/log_service.h
@@ -15,7 +15,9 @@
#pragma once
#include "pw_log/proto/log.raw_rpc.pb.h"
+#include "pw_log_rpc/log_filter_map.h"
#include "pw_log_rpc/rpc_log_drain_map.h"
+#include "pw_status/status.h"
namespace pw::log_rpc {
@@ -24,7 +26,8 @@
// and delegated outside the service.
class LogService final : public log::generated::Logs<LogService> {
public:
- LogService(RpcLogDrainMap& drains) : drains_(drains) {}
+ LogService(RpcLogDrainMap& drains, FilterMap* filters = nullptr)
+ : drains_(drains), filters_(filters) {}
// Starts listening to logs on the given RPC channel and writer. The call is
// ignored if the channel was not pre-registered in the drain map. If there is
@@ -33,8 +36,24 @@
// stream using the previous writer continues.
void Listen(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer);
+ // TODO(pwbug/570): make log filter be its own service.
+ // Modifies a log filter and its rules. The filter must be registered in the
+ // provided filter map.
+ StatusWithSize SetFilter(ServerContext&, ConstByteSpan request, ByteSpan);
+
+ // Retrieves a log filter and its rules. The filter must be registered in the
+ // provided filter map.
+ StatusWithSize GetFilter(ServerContext&,
+ ConstByteSpan request,
+ ByteSpan response);
+
+ StatusWithSize ListFilterIds(ServerContext&,
+ ConstByteSpan,
+ ByteSpan response);
+
private:
RpcLogDrainMap& drains_;
+ FilterMap* filters_;
};
} // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
index 87071d7..eb59092 100644
--- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
@@ -20,6 +20,7 @@
#include "pw_assert/assert.h"
#include "pw_bytes/span.h"
#include "pw_log/proto/log.pwpb.h"
+#include "pw_log_rpc/log_filter.h"
#include "pw_multisink/multisink.h"
#include "pw_protobuf/serialized_size.h"
#include "pw_result/result.h"
@@ -37,11 +38,12 @@
// Flush(), which, on every call, packs as many log::LogEntry items as possible
// into a log::LogEntries message, writes the message to the provided writer,
// then repeats the process until there are no more entries in the MultiSink or
-// the writer failed to write the outgoing package, in which case the RPC on
-// the writer is closed. When error_handling is `kIgnoreWriterErrors` the drain
-// will continue to retrieve log entries out of the MultiSink and attempt to
-// send them out ignoring the writer errors without sending a drop count.
-// Note: this behavior might change or be removed in the future.
+// the writer failed to write the outgoing package and error_handling is set to
+// `kCloseStreamOnWriterError`. When error_handling is `kIgnoreWriterErrors` the
+// drain will continue to retrieve log entries out of the MultiSink and attempt
+// to send them out ignoring the writer errors without sending a drop count.
+// Note: the error handling and drop count reporting might change in the future.
+// Log filtering is done using the rules of the Filter provided if any.
class RpcLogDrain : public multisink::MultiSink::Drain {
public:
// Dictates how to handle server writer errors.
@@ -80,42 +82,23 @@
protobuf::SizeOfFieldKey(1) // LogEntry
+ protobuf::kMaxSizeOfLength;
- // Creates a log stream with the provided open writer. Useful for streaming
- // logs without a request.
- // The provided buffer must be large enough to hold the largest transmittable
- // log::LogEntry or a drop count message at the very least. The user can
- // choose to provide a unique mutex for the drain, or share it to save RAM as
- // long as they are aware of contengency issues.
- RpcLogDrain(uint32_t channel_id,
- ByteSpan log_entry_buffer,
- rpc::RawServerWriter writer,
- sync::Mutex& mutex,
- LogDrainErrorHandling error_handling)
- : channel_id_(channel_id),
- error_handling_(error_handling),
- server_writer_(std::move(writer)),
- log_entry_buffer_(log_entry_buffer),
- committed_entry_drop_count_(0),
- mutex_(mutex) {
- PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
- PW_ASSERT(writer.active());
- }
-
// Creates a closed log stream with a writer that can be set at a later time.
// The provided buffer must be large enough to hold the largest transmittable
// log::LogEntry or a drop count message at the very least. The user can
// choose to provide a unique mutex for the drain, or share it to save RAM as
// long as they are aware of contengency issues.
- RpcLogDrain(uint32_t channel_id,
+ RpcLogDrain(const uint32_t channel_id,
ByteSpan log_entry_buffer,
sync::Mutex& mutex,
- LogDrainErrorHandling error_handling)
+ LogDrainErrorHandling error_handling,
+ Filter* filter = nullptr)
: channel_id_(channel_id),
error_handling_(error_handling),
server_writer_(),
log_entry_buffer_(log_entry_buffer),
committed_entry_drop_count_(0),
- mutex_(mutex) {
+ mutex_(mutex),
+ filter_(filter) {
PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
}
@@ -173,6 +156,7 @@
const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_);
uint32_t committed_entry_drop_count_ PW_GUARDED_BY(mutex_);
sync::Mutex& mutex_;
+ Filter* filter_;
};
} // namespace pw::log_rpc
diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc
index fe8c025..2ba6330 100644
--- a/pw_log_rpc/rpc_log_drain.cc
+++ b/pw_log_rpc/rpc_log_drain.cc
@@ -56,6 +56,10 @@
log::LogEntries::MemoryEncoder encoder(server_writer_.PayloadBuffer());
uint32_t packed_entry_count = 0;
log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
+ // Avoid sending empty packets.
+ if (encoder.size() == 0) {
+ continue;
+ }
if (const Status status = server_writer_.Write(encoder); !status.ok()) {
if (error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) {
// Only update this drop count when writer errors are not ignored.
@@ -106,6 +110,14 @@
// At this point all expected error modes have been handled.
PW_CHECK_OK(possible_entry.status());
+ // TODO(pwbug/559): avoid sending multiple drop counts between filtered out
+ // log entries.
+ if (filter_ != nullptr &&
+ filter_->ShouldDropLog(possible_entry.value().entry())) {
+ PW_CHECK_OK(PopEntry(possible_entry.value()));
+ return LogDrainState::kMoreEntriesRemaining;
+ }
+
// Check if the entry fits in encoder buffer.
const size_t encoded_entry_size =
possible_entry.value().entry().size() + kLogEntryEncodeFrameSize;
diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc
index aec1617..3d8690d 100644
--- a/pw_log_rpc/rpc_log_drain_test.cc
+++ b/pw_log_rpc/rpc_log_drain_test.cc
@@ -19,6 +19,7 @@
#include <span>
#include "gtest/gtest.h"
+#include "pw_log_rpc/log_filter.h"
#include "pw_log_rpc/log_service.h"
#include "pw_log_rpc/rpc_log_drain_map.h"
#include "pw_multisink/multisink.h"
@@ -42,7 +43,8 @@
drain_id,
buffer,
mutex,
- RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError);
+ RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+ nullptr);
EXPECT_EQ(drain.channel_id(), drain_id);
// Attach drain to a MultiSink.
@@ -62,20 +64,21 @@
sync::Mutex mutex;
std::array<std::array<std::byte, kBufferSize>, kMaxDrains> buffers;
std::array<RpcLogDrain, kMaxDrains> drains{
- RpcLogDrain(
- 0,
- buffers[0],
- mutex,
- RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
- RpcLogDrain(
- 1,
- buffers[1],
- mutex,
- RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+ RpcLogDrain(0,
+ buffers[0],
+ mutex,
+ RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+ nullptr),
+ RpcLogDrain(1,
+ buffers[1],
+ mutex,
+ RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+ nullptr),
RpcLogDrain(2,
buffers[2],
mutex,
- RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
+ RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
+ nullptr),
};
RpcLogDrainMap drain_map(drains);
@@ -96,14 +99,14 @@
std::array<std::byte, kBufferSize> buffer;
sync::Mutex mutex;
std::array<RpcLogDrain, 1> drains{
- RpcLogDrain(
- drain_id,
- buffer,
- mutex,
- RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+ RpcLogDrain(drain_id,
+ buffer,
+ mutex,
+ RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+ nullptr),
};
RpcLogDrainMap drain_map(drains);
- LogService log_service(drain_map);
+ LogService log_service(drain_map, nullptr);
rpc::RawFakeChannelOutput<3, 128> output;
rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
@@ -135,14 +138,14 @@
std::array<std::byte, kBufferSize> buffer;
sync::Mutex mutex;
std::array<RpcLogDrain, 1> drains{
- RpcLogDrain(
- drain_id,
- buffer,
- mutex,
- RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+ RpcLogDrain(drain_id,
+ buffer,
+ mutex,
+ RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+ nullptr),
};
RpcLogDrainMap drain_map(drains);
- LogService log_service(drain_map);
+ LogService log_service(drain_map, nullptr);
rpc::RawFakeChannelOutput<1, 128> output;
rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));