pw_{log, log_rpc}: Add log filters

- Add log filter proto message that contains rules for filtering logs
based on level, flags, and module information. Each filter has an
identifiable byte array, which can be a token or human readable string,
and is used to configure new filter rules.
- Add LogService methods to get and modify log filters.
- RpcLogDrain can get an optional filter to check if a log should be
dropped.
- Remove unused RpcLogDrain constructor with open server writer.
- Avoid sending empty packets in RpcLogEntry if all messages are
filtered out.

Requires: pigweed-internal:18160
Change-Id: I7fd299e6f281432f99c6a1bd8ede4223ca8e0a3d
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/65000
Reviewed-by: Ewout van Bekkum <ewout@google.com>
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
diff --git a/pw_log/BUILD.bazel b/pw_log/BUILD.bazel
index 6bb0a87..adc8f9e 100644
--- a/pw_log/BUILD.bazel
+++ b/pw_log/BUILD.bazel
@@ -71,7 +71,10 @@
     ],
     import_prefix = "pw_log/proto",
     strip_import_prefix = "//pw_log",
-    deps = ["//pw_tokenizer:tokenizer_proto"],
+    deps = [
+        "//pw_protobuf:common_protos",
+        "//pw_tokenizer:tokenizer_proto",
+    ],
 )
 
 pw_proto_library(
diff --git a/pw_log/BUILD.gn b/pw_log/BUILD.gn
index 9b64284..0da366b 100644
--- a/pw_log/BUILD.gn
+++ b/pw_log/BUILD.gn
@@ -123,7 +123,10 @@
 pw_proto_library("protos") {
   sources = [ "log.proto" ]
   prefix = "pw_log/proto"
-  deps = [ "$dir_pw_tokenizer:proto" ]
+  deps = [
+    "$dir_pw_protobuf:common_protos",
+    "$dir_pw_tokenizer:proto",
+  ]
 }
 
 pw_doc_group("docs") {
diff --git a/pw_log/log.proto b/pw_log/log.proto
index 6226d2b..046041f 100644
--- a/pw_log/log.proto
+++ b/pw_log/log.proto
@@ -16,6 +16,7 @@
 
 package pw.log;
 
+import "pw_protobuf_protos/common.proto";
 import "pw_tokenizer/proto/options.proto";
 
 option java_outer_classname = "Log";
@@ -167,7 +168,62 @@
   repeated LogEntry entries = 1;
 }
 
+message FilterRule {
+  // Log level values match pw_log/levels.h. Enum names avoid collissions with
+  // possible macros.
+  enum Level {
+    ANY_LEVEL = 0;
+    DEBUG_LEVEL = 1;
+    INFO_LEVEL = 2;
+    WARN_LEVEL = 3;
+    ERROR_LEVEL = 4;
+    CRITICAL_LEVEL = 5;
+    FATAL_LEVEL = 7;
+  };
+  // Condition 1: log.level >= level_greater_than_or_equal.
+  Level level_greater_than_or_equal = 1;
+
+  // Condition 2: (module_equals.size() == 0) || (log.module == module_equals);
+  bytes module_equals = 2 [(tokenizer.format) = TOKENIZATION_OPTIONAL];
+
+  // Condition 3: (any_flags_set == 0) || (log.flags & any_flags_set) != 0))
+  uint32 any_flags_set = 3;
+
+  // Action to take if all conditions are met and rule is not inactive.
+  enum Action {
+    INACTIVE = 0;  // Ignore the rule entirely.
+    KEEP = 1;      // Keep the log entry if all conditions are met.
+    DROP = 2;      // Drop the log entry if all conditions are met
+  };
+  Action action = 4;
+}
+
+// A filter is a series of rules. First matching rule wins.
+message Filter {
+  repeated FilterRule rule = 1;
+}
+
+message SetFilterRequest {
+  // A filter can be identified by a human readable string, token, or number.
+  bytes filter_id = 1 [(tokenizer.format) = TOKENIZATION_OPTIONAL];
+
+  Filter filter = 2;
+}
+
+message GetFilterRequest {
+  bytes filter_id = 1 [(tokenizer.format) = TOKENIZATION_OPTIONAL];
+}
+
+message FilterIdListRequest {}
+
+message FilterIdListResponse {
+  repeated bytes filter_id = 1 [(tokenizer.format) = TOKENIZATION_OPTIONAL];
+}
+
 // RPC service for accessing logs.
 service Logs {
   rpc Listen(LogRequest) returns (stream LogEntries);
+  rpc SetFilter(SetFilterRequest) returns (pw.protobuf.Empty);
+  rpc GetFilter(GetFilterRequest) returns (Filter);
+  rpc ListFilterIds(FilterIdListRequest) returns (FilterIdListResponse);
 }
diff --git a/pw_log_rpc/BUILD.bazel b/pw_log_rpc/BUILD.bazel
index 517b457..64dce44 100644
--- a/pw_log_rpc/BUILD.bazel
+++ b/pw_log_rpc/BUILD.bazel
@@ -28,10 +28,34 @@
     hdrs = ["public/pw_log_rpc/log_service.h"],
     includes = ["public"],
     deps = [
+        ":log_filter",
         ":rpc_log_drain",
         "//pw_log",
         "//pw_log:log_pwpb",
         "//pw_log:protos.raw_rpc",
+        "//pw_protobuf",
+        "//pw_protobuf:bytes_utils",
+    ],
+)
+
+pw_cc_library(
+    name = "log_filter",
+    srcs = ["log_filter.cc"],
+    hdrs = [
+        "public/pw_log_rpc/log_filter.h",
+        "public/pw_log_rpc/log_filter_map.h",
+    ],
+    includes = ["public"],
+    deps = [
+        "public/pw_log_rpc/internal/config.h",
+        "//pw_assert",
+        "//pw_bytes",
+        "//pw_containers:vector",
+        "//pw_log",
+        "//pw_log:log_pwpb",
+        "//pw_log:protos.pwpb",
+        "//pw_protobuf",
+        "//pw_status",
     ],
 )
 
@@ -44,6 +68,7 @@
     ],
     includes = ["public"],
     deps = [
+        ":log_filter",
         "//pw_assert",
         "//pw_log:log_pwpb",
         "//pw_log:protos.raw_rpc",
@@ -74,15 +99,15 @@
 
 pw_cc_test(
     name = "log_service_test",
-    srcs = [
-        "log_service_test.cc",
-    ],
+    srcs = ["log_service_test.cc"],
     deps = [
+        ":log_filter",
         ":log_service",
         "//pw_containers:vector",
         "//pw_log",
         "//pw_log:log_pwpb",
         "//pw_log:proto_utils",
+        "//pw_log_tokenized:metadata",
         "//pw_protobuf",
         "//pw_protobuf:bytes_utils",
         "//pw_result",
@@ -93,10 +118,22 @@
 )
 
 pw_cc_test(
-    name = "rpc_log_drain_test",
-    srcs = [
-        "rpc_log_drain_test.cc",
+    name = "log_filter_test",
+    srcs = ["log_filter_test.cc"],
+    deps = [
+        ":log_filter",
+        "//pw_log:log_pwpb",
+        "//pw_log:proto_utils",
+        "//pw_log_tokenized:metadata",
+        "//pw_result",
+        "//pw_status",
+        "//pw_unit_test",
     ],
+)
+
+pw_cc_test(
+    name = "rpc_log_drain_test",
+    srcs = ["rpc_log_drain_test.cc"],
     deps = [
         ":log_service",
         ":rpc_log_drain",
diff --git a/pw_log_rpc/BUILD.gn b/pw_log_rpc/BUILD.gn
index 1d05ed5..d974183 100644
--- a/pw_log_rpc/BUILD.gn
+++ b/pw_log_rpc/BUILD.gn
@@ -18,33 +18,66 @@
 import("$dir_pw_docgen/docs.gni")
 import("$dir_pw_unit_test/test.gni")
 
-config("default_config") {
+config("public_include_path") {
   include_dirs = [ "public" ]
   visibility = [ ":*" ]
 }
 
+pw_source_set("config") {
+  sources = [ "public/pw_log_rpc/internal/config.h" ]
+  public_configs = [ ":public_include_path" ]
+  visibility = [ "./*" ]
+  friend = [ "./*" ]
+}
+
 pw_source_set("log_service") {
-  public_configs = [ ":default_config" ]
+  public_configs = [ ":public_include_path" ]
   public = [ "public/pw_log_rpc/log_service.h" ]
   sources = [ "log_service.cc" ]
   deps = [
     "$dir_pw_log",
     "$dir_pw_log:protos.pwpb",
+    "$dir_pw_protobuf",
   ]
   public_deps = [
+    ":log_filter",
     ":rpc_log_drain",
     "$dir_pw_log:protos.raw_rpc",
+    "$dir_pw_protobuf:bytes_utils",
+  ]
+}
+
+pw_source_set("log_filter") {
+  public_configs = [ ":public_include_path" ]
+  public = [
+    "public/pw_log_rpc/log_filter.h",
+    "public/pw_log_rpc/log_filter_map.h",
+  ]
+  sources = [ "log_filter.cc" ]
+  deps = [
+    "$dir_pw_log",
+    "$dir_pw_protobuf",
+  ]
+  public_deps = [
+    ":config",
+    "$dir_pw_assert",
+    "$dir_pw_bytes",
+    "$dir_pw_containers:vector",
+    "$dir_pw_log:protos.pwpb",
+    "$dir_pw_protobuf",
+    "$dir_pw_status",
   ]
 }
 
 pw_source_set("rpc_log_drain") {
-  public_configs = [ ":default_config" ]
+  public_configs = [ ":public_include_path" ]
   public = [
     "public/pw_log_rpc/rpc_log_drain.h",
     "public/pw_log_rpc/rpc_log_drain_map.h",
   ]
   sources = [ "rpc_log_drain.cc" ]
   public_deps = [
+    ":log_filter",
     "$dir_pw_assert",
     "$dir_pw_log:protos.pwpb",
     "$dir_pw_log:protos.raw_rpc",
@@ -58,7 +91,7 @@
 }
 
 pw_source_set("rpc_log_drain_thread") {
-  public_configs = [ ":default_config" ]
+  public_configs = [ ":public_include_path" ]
   public = [ "public/pw_log_rpc/rpc_log_drain_thread.h" ]
   sources = []
   public_deps = [
@@ -76,11 +109,13 @@
 pw_test("log_service_test") {
   sources = [ "log_service_test.cc" ]
   deps = [
+    ":log_filter",
     ":log_service",
     "$dir_pw_containers:vector",
     "$dir_pw_log",
     "$dir_pw_log:proto_utils",
     "$dir_pw_log:protos.pwpb",
+    "$dir_pw_log_tokenized:metadata",
     "$dir_pw_protobuf",
     "$dir_pw_protobuf:bytes_utils",
     "$dir_pw_result",
@@ -89,6 +124,18 @@
   ]
 }
 
+pw_test("log_filter_test") {
+  sources = [ "log_filter_test.cc" ]
+  deps = [
+    ":log_filter",
+    "$dir_pw_log:proto_utils",
+    "$dir_pw_log:protos.pwpb",
+    "$dir_pw_log_tokenized:metadata",
+    "$dir_pw_result",
+    "$dir_pw_status",
+  ]
+}
+
 pw_test("rpc_log_drain_test") {
   sources = [ "rpc_log_drain_test.cc" ]
   deps = [
@@ -105,6 +152,7 @@
 
 pw_test_group("tests") {
   tests = [
+    ":log_filter_test",
     ":log_service_test",
     ":rpc_log_drain_test",
   ]
diff --git a/pw_log_rpc/docs.rst b/pw_log_rpc/docs.rst
index af866a9..d0dfd91 100644
--- a/pw_log_rpc/docs.rst
+++ b/pw_log_rpc/docs.rst
@@ -1,14 +1,15 @@
 .. _module-pw_log_rpc:
 
-----------
+==========
 pw_log_rpc
-----------
-An RPC-based logging backend for Pigweed.
+==========
+An RPC-based logging solution for Pigweed with log filtering and log drops
+reporting -- coming soon!
 
 .. warning::
   This module is under construction and might change in the future.
 
-How to use
+How to Use
 ==========
 1. Set up RPC
 -------------
@@ -27,13 +28,15 @@
 ``pw_tokenizer_HandleEncodedMessageWithPayload``, encode log entries in the
 ``log::LogEntry`` format, and add them to the ``MultiSink``.
 
-4. Create log drains
---------------------
+4. Create log drains and filters
+--------------------------------
 Create an ``RpcLogDrainMap`` with one ``RpcLogDrain`` for each RPC channel used
-to stream logs. Provide this map to the ``LogService`` and register the latter
+to stream logs. Optionally, create a ``FilterMap`` with ``Filter`` objects with
+different IDs. Provide these map to the ``LogService`` and register the latter
 with the application's RPC service. The ``RpcLogDrainMap`` provides a convenient
 way to access and maintain each ``RpcLogDrain``. Attach each ``RpcLogDrain`` to
-the ``MultiSink``.
+the ``MultiSink``. Optionally, set the ``RpcLogDrain`` callback to decide if a
+log should be kept or dropped. This callback can be ``Filter::ShouldDropLog``.
 
 5. Flush the log drains in the background
 -----------------------------------------
@@ -80,16 +83,19 @@
     pw_rpc-->computer[Computer];
     pw_rpc-->other_listener[Other log<br>listener];
 
+Components Overview
+===================
 RPC log service
-===============
+---------------
 The ``LogService`` class is an RPC service that provides a way to request a log
-stream sent via RPC. Thus, it helps avoid using a different protocol for logs
-and RPCs over the same interface(s). It requires a map of ``RpcLogDrains`` to
-assign stream writers and delegate the log stream flushing to the user's
-preferred method.
+stream sent via RPC and configure log filters. Thus, it helps avoid using a
+different protocol for logs and RPCs over the same interface(s).
+It requires a ``RpcLogDrainMap`` to assign stream writers and delegate the
+log stream flushing to the user's preferred method, as well as a ``FilterMap``
+to retrieve and modify filters.
 
 RpcLogDrain
-===========
+-----------
 An ``RpcLogDrain`` reads from the ``MultiSink`` instance that buffers logs, then
 packs, and sends the retrieved log entries to the log listener. One
 ``RpcLogDrain`` is needed for each log listener. An ``RpcLogDrain`` needs a
@@ -124,12 +130,12 @@
 count with the logs if desired.
 
 RpcLogDrainMap
-==============
+--------------
 Provides a convenient way to access all or a single ``RpcLogDrain`` by its RPC
 channel ID.
 
 RpcLogDrainThread
-=================
+-----------------
 The module includes a sample thread that flushes each drain sequentially. Future
 work might replace this with enqueueing the flush work on a work queue. The user
 can also choose to have different threads flushing individual ``RpcLogDrain``\s
@@ -138,6 +144,32 @@
 Calling ``OpenUnrequestedLogStream()`` is a convenient way to set up a log
 stream that is started without the need to receive an RCP request for logs.
 
+Filter::Rule
+------------
+Contains a set of values that are compared against a log when set. All
+conditions must be met for the rule to be met.
+
+- ``action``: drops or keeps the log if the other conditions match.
+  The rule is ignored when inactive.
+
+- ``any_flags_set``: the condition is met if this value is 0 or the log has any
+  of these flags set.
+
+- ``level_greater_than_or_equal``: the condition is met when the log level is
+  greater than or equal to this value.
+
+- ``module_equals``: the condition is met if this byte array is empty, or the
+  log module equals the contents of this byte array.
+
+Filter
+------
+``Filter`` encapsulates a collection of zero or more ``Filter::Rule``\s and has
+an ID used to modify or retrieve its contents.
+
+FilterMap
+---------
+Provides a convenient way to retrieve register filters by ID.
+
 Logging example
 ===============
 The following code shows a sample setup to defer the log handling to the
diff --git a/pw_log_rpc/log_filter.cc b/pw_log_rpc/log_filter.cc
new file mode 100644
index 0000000..5385be4
--- /dev/null
+++ b/pw_log_rpc/log_filter.cc
@@ -0,0 +1,131 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/log_filter.h"
+
+#include "pw_log/levels.h"
+#include "pw_protobuf/decoder.h"
+#include "pw_status/try.h"
+
+namespace pw::log_rpc {
+namespace {
+
+// Returns true if the provided log parameters match the given filter rule.
+bool IsRuleMet(const Filter::Rule& rule,
+               uint32_t level,
+               ConstByteSpan module,
+               uint32_t flags) {
+  if (level < static_cast<uint32_t>(rule.level_greater_than_or_equal)) {
+    return false;
+  }
+  if ((rule.any_flags_set != 0) && ((flags & rule.any_flags_set) == 0)) {
+    return false;
+  }
+  if (!rule.module_equals.empty() && !std::equal(module.begin(),
+                                                 module.end(),
+                                                 rule.module_equals.begin(),
+                                                 rule.module_equals.end())) {
+    return false;
+  }
+  return true;
+}
+
+}  // namespace
+
+Status Filter::UpdateRulesFromProto(ConstByteSpan buffer) {
+  if (rules_.empty()) {
+    return Status::FailedPrecondition();
+  }
+
+  // Reset rules.
+  for (auto& rule : rules_) {
+    rule = {};
+  }
+
+  protobuf::Decoder decoder(buffer);
+  Status status;
+  for (size_t i = 0; (i < rules_.size()) && (status = decoder.Next()).ok();
+       ++i) {
+    ConstByteSpan rule_buffer;
+    PW_TRY(decoder.ReadBytes(&rule_buffer));
+    protobuf::Decoder rule_decoder(rule_buffer);
+    while ((status = rule_decoder.Next()).ok()) {
+      switch (
+          static_cast<log::FilterRule::Fields>(rule_decoder.FieldNumber())) {
+        case log::FilterRule::Fields::LEVEL_GREATER_THAN_OR_EQUAL:
+          PW_TRY(rule_decoder.ReadUint32(reinterpret_cast<uint32_t*>(
+              &rules_[i].level_greater_than_or_equal)));
+          break;
+        case log::FilterRule::Fields::MODULE_EQUALS: {
+          ConstByteSpan module;
+          PW_TRY(rule_decoder.ReadBytes(&module));
+          if (module.size() > rules_[i].module_equals.max_size()) {
+            return Status::InvalidArgument();
+          }
+          rules_[i].module_equals.assign(module.begin(), module.end());
+        } break;
+        case log::FilterRule::Fields::ANY_FLAGS_SET:
+          PW_TRY(rule_decoder.ReadUint32(&rules_[i].any_flags_set));
+          break;
+        case log::FilterRule::Fields::ACTION:
+          PW_TRY(rule_decoder.ReadUint32(
+              reinterpret_cast<uint32_t*>(&rules_[i].action)));
+          break;
+      }
+    }
+  }
+  return status.IsOutOfRange() ? OkStatus() : status;
+}
+
+bool Filter::ShouldDropLog(ConstByteSpan entry) const {
+  if (rules_.empty()) {
+    return false;
+  }
+
+  uint32_t log_level = 0;
+  ConstByteSpan log_module;
+  uint32_t log_flags = 0;
+  protobuf::Decoder decoder(entry);
+  while (decoder.Next().ok()) {
+    switch (static_cast<log::LogEntry::Fields>(decoder.FieldNumber())) {
+      case log::LogEntry::Fields::LINE_LEVEL:
+        if (decoder.ReadUint32(&log_level).ok()) {
+          log_level &= PW_LOG_LEVEL_BITMASK;
+        }
+        break;
+      case log::LogEntry::Fields::MODULE:
+        decoder.ReadBytes(&log_module).IgnoreError();
+        break;
+      case log::LogEntry::Fields::FLAGS:
+        decoder.ReadUint32(&log_flags).IgnoreError();
+        break;
+      default:
+        break;
+    }
+  }
+
+  // Follow the action of the first rule whose condition is met.
+  for (const auto& rule : rules_) {
+    if (rule.action == Filter::Rule::Action::kInactive) {
+      continue;
+    }
+    if (IsRuleMet(rule, log_level, log_module, log_flags)) {
+      return rule.action == Filter::Rule::Action::kDrop;
+    }
+  }
+
+  return false;
+}
+
+}  // namespace pw::log_rpc
diff --git a/pw_log_rpc/log_filter_test.cc b/pw_log_rpc/log_filter_test.cc
new file mode 100644
index 0000000..fa41215
--- /dev/null
+++ b/pw_log_rpc/log_filter_test.cc
@@ -0,0 +1,514 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_log_rpc/log_filter.h"
+
+#include <array>
+#include <cstddef>
+#include <cstring>
+
+#include "gtest/gtest.h"
+#include "pw_bytes/endian.h"
+#include "pw_log/levels.h"
+#include "pw_log/log.h"
+#include "pw_log/proto/log.pwpb.h"
+#include "pw_log/proto_utils.h"
+#include "pw_log_rpc/log_filter_map.h"
+#include "pw_log_tokenized/metadata.h"
+#include "pw_result/result.h"
+#include "pw_status/status.h"
+#include "pw_status/try.h"
+
+namespace pw::log_rpc {
+namespace {
+
+constexpr uint32_t kSampleModule = 0x1234;
+constexpr uint32_t kSampleFlags = 0x3;
+constexpr char kSampleMessage[] = "message";
+constexpr auto kSampleModuleLittleEndian =
+    bytes::CopyInOrder<uint32_t>(std::endian::little, kSampleModule);
+
+// Creates and encodes a log entry in the provided buffer.
+template <uintptr_t log_level, uintptr_t module, uintptr_t flags>
+Result<ConstByteSpan> EncodeLogEntry(std::string_view message,
+                                     ByteSpan buffer) {
+  auto metadata = log_tokenized::Metadata::Set<log_level, module, flags, 0>();
+  return log::EncodeTokenizedLog(metadata,
+                                 std::as_bytes(std::span(message)),
+                                 /*ticks_since_epoch=*/0,
+                                 buffer);
+}
+
+Status EncodeFilterRule(const Filter::Rule& rule,
+                        log::FilterRule::StreamEncoder& encoder) {
+  PW_TRY(
+      encoder.WriteLevelGreaterThanOrEqual(rule.level_greater_than_or_equal));
+  PW_TRY(encoder.WriteModuleEquals(rule.module_equals));
+  PW_TRY(encoder.WriteAnyFlagsSet(rule.any_flags_set));
+  return encoder.WriteAction(static_cast<log::FilterRule::Action>(rule.action));
+}
+
+Result<ConstByteSpan> EncodeFilter(const Filter& filter, ByteSpan buffer) {
+  log::Filter::MemoryEncoder encoder(buffer);
+  for (auto& rule : filter.rules()) {
+    log::FilterRule::StreamEncoder rule_encoder = encoder.GetRuleEncoder();
+    PW_TRY(EncodeFilterRule(rule, rule_encoder));
+  }
+  return ConstByteSpan(encoder);
+}
+
+void VerifyRule(const Filter::Rule& rule, const Filter::Rule& expected_rule) {
+  EXPECT_EQ(rule.level_greater_than_or_equal,
+            expected_rule.level_greater_than_or_equal);
+  EXPECT_EQ(rule.module_equals, expected_rule.module_equals);
+  EXPECT_EQ(rule.any_flags_set, expected_rule.any_flags_set);
+  EXPECT_EQ(rule.action, expected_rule.action);
+}
+
+TEST(FilterMap, RetrieveFiltersById) {
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1{
+      std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2{
+      std::byte(0xca), std::byte(0xfe), std::byte(0xc0), std::byte(0xc0)};
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id3{
+      std::byte(0xfa), std::byte(0xfe), std::byte(0xf1), std::byte(0xf0)};
+  std::array<Filter, 3> filters = {
+      Filter(filter_id1, {}),
+      Filter(filter_id2, {}),
+      Filter(filter_id3, {}),
+  };
+
+  FilterMap filter_map(filters);
+
+  // Check that each filters() element points to the same object provided.
+  std::span<const Filter> filter_list = filter_map.filters();
+  ASSERT_EQ(filter_list.size(), filters.size());
+  size_t i = 0;
+  for (auto& filter : filter_list) {
+    EXPECT_EQ(&filter, &filters[i++]);
+  }
+
+  auto filter_result = filter_map.GetFilterFromId(filter_id3);
+  ASSERT_TRUE(filter_result.ok());
+  EXPECT_EQ(filter_result.value(), &filters[2]);
+
+  filter_result = filter_map.GetFilterFromId(filter_id2);
+  ASSERT_TRUE(filter_result.ok());
+  EXPECT_EQ(filter_result.value(), &filters[1]);
+
+  filter_result = filter_map.GetFilterFromId(filter_id1);
+  ASSERT_TRUE(filter_result.ok());
+  EXPECT_EQ(filter_result.value(), &filters[0]);
+
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> invalid_id{
+      std::byte(0xd0), std::byte(0x1c), std::byte(0xe7), std::byte(0xea)};
+  filter_result = filter_map.GetFilterFromId(invalid_id);
+  ASSERT_EQ(filter_result.status(), Status::NotFound());
+}
+
+TEST(Filter, UpdateFilterRules) {
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id{
+      std::byte(0xba), std::byte(0x1d), std::byte(0xba), std::byte(0xb1)};
+  std::array<Filter::Rule, 4> rules;
+  const std::array<Filter::Rule, 4> new_rules{{
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL,
+          .any_flags_set = 0x0f,
+          .module_equals{std::byte(123)},
+      },
+      {
+          .action = Filter::Rule::Action::kInactive,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0xef,
+          .module_equals{},
+      },
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = 0x1234,
+          .module_equals{std::byte(99)},
+      },
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0,
+          .module_equals{std::byte(4)},
+      },
+  }};
+
+  Filter filter(filter_id, rules);
+  const Filter new_filter(filter_id,
+                          const_cast<std::array<Filter::Rule, 4>&>(new_rules));
+  std::byte buffer[256];
+  auto encode_result = EncodeFilter(new_filter, buffer);
+  ASSERT_EQ(encode_result.status(), OkStatus());
+  EXPECT_EQ(filter.UpdateRulesFromProto(encode_result.value()), OkStatus());
+
+  size_t i = 0;
+  for (const auto& rule : filter.rules()) {
+    VerifyRule(rule, new_rules[i++]);
+  }
+
+  // A new filter with no rules should clear filter.
+  const Filter empty_filter(filter_id, {});
+  std::memset(buffer, 0, sizeof(buffer));
+  encode_result = EncodeFilter(empty_filter, buffer);
+  ASSERT_EQ(encode_result.status(), OkStatus());
+  EXPECT_EQ(filter.UpdateRulesFromProto(encode_result.value()), OkStatus());
+  const Filter::Rule empty_rule{
+      .action = Filter::Rule::Action::kInactive,
+      .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+      .any_flags_set = 0,
+      .module_equals{},
+  };
+  for (const auto& rule : filter.rules()) {
+    VerifyRule(rule, empty_rule);
+  }
+  EXPECT_TRUE(empty_filter.rules().empty());
+
+  // Passing a new filter with less rules.
+  const std::array<Filter::Rule, 2> few_rules{{
+      {
+          .action = Filter::Rule::Action::kInactive,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0xef,
+          .module_equals{},
+      },
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = 0x1234,
+          .module_equals{std::byte(99)},
+      },
+  }};
+  const Filter filter_few_rules(
+      filter_id, const_cast<std::array<Filter::Rule, 2>&>(few_rules));
+  std::memset(buffer, 0, sizeof(buffer));
+  encode_result = EncodeFilter(filter_few_rules, buffer);
+  ASSERT_EQ(encode_result.status(), OkStatus());
+  EXPECT_EQ(filter.UpdateRulesFromProto(encode_result.value()), OkStatus());
+  i = 0;
+  for (const auto& rule : filter.rules()) {
+    if (i >= few_rules.size()) {
+      VerifyRule(rule, empty_rule);
+    } else {
+      VerifyRule(rule, few_rules[i++]);
+    }
+  }
+
+  // Passing a new filter with extra rules.
+  const std::array<Filter::Rule, 6> extra_rules{{
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL,
+          .any_flags_set = 0x0f,
+          .module_equals{std::byte(123)},
+      },
+      {
+          .action = Filter::Rule::Action::kInactive,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0xef,
+          .module_equals{},
+      },
+      {
+          .action = Filter::Rule::Action::kInactive,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0xef,
+          .module_equals{},
+      },
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = 0x1234,
+          .module_equals{std::byte(99)},
+      },
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0,
+          .module_equals{std::byte(4)},
+      },
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = 0x1234,
+          .module_equals{std::byte(99)},
+      },
+  }};
+  const Filter filter_extra_rules(
+      filter_id, const_cast<std::array<Filter::Rule, 6>&>(extra_rules));
+  std::memset(buffer, 0, sizeof(buffer));
+  encode_result = EncodeFilter(filter_extra_rules, buffer);
+  ASSERT_EQ(encode_result.status(), OkStatus());
+  EXPECT_EQ(filter.UpdateRulesFromProto(encode_result.value()), OkStatus());
+  i = 0;
+  for (const auto& rule : filter.rules()) {
+    VerifyRule(rule, extra_rules[i++]);
+  }
+
+  // A filter with no rules buffer cannot get rules updated.
+  Filter filter_no_rules(filter_id, {});
+  EXPECT_EQ(filter_no_rules.UpdateRulesFromProto(encode_result.value()),
+            Status::FailedPrecondition());
+}
+
+TEST(FilterTest, FilterLogsRuleDefaultDrop) {
+  const std::array<Filter::Rule, 2> rules{{
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = kSampleFlags,
+          .module_equals{kSampleModuleLittleEndian.begin(),
+                         kSampleModuleLittleEndian.end()},
+      },
+      // This rule catches all logs.
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0,
+          .module_equals = {},
+      },
+  }};
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id{
+      std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+  const Filter filter(filter_id,
+                      const_cast<std::array<Filter::Rule, 2>&>(rules));
+
+  std::array<std::byte, 50> buffer;
+  const Result<ConstByteSpan> log_entry_info =
+      EncodeLogEntry<PW_LOG_LEVEL_INFO, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_info.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_info.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_debug =
+      EncodeLogEntry<PW_LOG_LEVEL_DEBUG, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_debug.status(), OkStatus());
+  EXPECT_TRUE(filter.ShouldDropLog(log_entry_debug.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_warn =
+      EncodeLogEntry<PW_LOG_LEVEL_WARN, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_warn.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_warn.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_error =
+      EncodeLogEntry<PW_LOG_LEVEL_ERROR, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_error.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_error.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_info_different =
+      EncodeLogEntry<PW_LOG_LEVEL_INFO, 0, 0>(kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_info_different.status(), OkStatus());
+  EXPECT_TRUE(filter.ShouldDropLog(log_entry_info_different.value()));
+  // Because the last rule catches all logs, the filter default action is not
+  // applied.
+  const Filter filter_default_drop(
+      filter_id, const_cast<std::array<Filter::Rule, 2>&>(rules));
+  EXPECT_TRUE(
+      filter_default_drop.ShouldDropLog(log_entry_info_different.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_same_flags =
+      EncodeLogEntry<0, 0, kSampleFlags>(kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_same_flags.status(), OkStatus());
+  EXPECT_TRUE(filter.ShouldDropLog(log_entry_same_flags.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_same_module =
+      EncodeLogEntry<0, kSampleModule, 0>(kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_same_module.status(), OkStatus());
+  EXPECT_TRUE(filter.ShouldDropLog(log_entry_same_module.value()));
+}
+
+TEST(FilterTest, FilterLogsKeepLogsWhenNoRuleMatches) {
+  // There is no rule that catches all logs.
+  const std::array<Filter::Rule, 1> rules{{
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = kSampleFlags,
+          .module_equals = {kSampleModuleLittleEndian.begin(),
+                            kSampleModuleLittleEndian.end()},
+      },
+  }};
+
+  // Filters should not share rules if they are mutable, to avoid race
+  // conditions.
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id{
+      std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+  const Filter filter(filter_id,
+                      const_cast<std::array<Filter::Rule, 1>&>(rules));
+
+  std::array<std::byte, 50> buffer;
+  const Result<ConstByteSpan> log_entry_info =
+      EncodeLogEntry<PW_LOG_LEVEL_INFO, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_info.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_info.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_debug =
+      EncodeLogEntry<PW_LOG_LEVEL_DEBUG, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_debug.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_debug.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_warn =
+      EncodeLogEntry<PW_LOG_LEVEL_WARN, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_warn.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_warn.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_error =
+      EncodeLogEntry<PW_LOG_LEVEL_ERROR, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_error.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_error.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_info_different =
+      EncodeLogEntry<PW_LOG_LEVEL_INFO, 0, 0>(kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_info_different.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_info_different.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_same_flags =
+      EncodeLogEntry<0, 0, kSampleFlags>(kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_same_flags.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_same_flags.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_same_module =
+      EncodeLogEntry<0, kSampleModule, 0>(kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_same_module.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_same_module.value()));
+}
+
+TEST(FilterTest, FilterLogsKeepLogsWhenRulesEmpty) {
+  // Filters should not share rules if they are mutable, to avoid race
+  // conditions.
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id{
+      std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+  const Filter filter(filter_id, {});
+
+  std::array<std::byte, 50> buffer;
+  const Result<ConstByteSpan> log_entry_info =
+      EncodeLogEntry<PW_LOG_LEVEL_INFO, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_info.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_info.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_debug =
+      EncodeLogEntry<PW_LOG_LEVEL_DEBUG, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_debug.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_debug.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_warn =
+      EncodeLogEntry<PW_LOG_LEVEL_WARN, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_warn.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_warn.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_error =
+      EncodeLogEntry<PW_LOG_LEVEL_ERROR, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_error.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_error.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_info_different =
+      EncodeLogEntry<PW_LOG_LEVEL_INFO, 0, 0>(kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_info_different.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_info_different.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_same_flags =
+      EncodeLogEntry<0, 0, kSampleFlags>(kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_same_flags.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_same_flags.value()));
+
+  buffer.fill(std::byte(0));
+  const Result<ConstByteSpan> log_entry_same_module =
+      EncodeLogEntry<0, kSampleModule, 0>(kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_same_module.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_same_module.value()));
+}
+
+TEST(FilterTest, FilterLogsFirstRuleWins) {
+  const std::array<Filter::Rule, 2> rules{{
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = kSampleFlags,
+          .module_equals = {kSampleModuleLittleEndian.begin(),
+                            kSampleModuleLittleEndian.end()},
+      },
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = kSampleFlags,
+          .module_equals = {kSampleModuleLittleEndian.begin(),
+                            kSampleModuleLittleEndian.end()},
+      },
+  }};
+  const std::array<Filter::Rule, 2> rules_reversed{{
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = kSampleFlags,
+          .module_equals = {kSampleModuleLittleEndian.begin(),
+                            kSampleModuleLittleEndian.end()},
+      },
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = kSampleFlags,
+          .module_equals = {kSampleModuleLittleEndian.begin(),
+                            kSampleModuleLittleEndian.end()},
+      },
+  }};
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1{
+      std::byte(0xfe), std::byte(0xed), std::byte(0xba), std::byte(0xb1)};
+  const std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2{
+      std::byte(0), std::byte(0), std::byte(0), std::byte(2)};
+  const Filter filter(filter_id1,
+                      const_cast<std::array<Filter::Rule, 2>&>(rules));
+  const Filter filter_reverse_rules(
+      filter_id2, const_cast<std::array<Filter::Rule, 2>&>(rules_reversed));
+
+  std::array<std::byte, 50> buffer;
+  const Result<ConstByteSpan> log_entry_info =
+      EncodeLogEntry<PW_LOG_LEVEL_INFO, kSampleModule, kSampleFlags>(
+          kSampleMessage, buffer);
+  ASSERT_EQ(log_entry_info.status(), OkStatus());
+  EXPECT_FALSE(filter.ShouldDropLog(log_entry_info.value()));
+  EXPECT_TRUE(filter_reverse_rules.ShouldDropLog(log_entry_info.value()));
+}
+
+}  // namespace
+}  // namespace pw::log_rpc
diff --git a/pw_log_rpc/log_service.cc b/pw_log_rpc/log_service.cc
index 1da5471..eef904c 100644
--- a/pw_log_rpc/log_service.cc
+++ b/pw_log_rpc/log_service.cc
@@ -16,6 +16,8 @@
 
 #include "pw_log/log.h"
 #include "pw_log/proto/log.pwpb.h"
+#include "pw_log_rpc/log_filter.h"
+#include "pw_protobuf/decoder.h"
 
 namespace pw::log_rpc {
 
@@ -34,4 +36,83 @@
   }
 }
 
+StatusWithSize LogService::SetFilter(ServerContext&,
+                                     ConstByteSpan request,
+                                     ByteSpan) {
+  if (filters_ == nullptr) {
+    return StatusWithSize::NotFound();
+  }
+
+  protobuf::Decoder decoder(request);
+  PW_TRY_WITH_SIZE(decoder.Next());
+  if (static_cast<log::SetFilterRequest::Fields>(decoder.FieldNumber()) !=
+      log::SetFilterRequest::Fields::FILTER_ID) {
+    return StatusWithSize::InvalidArgument();
+  }
+  ConstByteSpan filter_id;
+  PW_TRY_WITH_SIZE(decoder.ReadBytes(&filter_id));
+  Result<Filter*> filter = filters_->GetFilterFromId(filter_id);
+  if (!filter.ok()) {
+    return StatusWithSize::NotFound();
+  }
+
+  PW_TRY_WITH_SIZE(decoder.Next());
+  ConstByteSpan filter_buffer;
+  if (static_cast<log::SetFilterRequest::Fields>(decoder.FieldNumber()) !=
+      log::SetFilterRequest::Fields::FILTER) {
+    return StatusWithSize::InvalidArgument();
+  }
+  PW_TRY_WITH_SIZE(decoder.ReadBytes(&filter_buffer));
+  PW_TRY_WITH_SIZE(filter.value()->UpdateRulesFromProto(filter_buffer));
+  return StatusWithSize();
+}
+
+StatusWithSize LogService::GetFilter(ServerContext&,
+                                     ConstByteSpan request,
+                                     ByteSpan response) {
+  if (filters_ == nullptr) {
+    return StatusWithSize::NotFound();
+  }
+  protobuf::Decoder decoder(request);
+  PW_TRY_WITH_SIZE(decoder.Next());
+  if (static_cast<log::GetFilterRequest::Fields>(decoder.FieldNumber()) !=
+      log::GetFilterRequest::Fields::FILTER_ID) {
+    return StatusWithSize::InvalidArgument();
+  }
+  ConstByteSpan filter_id;
+  PW_TRY_WITH_SIZE(decoder.ReadBytes(&filter_id));
+  Result<Filter*> filter = filters_->GetFilterFromId(filter_id);
+  if (!filter.ok()) {
+    return StatusWithSize::NotFound();
+  }
+
+  log::Filter::MemoryEncoder encoder(response);
+  for (auto& rule : (*filter)->rules()) {
+    log::FilterRule::StreamEncoder rule_encoder = encoder.GetRuleEncoder();
+    rule_encoder.WriteLevelGreaterThanOrEqual(rule.level_greater_than_or_equal)
+        .IgnoreError();
+    rule_encoder.WriteModuleEquals(rule.module_equals).IgnoreError();
+    rule_encoder.WriteAnyFlagsSet(rule.any_flags_set).IgnoreError();
+    rule_encoder.WriteAction(static_cast<log::FilterRule::Action>(rule.action))
+        .IgnoreError();
+    PW_TRY_WITH_SIZE(rule_encoder.status());
+  }
+  PW_TRY_WITH_SIZE(encoder.status());
+
+  return StatusWithSize(encoder.size());
+}
+
+StatusWithSize LogService::ListFilterIds(ServerContext&,
+                                         ConstByteSpan,
+                                         ByteSpan response) {
+  if (filters_ == nullptr) {
+    return StatusWithSize::NotFound();
+  }
+  log::FilterIdListResponse::MemoryEncoder encoder(response);
+  for (auto& filter : filters_->filters()) {
+    PW_TRY_WITH_SIZE(encoder.WriteFilterId(filter.id()));
+  }
+  return StatusWithSize(encoder.size());
+}
+
 }  // namespace pw::log_rpc
diff --git a/pw_log_rpc/log_service_test.cc b/pw_log_rpc/log_service_test.cc
index 992ee4d..e2e7db9 100644
--- a/pw_log_rpc/log_service_test.cc
+++ b/pw_log_rpc/log_service_test.cc
@@ -25,6 +25,7 @@
 #include "pw_log/log.h"
 #include "pw_log/proto/log.pwpb.h"
 #include "pw_log/proto_utils.h"
+#include "pw_log_rpc/log_filter.h"
 #include "pw_log_tokenized/metadata.h"
 #include "pw_protobuf/bytes_utils.h"
 #include "pw_protobuf/decoder.h"
@@ -69,7 +70,10 @@
 // add to the multisink, and which drain to use.
 class LogServiceTest : public ::testing::Test {
  public:
-  LogServiceTest() : multisink_(multisink_buffer_), drain_map_(drains_) {
+  LogServiceTest()
+      : multisink_(multisink_buffer_),
+        drain_map_(drains_),
+        filter_map_(filters_) {
     for (auto& drain : drain_map_.drains()) {
       multisink_.AttachDrain(drain);
     }
@@ -102,6 +106,22 @@
   multisink::MultiSink multisink_;
   RpcLogDrainMap drain_map_;
   std::array<std::byte, kMaxLogEntrySize> entry_encode_buffer_;
+  FilterMap filter_map_;
+  static constexpr size_t kMaxFilterRules = 4;
+  std::array<Filter::Rule, kMaxFilterRules> rules1_;
+  std::array<Filter::Rule, kMaxFilterRules> rules2_;
+  std::array<Filter::Rule, kMaxFilterRules> rules3_;
+  static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1_{
+      std::byte(65), std::byte(66), std::byte(67), std::byte(0)};
+  static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2_{
+      std::byte(68), std::byte(69), std::byte(70), std::byte(0)};
+  static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id3_{
+      std::byte(71), std::byte(72), std::byte(73), std::byte(0)};
+  std::array<Filter, kMaxDrains> filters_ = {
+      Filter(filter_id1_, rules1_),
+      Filter(filter_id2_, rules2_),
+      Filter(filter_id3_, rules3_),
+  };
 
   // Drain Buffers
   std::array<std::byte, kMaxLogEntrySize> drain_buffer1_;
@@ -115,16 +135,18 @@
       RpcLogDrain(kIgnoreWriterErrorsDrainId,
                   drain_buffer1_,
                   shared_mutex_,
-                  RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
-      RpcLogDrain(
-          kCloseWriterOnErrorDrainId,
-          drain_buffer2_,
-          shared_mutex_,
-          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+                  RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
+                  &filters_[0]),
+      RpcLogDrain(kCloseWriterOnErrorDrainId,
+                  drain_buffer2_,
+                  shared_mutex_,
+                  RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+                  &filters_[1]),
       RpcLogDrain(kSmallBufferDrainId,
                   small_buffer_,
                   shared_mutex_,
-                  RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
+                  RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
+                  &filters_[2]),
   };
 };
 struct TestLogEntry {
@@ -231,7 +253,7 @@
   // Create context directed to drain with ID 1.
   RpcLogDrain& active_drain = drains_[0];
   const uint32_t drain_channel_id = active_drain.channel_id();
-  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
   context.set_channel_id(drain_channel_id);
 
   // Call RPC, which sets the drain's writer.
@@ -247,7 +269,7 @@
 
   // Calling an ongoing log stream must not change the active drain's
   // writer, and the second writer must not get any responses.
-  LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_);
+  LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_, &filter_map_);
   second_call_context.set_channel_id(drain_channel_id);
   second_call_context.call(rpc_request_buffer);
   EXPECT_EQ(active_drain.Flush(), OkStatus());
@@ -256,19 +278,19 @@
 
   // Setting a new writer on a closed stream is allowed.
   ASSERT_EQ(active_drain.Close(), OkStatus());
-  LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_);
+  LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_, &filter_map_);
   third_call_context.set_channel_id(drain_channel_id);
   third_call_context.call(rpc_request_buffer);
   EXPECT_EQ(active_drain.Flush(), OkStatus());
   ASSERT_FALSE(third_call_context.done());
-  EXPECT_EQ(third_call_context.responses().size(), 1u);
+  EXPECT_EQ(third_call_context.responses().size(), 0u);
   EXPECT_EQ(active_drain.Close(), OkStatus());
 }
 
 TEST_F(LogServiceTest, StartAndEndStream) {
   RpcLogDrain& active_drain = drains_[2];
   const uint32_t drain_channel_id = active_drain.channel_id();
-  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
   context.set_channel_id(drain_channel_id);
 
   // Add log entries.
@@ -306,7 +328,7 @@
 TEST_F(LogServiceTest, HandleDropped) {
   RpcLogDrain& active_drain = drains_[0];
   const uint32_t drain_channel_id = active_drain.channel_id();
-  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
   context.set_channel_id(drain_channel_id);
 
   // Add log entries.
@@ -344,7 +366,7 @@
 }
 
 TEST_F(LogServiceTest, HandleSmallBuffer) {
-  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
   context.set_channel_id(kSmallBufferDrainId);
   auto small_buffer_drain =
       drain_map_.GetDrainFromChannelId(kSmallBufferDrainId);
@@ -378,7 +400,7 @@
 TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
   auto& detached_drain = drains_[0];
   multisink_.DetachDrain(detached_drain);
-  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
   context.set_channel_id(detached_drain.channel_id());
 
   // Add log entries.
@@ -423,7 +445,7 @@
   // Start log stream.
   RpcLogDrain& active_drain = drains_[0];
   const uint32_t drain_channel_id = active_drain.channel_id();
-  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
   context.set_channel_id(drain_channel_id);
   context.call(rpc_request_buffer);
   ASSERT_EQ(active_drain.Flush(), OkStatus());
@@ -445,7 +467,7 @@
   auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
   ASSERT_TRUE(drain.ok());
 
-  LogService log_service(drain_map_);
+  LogService log_service(drain_map_, &filter_map_);
   const size_t output_buffer_size = 128;
   const size_t max_packets = 10;
   rpc::RawFakeChannelOutput<10, output_buffer_size, 512> output;
@@ -533,7 +555,7 @@
   auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
   ASSERT_TRUE(drain.ok());
 
-  LogService log_service(drain_map_);
+  LogService log_service(drain_map_, &filter_map_);
   const size_t output_buffer_size = 128;
   const size_t max_packets = 20;
   rpc::RawFakeChannelOutput<max_packets, output_buffer_size, 512> output;
@@ -607,5 +629,414 @@
   EXPECT_TRUE(output.done());
 }
 
+TEST_F(LogServiceTest, GetFilterIds) {
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, ListFilterIds, 1, 128)
+  context(drain_map_, &filter_map_);
+  context.call({});
+  ASSERT_TRUE(context.done());
+  ASSERT_EQ(context.responses().size(), 1u);
+  protobuf::Decoder decoder(context.responses()[0]);
+
+  for (const auto& filter : filter_map_.filters()) {
+    ASSERT_EQ(decoder.Next(), OkStatus());
+    ASSERT_EQ(decoder.FieldNumber(), 1u);  // filter_id
+    ConstByteSpan filter_id;
+    ASSERT_EQ(decoder.ReadBytes(&filter_id), OkStatus());
+    ASSERT_EQ(filter_id.size(), filter.id().size());
+    EXPECT_EQ(
+        std::memcmp(filter_id.data(), filter.id().data(), filter_id.size()), 0);
+  }
+  EXPECT_FALSE(decoder.Next().ok());
+
+  // No IDs reported when none registered in the filter map.
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, ListFilterIds, 1, 128)
+  no_filter_context(drain_map_, nullptr);
+  no_filter_context.call({});
+  ASSERT_TRUE(no_filter_context.done());
+  ASSERT_EQ(no_filter_context.responses().size(), 1u);
+  protobuf::Decoder no_filter_decoder(no_filter_context.responses()[0]);
+  uint32_t filter_count = 0;
+  while (no_filter_decoder.Next().ok()) {
+    EXPECT_EQ(no_filter_decoder.FieldNumber(), 1u);  // filter_id
+    ++filter_count;
+  }
+  EXPECT_EQ(filter_count, 0u);
+}
+
+Status EncodeFilterRule(const Filter::Rule& rule,
+                        log::FilterRule::StreamEncoder& encoder) {
+  PW_TRY(
+      encoder.WriteLevelGreaterThanOrEqual(rule.level_greater_than_or_equal));
+  PW_TRY(encoder.WriteModuleEquals(rule.module_equals));
+  PW_TRY(encoder.WriteAnyFlagsSet(rule.any_flags_set));
+  return encoder.WriteAction(static_cast<log::FilterRule::Action>(rule.action));
+}
+
+Status EncodeFilter(const Filter& filter, log::Filter::StreamEncoder& encoder) {
+  for (auto& rule : filter.rules()) {
+    log::FilterRule::StreamEncoder rule_encoder = encoder.GetRuleEncoder();
+    PW_TRY(EncodeFilterRule(rule, rule_encoder));
+  }
+  return OkStatus();
+}
+
+Result<ConstByteSpan> EncodeFilterRequest(const Filter& filter,
+                                          ByteSpan buffer) {
+  stream::MemoryWriter writer(buffer);
+  std::byte encode_buffer[256];
+  protobuf::StreamEncoder encoder(writer, encode_buffer);
+  PW_TRY(encoder.WriteBytes(
+      static_cast<uint32_t>(log::SetFilterRequest::Fields::FILTER_ID),
+      filter.id()));
+  {
+    log::Filter::StreamEncoder filter_encoder = encoder.GetNestedEncoder(
+        static_cast<uint32_t>(log::SetFilterRequest::Fields::FILTER));
+    PW_TRY(EncodeFilter(filter, filter_encoder));
+  }  // Let the StreamEncoder destructor finalize the data.
+  return ConstByteSpan(writer.data(), writer.bytes_written());
+}
+
+void VerifyRule(const Filter::Rule& rule, const Filter::Rule& expected_rule) {
+  EXPECT_EQ(rule.level_greater_than_or_equal,
+            expected_rule.level_greater_than_or_equal);
+  EXPECT_EQ(rule.module_equals, expected_rule.module_equals);
+  EXPECT_EQ(rule.any_flags_set, expected_rule.any_flags_set);
+  EXPECT_EQ(rule.action, expected_rule.action);
+}
+
+TEST_F(LogServiceTest, SetFilterRules) {
+  const std::array<Filter::Rule, 4> new_rules{{
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL,
+          .any_flags_set = 0x0f,
+          .module_equals{std::byte(123)},
+      },
+      {
+          .action = Filter::Rule::Action::kInactive,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0xef,
+          .module_equals{},
+      },
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = 0x1234,
+          .module_equals{std::byte(99)},
+      },
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0,
+          .module_equals{std::byte(4)},
+      },
+  }};
+  const Filter new_filter(filters_[0].id(),
+                          const_cast<std::array<Filter::Rule, 4>&>(new_rules));
+
+  std::byte request_buffer[512];
+  const auto request = EncodeFilterRequest(new_filter, request_buffer);
+  ASSERT_EQ(request.status(), OkStatus());
+
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+  context(drain_map_, &filter_map_);
+  context.call(request.value());
+
+  size_t i = 0;
+  for (const auto& rule : filters_[0].rules()) {
+    VerifyRule(rule, new_rules[i++]);
+  }
+}
+
+TEST_F(LogServiceTest, SetFilterRulesWhenUsedByDrain) {
+  const std::array<Filter::Rule, 4> new_filter_rules{{
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::CRITICAL_LEVEL,
+          .any_flags_set = 0xfd,
+          .module_equals{std::byte(543)},
+      },
+      {
+          .action = Filter::Rule::Action::kInactive,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0xca,
+          .module_equals{},
+      },
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+          .any_flags_set = 0xabcd,
+          .module_equals{std::byte(9000)},
+      },
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0,
+          .module_equals{std::byte(123)},
+      },
+  }};
+  Filter& filter = filters_[0];
+  const Filter new_filter(
+      filter.id(), const_cast<std::array<Filter::Rule, 4>&>(new_filter_rules));
+
+  // Add callback to drain.
+  RpcLogDrain& drain = drains_[0];
+
+  std::byte request_buffer[256];
+  const auto request = EncodeFilterRequest(new_filter, request_buffer);
+  ASSERT_EQ(request.status(), OkStatus());
+
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+  context(drain_map_, &filter_map_);
+  context.set_channel_id(drain.channel_id());
+  context.call(request.value());
+
+  size_t i = 0;
+  for (const auto& rule : filter.rules()) {
+    VerifyRule(rule, new_filter_rules[i++]);
+  }
+
+  // A request for logs without a filter should not modify the filter.
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+  context_no_filter(drain_map_, &filter_map_);
+  context_no_filter.set_channel_id(drain.channel_id());
+  context_no_filter.call({});
+  i = 0;
+  for (const auto& rule : filter.rules()) {
+    VerifyRule(rule, new_filter_rules[i++]);
+  }
+
+  // A new request for logs with a new filter updates filter.
+  const std::array<Filter::Rule, 4> second_filter_rules{{
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL,
+          .any_flags_set = 0xab,
+          .module_equals{},
+      },
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0x11,
+          .module_equals{std::byte(34)},
+      },
+      {
+          .action = Filter::Rule::Action::kKeep,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0xef,
+          .module_equals{std::byte(23)},
+      },
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0x0f,
+          .module_equals{},
+      },
+  }};
+  const Filter second_filter(
+      filter.id(),
+      const_cast<std::array<Filter::Rule, 4>&>(second_filter_rules));
+
+  std::memset(request_buffer, 0, sizeof(request_buffer));
+  const auto second_filter_request =
+      EncodeFilterRequest(second_filter, request_buffer);
+  ASSERT_EQ(second_filter_request.status(), OkStatus());
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+  context_new_filter(drain_map_, &filter_map_);
+  context_new_filter.set_channel_id(drain.channel_id());
+  context_new_filter.call(second_filter_request.value());
+
+  i = 0;
+  for (const auto& rule : filter.rules()) {
+    VerifyRule(rule, second_filter_rules[i++]);
+  }
+}
+
+TEST_F(LogServiceTest, FilterLogs) {
+  // Add a variety of logs.
+  const uint32_t module = 0xcafe;
+  const uint32_t flags = 0x02;
+  const uint32_t line_number = 100;
+  const auto debug_metadata = log_tokenized::Metadata::
+      Set<PW_LOG_LEVEL_DEBUG, module, flags, line_number>();
+  ASSERT_TRUE(AddLogEntry(kMessage, debug_metadata, kSampleTimestamp).ok());
+  const auto info_metadata = log_tokenized::Metadata::
+      Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();
+  ASSERT_TRUE(AddLogEntry(kMessage, info_metadata, kSampleTimestamp).ok());
+  const auto warn_metadata = log_tokenized::Metadata::
+      Set<PW_LOG_LEVEL_WARN, module, flags, line_number>();
+  ASSERT_TRUE(AddLogEntry(kMessage, warn_metadata, kSampleTimestamp).ok());
+  const auto error_metadata = log_tokenized::Metadata::
+      Set<PW_LOG_LEVEL_ERROR, module, flags, line_number>();
+  ASSERT_TRUE(AddLogEntry(kMessage, error_metadata, kSampleTimestamp).ok());
+  const auto different_flags_metadata = log_tokenized::Metadata::
+      Set<PW_LOG_LEVEL_ERROR, module, 0x01, line_number>();
+  ASSERT_TRUE(
+      AddLogEntry(kMessage, different_flags_metadata, kSampleTimestamp).ok());
+  const auto different_module_metadata = log_tokenized::Metadata::
+      Set<PW_LOG_LEVEL_ERROR, 0xabcd, flags, line_number>();
+  ASSERT_TRUE(
+      AddLogEntry(kMessage, different_module_metadata, kSampleTimestamp).ok());
+
+  // Add messages to the stack in the reverse order they are sent.
+  Vector<TestLogEntry, 6> message_stack;
+  message_stack.push_back(
+      {.metadata = error_metadata,
+       .timestamp = kSampleTimestamp,
+       .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
+  message_stack.push_back(
+      {.metadata = warn_metadata,
+       .timestamp = kSampleTimestamp,
+       .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
+  message_stack.push_back(
+      {.metadata = info_metadata,
+       .timestamp = kSampleTimestamp,
+       .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
+
+  // Create request with filter.
+  const auto module_little_endian =
+      bytes::CopyInOrder<uint32_t>(std::endian::little, module);
+  const std::array<Filter::Rule, 2> rules{{
+      {.action = Filter::Rule::Action::kKeep,
+       .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
+       .any_flags_set = flags,
+       .module_equals{module_little_endian.begin(),
+                      module_little_endian.end()}},
+      {
+          .action = Filter::Rule::Action::kDrop,
+          .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
+          .any_flags_set = 0,
+          .module_equals{},
+      },
+  }};
+
+  RpcLogDrain& drain = drains_[1];
+  Filter& filter = filters_[1];
+  const Filter new_filter(filter.id(),
+                          const_cast<std::array<Filter::Rule, 2>&>(rules));
+
+  // Set filter.
+  std::byte request_buffer[256];
+  const auto request = EncodeFilterRequest(new_filter, request_buffer);
+  ASSERT_EQ(request.status(), OkStatus());
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, SetFilter, 1, 128)
+  set_filter_context(drain_map_, &filter_map_);
+  set_filter_context.set_channel_id(drain.channel_id());
+  set_filter_context.call(request.value());
+
+  // Request logs.
+  LOG_SERVICE_METHOD_CONTEXT context(drain_map_, &filter_map_);
+  context.set_channel_id(drain.channel_id());
+  context.call(request.value());
+  ASSERT_EQ(drain.Flush(), OkStatus());
+
+  size_t entries_found = 0;
+  for (auto& response : context.responses()) {
+    protobuf::Decoder entry_decoder(response);
+    entries_found += VerifyLogEntries(entry_decoder, message_stack);
+  }
+  EXPECT_EQ(entries_found, 3u);
+}
+
+void VerifyFilterRule(protobuf::Decoder& decoder,
+                      const Filter::Rule& expected_rule) {
+  ASSERT_TRUE(decoder.Next().ok());
+  ASSERT_EQ(decoder.FieldNumber(), 1u);  // level_greater_than_or_equal
+  log::FilterRule::Level level_greater_than_or_equal;
+  ASSERT_EQ(decoder.ReadUint32(
+                reinterpret_cast<uint32_t*>(&level_greater_than_or_equal)),
+            OkStatus());
+  EXPECT_EQ(level_greater_than_or_equal,
+            expected_rule.level_greater_than_or_equal);
+
+  ASSERT_TRUE(decoder.Next().ok());
+  ASSERT_EQ(decoder.FieldNumber(), 2u);  // module_equals
+  ConstByteSpan module_equals;
+  ASSERT_EQ(decoder.ReadBytes(&module_equals), OkStatus());
+  ASSERT_EQ(module_equals.size(), expected_rule.module_equals.size());
+  EXPECT_EQ(std::memcmp(module_equals.data(),
+                        expected_rule.module_equals.data(),
+                        module_equals.size()),
+            0);
+
+  ASSERT_TRUE(decoder.Next().ok());
+  ASSERT_EQ(decoder.FieldNumber(), 3u);  // any_flags_set
+  uint32_t any_flags_set;
+  ASSERT_EQ(decoder.ReadUint32(&any_flags_set), OkStatus());
+  EXPECT_EQ(any_flags_set, expected_rule.any_flags_set);
+
+  ASSERT_TRUE(decoder.Next().ok());
+  ASSERT_EQ(decoder.FieldNumber(), 4u);  // action
+  Filter::Rule::Action action;
+  ASSERT_EQ(decoder.ReadUint32(reinterpret_cast<uint32_t*>(&action)),
+            OkStatus());
+  EXPECT_EQ(action, expected_rule.action);
+}
+
+void VerifyFilterRules(protobuf::Decoder& decoder,
+                       std::span<const Filter::Rule> expected_rules) {
+  size_t rules_found = 0;
+  while (decoder.Next().ok()) {
+    ConstByteSpan rule;
+    EXPECT_TRUE(decoder.ReadBytes(&rule).ok());
+    protobuf::Decoder rule_decoder(rule);
+    if (rules_found >= expected_rules.size()) {
+      break;
+    }
+    VerifyFilterRule(rule_decoder, expected_rules[rules_found]);
+    ++rules_found;
+  }
+  EXPECT_EQ(rules_found, expected_rules.size());
+}
+
+TEST_F(LogServiceTest, GetFilterRules) {
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, GetFilter, 1, 128)
+  context(drain_map_, &filter_map_);
+
+  std::byte request_buffer[64];
+  log::GetFilterRequest::MemoryEncoder encoder(request_buffer);
+  encoder.WriteFilterId(filter_id1_);
+  const auto request = ConstByteSpan(encoder);
+  context.call(request);
+  ASSERT_TRUE(context.done());
+  ASSERT_EQ(context.responses().size(), 1u);
+
+  // Verify against empty rules.
+  protobuf::Decoder decoder(context.responses()[0]);
+  VerifyFilterRules(decoder, rules1_);
+
+  // Partially populate rules.
+  rules1_[0].action = Filter::Rule::Action::kKeep;
+  rules1_[0].level_greater_than_or_equal = log::FilterRule::Level::DEBUG_LEVEL;
+  rules1_[0].any_flags_set = 0xab;
+  const std::array<std::byte, 2> module1{std::byte(123), std::byte(0xab)};
+  rules1_[0].module_equals.assign(module1.begin(), module1.end());
+  rules1_[1].action = Filter::Rule::Action::kDrop;
+  rules1_[1].level_greater_than_or_equal = log::FilterRule::Level::ERROR_LEVEL;
+  rules1_[1].any_flags_set = 0;
+
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, GetFilter, 1, 128)
+  context2(drain_map_, &filter_map_);
+  context2.call(request);
+  ASSERT_EQ(context2.responses().size(), 1u);
+  protobuf::Decoder decoder2(context2.responses()[0]);
+  VerifyFilterRules(decoder2, rules1_);
+
+  // Modify the rest of the filter rules.
+  rules1_[2].action = Filter::Rule::Action::kKeep;
+  rules1_[2].level_greater_than_or_equal = log::FilterRule::Level::FATAL_LEVEL;
+  rules1_[2].any_flags_set = 0xcd;
+  const std::array<std::byte, 2> module2{std::byte(1), std::byte(2)};
+  rules1_[2].module_equals.assign(module2.begin(), module2.end());
+  rules1_[3].action = Filter::Rule::Action::kInactive;
+
+  PW_RAW_TEST_METHOD_CONTEXT(LogService, GetFilter, 1, 128)
+  context3(drain_map_, &filter_map_);
+  context3.call(request);
+  ASSERT_EQ(context3.responses().size(), 1u);
+  protobuf::Decoder decoder3(context3.responses()[0]);
+  VerifyFilterRules(decoder3, rules1_);
+}
+
 }  // namespace
 }  // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/internal/config.h b/pw_log_rpc/public/pw_log_rpc/internal/config.h
new file mode 100644
index 0000000..938ebc2
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/internal/config.h
@@ -0,0 +1,40 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+// Configuration macros for the pw_rpc module.
+#pragma once
+
+// Log filter modules are optionally tokenized, and thus their backing on-device
+// container can have different sizes. A token may be represented by a 32-bit
+// integer (though it is usually 2 bytes). Default the max module name size to
+// 4 bytes.
+#ifndef PW_LOG_RPC_CONFIG_MAX_FILTER_RULE_MODULE_NAME_SIZE
+#define PW_LOG_RPC_CONFIG_MAX_FILTER_RULE_MODULE_NAME_SIZE 4
+#endif  // PW_LOG_RPC_CONFIG_MAX_FILTER_RULE_MODULE_NAME_SIZE
+
+// Log filter IDs are optionally tokenized, and thus their backing on-device
+// container can have different sizes. A token may be represented by a 32-bit
+// integer (though it is usually 2 bytes). Default the max module name size to
+// 4 bytes.
+#ifndef PW_LOG_RPC_CONFIG_MAX_FILTER_ID_SIZE
+#define PW_LOG_RPC_CONFIG_MAX_FILTER_ID_SIZE 4
+#endif  // PW_LOG_RPC_CONFIG_MAX_FILTER_ID_SIZE
+
+namespace pw::log_rpc::cfg {
+inline constexpr size_t kMaxModuleNameBytes =
+    PW_LOG_RPC_CONFIG_MAX_FILTER_RULE_MODULE_NAME_SIZE;
+
+inline constexpr size_t kMaxFilterIdBytes =
+    PW_LOG_RPC_CONFIG_MAX_FILTER_ID_SIZE;
+}  // namespace pw::log_rpc::cfg
diff --git a/pw_log_rpc/public/pw_log_rpc/log_filter.h b/pw_log_rpc/public/pw_log_rpc/log_filter.h
new file mode 100644
index 0000000..56e01f7
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/log_filter.h
@@ -0,0 +1,88 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstring>
+#include <span>
+
+#include "pw_assert/assert.h"
+#include "pw_bytes/span.h"
+#include "pw_containers/vector.h"
+#include "pw_log/proto/log.pwpb.h"
+#include "pw_log_rpc/internal/config.h"
+#include "pw_status/status.h"
+
+namespace pw::log_rpc {
+
+// A Filter is a collection of rules used to check if a log entry can be kept
+// or dropped wherever the filter is placed in the log path.
+class Filter {
+ public:
+  struct Rule {
+    // Action to perform if the rule is met.
+    enum class Action {
+      kInactive = 0,  // Ignore this rule.
+      kKeep = 1,
+      kDrop = 2,
+    };
+    Action action = Action::kInactive;
+
+    // Checks if the log level is greater or equal to this value when it
+    // does not equal NOT_SET.
+    log::FilterRule::Level level_greater_than_or_equal =
+        log::FilterRule::Level::ANY_LEVEL;
+
+    // Checks if the log entry has any flag is set when it doesn't equal 0.
+    uint32_t any_flags_set = 0;
+
+    // Checks if the log entry module equals this value when not empty.
+    Vector<std::byte, cfg::kMaxModuleNameBytes> module_equals{};
+  };
+
+  Filter(std::span<const std::byte> id, std::span<Rule> rules) : rules_(rules) {
+    PW_ASSERT(!id.empty());
+    id_.assign(id.begin(), id.end());
+  }
+
+  // Not copyable.
+  Filter(const Filter&) = delete;
+  Filter& operator=(const Filter&) = delete;
+
+  ConstByteSpan id() const { return ConstByteSpan(id_.data(), id_.size()); }
+  std::span<const Rule> rules() const { return rules_; }
+
+  // Verifies a log entry against the filter's rules in the order they were
+  // provided, stopping at the first rule that matches.
+  // Returns true when the log should be dropped, false otherwise. Defaults to
+  // false if there are no rules, or no rules were matched.
+  bool ShouldDropLog(ConstByteSpan entry) const;
+
+  // Decodes and updates the filter's rules given a buffer with a proto-encoded
+  // log::Filter message. If there are more rules than this filter can hold, the
+  // extra rules are discarded.
+  //
+  // Return values:
+  // OK - rules were updated successfully.
+  // FAILED_PRECONDITION - the provided buffer is empty.
+  // Forwarded errors from protobuff::decoder.
+  Status UpdateRulesFromProto(ConstByteSpan buffer);
+
+ private:
+  Vector<std::byte, cfg::kMaxFilterIdBytes> id_;
+  std::span<Rule> rules_;
+};
+
+}  // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/log_filter_map.h b/pw_log_rpc/public/pw_log_rpc/log_filter_map.h
new file mode 100644
index 0000000..5a1be6e
--- /dev/null
+++ b/pw_log_rpc/public/pw_log_rpc/log_filter_map.h
@@ -0,0 +1,53 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#pragma once
+
+#include <cstring>
+#include <span>
+
+#include "pw_bytes/span.h"
+#include "pw_log_rpc/log_filter.h"
+#include "pw_result/result.h"
+
+namespace pw::log_rpc {
+
+// Holds an inmutable Filter map, ordered by filter ID to facilitate set up.
+class FilterMap {
+ public:
+  explicit constexpr FilterMap(std::span<Filter> filters) : filters_(filters) {}
+
+  // Not copyable nor movable.
+  FilterMap(FilterMap const&) = delete;
+  FilterMap& operator=(FilterMap const&) = delete;
+  FilterMap(FilterMap&&) = delete;
+  FilterMap& operator=(FilterMap&&) = delete;
+
+  Result<Filter*> GetFilterFromId(ConstByteSpan id) const {
+    for (auto& filter : filters_) {
+      if (id.size() == filter.id().size() &&
+          std::memcmp(id.data(), filter.id().data(), id.size()) == 0) {
+        return &filter;
+      }
+    }
+    return Status::NotFound();
+  }
+
+  const std::span<Filter>& filters() const { return filters_; }
+
+ protected:
+  const std::span<Filter> filters_;
+};
+
+}  // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/log_service.h b/pw_log_rpc/public/pw_log_rpc/log_service.h
index bd341d1..50555a7 100644
--- a/pw_log_rpc/public/pw_log_rpc/log_service.h
+++ b/pw_log_rpc/public/pw_log_rpc/log_service.h
@@ -15,7 +15,9 @@
 #pragma once
 
 #include "pw_log/proto/log.raw_rpc.pb.h"
+#include "pw_log_rpc/log_filter_map.h"
 #include "pw_log_rpc/rpc_log_drain_map.h"
+#include "pw_status/status.h"
 
 namespace pw::log_rpc {
 
@@ -24,7 +26,8 @@
 // and delegated outside the service.
 class LogService final : public log::generated::Logs<LogService> {
  public:
-  LogService(RpcLogDrainMap& drains) : drains_(drains) {}
+  LogService(RpcLogDrainMap& drains, FilterMap* filters = nullptr)
+      : drains_(drains), filters_(filters) {}
 
   // Starts listening to logs on the given RPC channel and writer. The call is
   // ignored if the channel was not pre-registered in the drain map. If there is
@@ -33,8 +36,24 @@
   // stream using the previous writer continues.
   void Listen(ServerContext&, ConstByteSpan, rpc::RawServerWriter& writer);
 
+  // TODO(pwbug/570): make log filter be its own service.
+  //  Modifies a log filter and its rules. The filter must be registered in the
+  //  provided filter map.
+  StatusWithSize SetFilter(ServerContext&, ConstByteSpan request, ByteSpan);
+
+  // Retrieves a log filter and its rules. The filter must be registered in the
+  // provided filter map.
+  StatusWithSize GetFilter(ServerContext&,
+                           ConstByteSpan request,
+                           ByteSpan response);
+
+  StatusWithSize ListFilterIds(ServerContext&,
+                               ConstByteSpan,
+                               ByteSpan response);
+
  private:
   RpcLogDrainMap& drains_;
+  FilterMap* filters_;
 };
 
 }  // namespace pw::log_rpc
diff --git a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
index 87071d7..eb59092 100644
--- a/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
+++ b/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h
@@ -20,6 +20,7 @@
 #include "pw_assert/assert.h"
 #include "pw_bytes/span.h"
 #include "pw_log/proto/log.pwpb.h"
+#include "pw_log_rpc/log_filter.h"
 #include "pw_multisink/multisink.h"
 #include "pw_protobuf/serialized_size.h"
 #include "pw_result/result.h"
@@ -37,11 +38,12 @@
 // Flush(), which, on every call, packs as many log::LogEntry items as possible
 // into a log::LogEntries message, writes the message to the provided writer,
 // then repeats the process until there are no more entries in the MultiSink or
-// the writer failed to write the outgoing package, in which case the RPC on
-// the writer is closed. When error_handling is `kIgnoreWriterErrors` the drain
-// will continue to retrieve log entries out of the MultiSink and attempt to
-// send them out ignoring the writer errors without sending a drop count.
-// Note: this behavior might change or be removed in the future.
+// the writer failed to write the outgoing package and error_handling is set to
+// `kCloseStreamOnWriterError`. When error_handling is `kIgnoreWriterErrors` the
+// drain will continue to retrieve log entries out of the MultiSink and attempt
+// to send them out ignoring the writer errors without sending a drop count.
+// Note: the error handling and drop count reporting might change in the future.
+// Log filtering is done using the rules of the Filter provided if any.
 class RpcLogDrain : public multisink::MultiSink::Drain {
  public:
   // Dictates how to handle server writer errors.
@@ -80,42 +82,23 @@
       protobuf::SizeOfFieldKey(1)  // LogEntry
       + protobuf::kMaxSizeOfLength;
 
-  // Creates a log stream with the provided open writer. Useful for streaming
-  // logs without a request.
-  // The provided buffer must be large enough to hold the largest transmittable
-  // log::LogEntry or a drop count message at the very least. The user can
-  // choose to provide a unique mutex for the drain, or share it to save RAM as
-  // long as they are aware of contengency issues.
-  RpcLogDrain(uint32_t channel_id,
-              ByteSpan log_entry_buffer,
-              rpc::RawServerWriter writer,
-              sync::Mutex& mutex,
-              LogDrainErrorHandling error_handling)
-      : channel_id_(channel_id),
-        error_handling_(error_handling),
-        server_writer_(std::move(writer)),
-        log_entry_buffer_(log_entry_buffer),
-        committed_entry_drop_count_(0),
-        mutex_(mutex) {
-    PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
-    PW_ASSERT(writer.active());
-  }
-
   // Creates a closed log stream with a writer that can be set at a later time.
   // The provided buffer must be large enough to hold the largest transmittable
   // log::LogEntry or a drop count message at the very least. The user can
   // choose to provide a unique mutex for the drain, or share it to save RAM as
   // long as they are aware of contengency issues.
-  RpcLogDrain(uint32_t channel_id,
+  RpcLogDrain(const uint32_t channel_id,
               ByteSpan log_entry_buffer,
               sync::Mutex& mutex,
-              LogDrainErrorHandling error_handling)
+              LogDrainErrorHandling error_handling,
+              Filter* filter = nullptr)
       : channel_id_(channel_id),
         error_handling_(error_handling),
         server_writer_(),
         log_entry_buffer_(log_entry_buffer),
         committed_entry_drop_count_(0),
-        mutex_(mutex) {
+        mutex_(mutex),
+        filter_(filter) {
     PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
   }
 
@@ -173,6 +156,7 @@
   const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_);
   uint32_t committed_entry_drop_count_ PW_GUARDED_BY(mutex_);
   sync::Mutex& mutex_;
+  Filter* filter_;
 };
 
 }  // namespace pw::log_rpc
diff --git a/pw_log_rpc/rpc_log_drain.cc b/pw_log_rpc/rpc_log_drain.cc
index fe8c025..2ba6330 100644
--- a/pw_log_rpc/rpc_log_drain.cc
+++ b/pw_log_rpc/rpc_log_drain.cc
@@ -56,6 +56,10 @@
     log::LogEntries::MemoryEncoder encoder(server_writer_.PayloadBuffer());
     uint32_t packed_entry_count = 0;
     log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
+    // Avoid sending empty packets.
+    if (encoder.size() == 0) {
+      continue;
+    }
     if (const Status status = server_writer_.Write(encoder); !status.ok()) {
       if (error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) {
         // Only update this drop count when writer errors are not ignored.
@@ -106,6 +110,14 @@
     // At this point all expected error modes have been handled.
     PW_CHECK_OK(possible_entry.status());
 
+    // TODO(pwbug/559): avoid sending multiple drop counts between filtered out
+    // log entries.
+    if (filter_ != nullptr &&
+        filter_->ShouldDropLog(possible_entry.value().entry())) {
+      PW_CHECK_OK(PopEntry(possible_entry.value()));
+      return LogDrainState::kMoreEntriesRemaining;
+    }
+
     // Check if the entry fits in encoder buffer.
     const size_t encoded_entry_size =
         possible_entry.value().entry().size() + kLogEntryEncodeFrameSize;
diff --git a/pw_log_rpc/rpc_log_drain_test.cc b/pw_log_rpc/rpc_log_drain_test.cc
index aec1617..3d8690d 100644
--- a/pw_log_rpc/rpc_log_drain_test.cc
+++ b/pw_log_rpc/rpc_log_drain_test.cc
@@ -19,6 +19,7 @@
 #include <span>
 
 #include "gtest/gtest.h"
+#include "pw_log_rpc/log_filter.h"
 #include "pw_log_rpc/log_service.h"
 #include "pw_log_rpc/rpc_log_drain_map.h"
 #include "pw_multisink/multisink.h"
@@ -42,7 +43,8 @@
       drain_id,
       buffer,
       mutex,
-      RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError);
+      RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+      nullptr);
   EXPECT_EQ(drain.channel_id(), drain_id);
 
   // Attach drain to a MultiSink.
@@ -62,20 +64,21 @@
   sync::Mutex mutex;
   std::array<std::array<std::byte, kBufferSize>, kMaxDrains> buffers;
   std::array<RpcLogDrain, kMaxDrains> drains{
-      RpcLogDrain(
-          0,
-          buffers[0],
-          mutex,
-          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
-      RpcLogDrain(
-          1,
-          buffers[1],
-          mutex,
-          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+      RpcLogDrain(0,
+                  buffers[0],
+                  mutex,
+                  RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+                  nullptr),
+      RpcLogDrain(1,
+                  buffers[1],
+                  mutex,
+                  RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+                  nullptr),
       RpcLogDrain(2,
                   buffers[2],
                   mutex,
-                  RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors),
+                  RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
+                  nullptr),
   };
 
   RpcLogDrainMap drain_map(drains);
@@ -96,14 +99,14 @@
   std::array<std::byte, kBufferSize> buffer;
   sync::Mutex mutex;
   std::array<RpcLogDrain, 1> drains{
-      RpcLogDrain(
-          drain_id,
-          buffer,
-          mutex,
-          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+      RpcLogDrain(drain_id,
+                  buffer,
+                  mutex,
+                  RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+                  nullptr),
   };
   RpcLogDrainMap drain_map(drains);
-  LogService log_service(drain_map);
+  LogService log_service(drain_map, nullptr);
 
   rpc::RawFakeChannelOutput<3, 128> output;
   rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
@@ -135,14 +138,14 @@
   std::array<std::byte, kBufferSize> buffer;
   sync::Mutex mutex;
   std::array<RpcLogDrain, 1> drains{
-      RpcLogDrain(
-          drain_id,
-          buffer,
-          mutex,
-          RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError),
+      RpcLogDrain(drain_id,
+                  buffer,
+                  mutex,
+                  RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
+                  nullptr),
   };
   RpcLogDrainMap drain_map(drains);
-  LogService log_service(drain_map);
+  LogService log_service(drain_map, nullptr);
 
   rpc::RawFakeChannelOutput<1, 128> output;
   rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));