pw_channel: Introduce forwarding channels

A forwarding channel pair can be used to connect two subsystems that
communicate with channels without implementing a custom channel.

Change-Id: I296ee5cd4217f4a35b5abd6cc3e14a353c501020
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/197353
Presubmit-Verified: CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Taylor Cramer <cramertj@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed-service-accounts.iam.gserviceaccount.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
diff --git a/docs/BUILD.gn b/docs/BUILD.gn
index 09de184..d1e7248 100644
--- a/docs/BUILD.gn
+++ b/docs/BUILD.gn
@@ -159,6 +159,7 @@
   "$dir_pw_bytes/public/pw_bytes/bit.h",
   "$dir_pw_bytes/public/pw_bytes/byte_builder.h",
   "$dir_pw_channel/public/pw_channel/channel.h",
+  "$dir_pw_channel/public/pw_channel/forwarding_channel.h",
   "$dir_pw_chre/public/pw_chre/chre.h",
   "$dir_pw_chre/public/pw_chre/host_link.h",
   "$dir_pw_chrono/public/pw_chrono/system_clock.h",
diff --git a/pw_channel/BUILD.bazel b/pw_channel/BUILD.bazel
index 0831ebe..20d09ab 100644
--- a/pw_channel/BUILD.bazel
+++ b/pw_channel/BUILD.bazel
@@ -51,3 +51,27 @@
         "//pw_unit_test",
     ],
 )
+
+cc_library(
+    name = "forwarding_channel",
+    srcs = ["forwarding_channel.cc"],
+    hdrs = ["public/pw_channel/forwarding_channel.h"],
+    includes = ["public"],
+    deps = [
+        ":pw_channel",
+        "//pw_multibuf:allocator",
+        "//pw_sync:mutex",
+    ],
+)
+
+pw_cc_test(
+    name = "forwarding_channel_test",
+    srcs = ["forwarding_channel_test.cc"],
+    deps = [
+        ":forwarding_channel",
+        "//pw_allocator:testing",
+        "//pw_multibuf:header_chunk_region_tracker",
+        "//pw_multibuf:simple_allocator",
+        "//pw_unit_test",
+    ],
+)
diff --git a/pw_channel/BUILD.gn b/pw_channel/BUILD.gn
index 3349f6e..6029897 100644
--- a/pw_channel/BUILD.gn
+++ b/pw_channel/BUILD.gn
@@ -42,12 +42,37 @@
   sources = [ "public/pw_channel/internal/channel_specializations.h" ]
 }
 
+pw_source_set("forwarding_channel") {
+  public_configs = [ ":public_include_path" ]
+  public = [ "public/pw_channel/forwarding_channel.h" ]
+  sources = [ "forwarding_channel.cc" ]
+  public_deps = [
+    ":pw_channel",
+    "$dir_pw_multibuf:allocator",
+    "$dir_pw_sync:mutex",
+  ]
+}
+
+pw_test("forwarding_channel_test") {
+  sources = [ "forwarding_channel_test.cc" ]
+  deps = [
+    ":forwarding_channel",
+    "$dir_pw_allocator:testing",
+    "$dir_pw_multibuf:header_chunk_region_tracker",
+    "$dir_pw_multibuf:simple_allocator",
+  ]
+  enable_if = pw_async2_DISPATCHER_BACKEND != ""
+}
+
 pw_doc_group("docs") {
   sources = [ "docs.rst" ]
 }
 
 pw_test_group("tests") {
-  tests = [ ":channel_test" ]
+  tests = [
+    ":channel_test",
+    ":forwarding_channel_test",
+  ]
 }
 
 pw_test("channel_test") {
diff --git a/pw_channel/docs.rst b/pw_channel/docs.rst
index ebc894d..7715cca 100644
--- a/pw_channel/docs.rst
+++ b/pw_channel/docs.rst
@@ -48,3 +48,9 @@
 .. doxygengroup:: pw_channel
    :content-only:
    :members:
+
+Channel implementations
+=======================
+.. doxygengroup:: pw_channel_forwarding
+   :content-only:
+   :members:
diff --git a/pw_channel/forwarding_channel.cc b/pw_channel/forwarding_channel.cc
new file mode 100644
index 0000000..01b94d8
--- /dev/null
+++ b/pw_channel/forwarding_channel.cc
@@ -0,0 +1,105 @@
+// Copyright 2024 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_channel/forwarding_channel.h"
+
+namespace pw::channel::internal {
+
+async2::Poll<Result<multibuf::MultiBuf>>
+ForwardingChannel<DataType::kDatagram>::DoPollRead(async2::Context& cx)
+    PW_NO_LOCK_SAFETY_ANALYSIS {
+  std::lock_guard lock(pair_.mutex_);
+  if (!read_queue_.has_value()) {
+    read_waker_ = cx.GetWaker(async2::WaitReason::Unspecified());
+    return async2::Pending();
+  }
+  auto read_data = std::move(*read_queue_);
+  read_queue_.reset();
+  std::move(sibling_.write_waker_).Wake();
+  return read_data;
+}
+async2::Poll<> ForwardingChannel<DataType::kDatagram>::DoPollReadyToWrite(
+    async2::Context& cx) PW_NO_LOCK_SAFETY_ANALYSIS {
+  std::lock_guard lock(pair_.mutex_);
+  if (sibling_.read_queue_.has_value()) {
+    write_waker_ = cx.GetWaker(async2::WaitReason::Unspecified());
+    return async2::Pending();
+  }
+  return async2::Ready();
+}
+
+Result<channel::WriteToken> ForwardingChannel<DataType::kDatagram>::DoWrite(
+    multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
+  std::lock_guard lock(pair_.mutex_);
+  PW_DASSERT(!sibling_.read_queue_.has_value());
+  sibling_.read_queue_ = std::move(data);
+  const uint32_t token = ++write_token_;
+  std::move(sibling_.read_waker_).Wake();
+  return CreateWriteToken(token);
+}
+
+async2::Poll<Result<channel::WriteToken>>
+ForwardingChannel<DataType::kDatagram>::DoPollFlush(async2::Context&) {
+  std::lock_guard lock(pair_.mutex_);
+  return async2::Ready(CreateWriteToken(write_token_));
+}
+
+async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPollClose(
+    async2::Context&) {
+  std::lock_guard lock(pair_.mutex_);
+  read_queue_.reset();
+  std::move(read_waker_).Wake();
+  return OkStatus();
+}
+
+async2::Poll<Result<multibuf::MultiBuf>>
+ForwardingChannel<DataType::kByte>::DoPollRead(async2::Context& cx) {
+  std::lock_guard lock(pair_.mutex_);
+  if (read_queue_.empty()) {
+    read_waker_ = cx.GetWaker(async2::WaitReason::Unspecified());
+    return async2::Pending();
+  }
+
+  auto read_data = std::move(read_queue_);
+  read_queue_ = {};
+  return read_data;
+}
+
+Result<channel::WriteToken> ForwardingChannel<DataType::kByte>::DoWrite(
+    multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
+  std::lock_guard lock(pair_.mutex_);
+  if (data.empty()) {
+    return CreateWriteToken(write_token_);  // no data, nothing to do
+  }
+
+  write_token_ += data.size();
+  sibling_.read_queue_.PushSuffix(std::move(data));
+  return CreateWriteToken(write_token_);
+}
+
+async2::Poll<Result<channel::WriteToken>>
+ForwardingChannel<DataType::kByte>::DoPollFlush(async2::Context&) {
+  std::lock_guard lock(pair_.mutex_);
+  return async2::Ready(CreateWriteToken(write_token_));
+}
+
+async2::Poll<Status> ForwardingChannel<DataType::kByte>::DoPollClose(
+    async2::Context&) {
+  std::lock_guard lock(pair_.mutex_);
+  read_queue_.Release();
+  std::move(read_waker_).Wake();
+  return OkStatus();
+}
+
+}  // namespace pw::channel::internal
diff --git a/pw_channel/forwarding_channel_test.cc b/pw_channel/forwarding_channel_test.cc
new file mode 100644
index 0000000..1298a1b
--- /dev/null
+++ b/pw_channel/forwarding_channel_test.cc
@@ -0,0 +1,323 @@
+// Copyright 2024 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_channel/forwarding_channel.h"
+
+#include <algorithm>
+#include <array>
+
+#include "pw_allocator/testing.h"
+#include "pw_multibuf/header_chunk_region_tracker.h"
+#include "pw_multibuf/simple_allocator.h"
+#include "pw_string/string.h"
+#include "pw_unit_test/framework.h"
+
+namespace {
+
+// Creates and initializes a MultiBuf to the specified value.
+class InitializedMultiBuf {
+ public:
+  InitializedMultiBuf(std::string_view contents) {
+    std::optional<pw::multibuf::OwnedChunk> chunk =
+        pw::multibuf::HeaderChunkRegionTracker::AllocateRegionAsChunk(
+            allocator_, contents.size());
+    std::memcpy(chunk.value().data(), contents.data(), contents.size());
+    buf_.PushFrontChunk(std::move(*chunk));
+  }
+
+  pw::multibuf::MultiBuf Take() { return std::move(buf_); }
+
+ private:
+  pw::allocator::test::AllocatorForTest<2048> allocator_;
+  pw::multibuf::MultiBuf buf_;
+};
+
+pw::InlineString<128> CopyToString(const pw::multibuf::MultiBuf& mb) {
+  pw::InlineString<128> contents(mb.size(), '\0');
+  std::copy(
+      mb.begin(), mb.end(), reinterpret_cast<std::byte*>(contents.data()));
+  return contents;
+}
+
+template <pw::channel::DataType kType,
+          size_t kDataSize = 128,
+          size_t kMetaSize = 128>
+class TestChannelPair {
+ public:
+  TestChannelPair()
+      : simple_allocator_(data_area_, meta_alloc_), pair_(simple_allocator_) {}
+
+  pw::channel::ForwardingChannelPair<kType>* operator->() { return &pair_; }
+
+ private:
+  std::array<std::byte, kDataSize> data_area_;
+  pw::allocator::test::AllocatorForTest<kMetaSize> meta_alloc_;
+  pw::multibuf::SimpleAllocator simple_allocator_;
+
+  pw::channel::ForwardingChannelPair<kType> pair_;
+};
+
+// TODO: b/330788671 - Have the test tasks run in multiple stages to ensure that
+//     wakers are stored / woken properly by ForwardingChannel.
+TEST(ForwardingDatagramChannel, ForwardsEmptyDatagrams) {
+  pw::async2::Dispatcher dispatcher;
+
+  class : public pw::async2::Task {
+   public:
+    TestChannelPair<pw::channel::DataType::kDatagram> pair;
+
+    int test_completed = 0;
+
+   private:
+    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+      // No data yet
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      // Send datagram first->second
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      auto result = pair->first().Write({});  // Write empty datagram
+      EXPECT_EQ(pw::OkStatus(), result.status());
+
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+      auto empty_chunk_result = pair->second().PollRead(cx);
+      EXPECT_TRUE(empty_chunk_result.IsReady());
+      EXPECT_TRUE(empty_chunk_result->ok());
+      EXPECT_EQ((*empty_chunk_result)->size(), 0u);
+
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      // Send datagram second->first
+      EXPECT_EQ(pw::async2::Ready(), pair->second().PollReadyToWrite(cx));
+      result = pair->second().Write({});  // Write empty datagram
+      EXPECT_EQ(pw::OkStatus(), result.status());
+
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      empty_chunk_result = pair->first().PollRead(cx);
+      EXPECT_TRUE(empty_chunk_result.IsReady());
+      EXPECT_TRUE(empty_chunk_result->ok());
+      EXPECT_EQ((*empty_chunk_result)->size(), 0u);
+
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+      test_completed += 1;
+      return pw::async2::Ready();
+    }
+  } test_task;
+
+  dispatcher.Post(test_task);
+
+  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+  EXPECT_EQ(test_task.test_completed, 1);
+}
+
+TEST(ForwardingDatagramChannel, ForwardsNonEmptyDatagrams) {
+  pw::async2::Dispatcher dispatcher;
+
+  class : public pw::async2::Task {
+   public:
+    TestChannelPair<pw::channel::DataType::kDatagram> pair;
+
+    int test_completed = 0;
+
+   private:
+    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+      InitializedMultiBuf b1("Hello");
+      InitializedMultiBuf b2("world!");
+
+      // Send datagram first->second
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b1.Take()).status());
+
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollReadyToWrite(cx));
+
+      EXPECT_EQ(CopyToString(pair->second().PollRead(cx).value().value()),
+                "Hello");
+
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b2.Take()).status());
+      EXPECT_EQ(CopyToString(pair->second().PollRead(cx).value().value()),
+                "world!");
+
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+
+      test_completed += 1;
+      return pw::async2::Ready();
+    }
+  } test_task;
+
+  dispatcher.Post(test_task);
+
+  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+  EXPECT_EQ(test_task.test_completed, 1);
+}
+
+TEST(ForwardingDatagramChannel, ForwardsDatagrams) {
+  pw::async2::Dispatcher dispatcher;
+
+  class : public pw::async2::Task {
+   public:
+    TestChannelPair<pw::channel::DataType::kDatagram> pair;
+
+    int test_completed = 0;
+
+   private:
+    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+      // No data yet
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      // Send datagram first->second
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      auto result = pair->first().Write({});  // Write empty datagram
+      EXPECT_EQ(pw::OkStatus(), result.status());
+
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+      auto empty_chunk_result = pair->second().PollRead(cx);
+      EXPECT_TRUE(empty_chunk_result.IsReady());
+      EXPECT_TRUE(empty_chunk_result->ok());
+      EXPECT_EQ((*empty_chunk_result)->size(), 0u);
+
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      // Send datagram second->first
+      EXPECT_EQ(pw::async2::Ready(), pair->second().PollReadyToWrite(cx));
+      result = pair->second().Write({});  // Write empty datagram
+      EXPECT_EQ(pw::OkStatus(), result.status());
+
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      empty_chunk_result = pair->first().PollRead(cx);
+      EXPECT_TRUE(empty_chunk_result.IsReady());
+      EXPECT_TRUE(empty_chunk_result->ok());
+      EXPECT_EQ((*empty_chunk_result)->size(), 0u);
+
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+      test_completed += 1;
+      return pw::async2::Ready();
+    }
+  } test_task;
+
+  dispatcher.Post(test_task);
+
+  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+  EXPECT_EQ(test_task.test_completed, 1);
+}
+TEST(ForwardingByteChannel, IgnoresEmptyWrites) {
+  pw::async2::Dispatcher dispatcher;
+
+  class : public pw::async2::Task {
+   public:
+    TestChannelPair<pw::channel::DataType::kByte> pair;
+
+    int test_completed = 0;
+
+   private:
+    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+      // No data yet
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      // Send nothing first->second
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());
+
+      // Still no data
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      // Send nothing second->first
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());
+
+      // Still no data
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      test_completed += 1;
+      return pw::async2::Ready();
+    }
+  } test_task;
+
+  dispatcher.Post(test_task);
+
+  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+  EXPECT_EQ(test_task.test_completed, 1);
+}
+
+TEST(ForwardingByteChannel, WriteData) {
+  pw::async2::Dispatcher dispatcher;
+
+  class : public pw::async2::Task {
+   public:
+    TestChannelPair<pw::channel::DataType::kByte> pair;
+
+    int test_completed = 0;
+
+   private:
+    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+      // No data yet
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      InitializedMultiBuf b1("hello");
+      InitializedMultiBuf b2(" ");
+      InitializedMultiBuf b3("world");
+
+      // Send "hello world" first->second
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b1.Take()).status());
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b2.Take()).status());
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b3.Take()).status());
+
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+      auto hello_world_result = pair->second().PollRead(cx);
+      EXPECT_TRUE(hello_world_result.IsReady());
+
+      EXPECT_EQ(CopyToString(hello_world_result->value()), "hello world");
+
+      // Send nothing second->first
+      EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+      EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());
+
+      // Still no data
+      EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+      EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+      test_completed += 1;
+      return pw::async2::Ready();
+    }
+  } test_task;
+
+  dispatcher.Post(test_task);
+
+  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+  EXPECT_EQ(test_task.test_completed, 1);
+}
+
+}  // namespace
diff --git a/pw_channel/public/pw_channel/forwarding_channel.h b/pw_channel/public/pw_channel/forwarding_channel.h
new file mode 100644
index 0000000..44a5bea
--- /dev/null
+++ b/pw_channel/public/pw_channel/forwarding_channel.h
@@ -0,0 +1,190 @@
+// Copyright 2024 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <cstdint>
+#include <mutex>
+#include <optional>
+
+#include "pw_async2/dispatcher.h"
+#include "pw_async2/poll.h"
+#include "pw_channel/channel.h"
+#include "pw_sync/lock_annotations.h"
+#include "pw_sync/mutex.h"
+
+namespace pw::channel {
+namespace internal {
+
+// Internal Channel implementation for use with ForwardingChannelPair. It is
+// specialized for kDatagram and kByte.
+template <DataType kType>
+class ForwardingChannel;
+
+}  // namespace internal
+
+/// @defgroup pw_channel_forwarding
+/// @{
+
+/// Forwards datagrams between two channels. Writes to the first channel appear
+/// as reads on the second, and vice versa.
+///
+/// `ForwardingChannelPair` enables connecting two subsystems that
+/// communicate with datagram channels without implementing a custom channel.
+template <DataType kType>
+class ForwardingChannelPair {
+ public:
+  explicit constexpr ForwardingChannelPair(
+      multibuf::MultiBufAllocator& allocator);
+
+  ForwardingChannelPair(const ForwardingChannelPair&) = delete;
+  ForwardingChannelPair& operator=(const ForwardingChannelPair&) = delete;
+
+  ForwardingChannelPair(ForwardingChannelPair&&) = delete;
+  ForwardingChannelPair& operator=(ForwardingChannelPair&&) = delete;
+
+  /// Returns the first channel in the pair.
+  Channel<kType, kReliable, kReadable, kWritable>& first() { return first_; }
+
+  /// Returns a const reference to the first channel in the pair.
+  const Channel<kType, kReliable, kReadable, kWritable>& first() const {
+    return first_;
+  }
+
+  /// Returns the second channel in the pair.
+  Channel<kType, kReliable, kReadable, kWritable>& second() { return second_; }
+
+  /// Returns a const reference to the second channel in the pair.
+  const Channel<kType, kReliable, kReadable, kWritable>& second() const {
+    return second_;
+  }
+
+ private:
+  template <DataType>
+  friend class internal::ForwardingChannel;
+
+  sync::Mutex mutex_;
+  multibuf::MultiBufAllocator& allocator_;
+
+  // These channels refer to each other, so their lifetimes must match.
+  internal::ForwardingChannel<kType> first_;
+  internal::ForwardingChannel<kType> second_;
+};
+
+/// Alias for a pair of forwarding datagram channels.
+using ForwardingDatagramChannelPair =
+    ForwardingChannelPair<DataType::kDatagram>;
+
+/// Alias for a pair of forwarding byte channels.
+using ForwardingByteChannelPair = ForwardingChannelPair<DataType::kByte>;
+
+/// @}
+
+namespace internal {
+
+template <>
+class ForwardingChannel<DataType::kDatagram>
+    : public ReliableDatagramReaderWriter {
+ public:
+  ForwardingChannel(const ForwardingChannel&) = delete;
+  ForwardingChannel& operator=(const ForwardingChannel&) = delete;
+
+  ForwardingChannel(ForwardingChannel&&) = delete;
+  ForwardingChannel& operator=(ForwardingChannel&&) = delete;
+
+ private:
+  friend class ForwardingChannelPair<DataType::kDatagram>;
+
+  constexpr ForwardingChannel(ForwardingChannelPair<DataType::kDatagram>& pair,
+                              ForwardingChannel* sibling)
+      : pair_(pair), sibling_(*sibling), write_token_(0) {}
+
+  async2::Poll<Result<multibuf::MultiBuf>> DoPollRead(
+      async2::Context& cx) override;
+
+  async2::Poll<> DoPollReadyToWrite(async2::Context& cx) override;
+
+  multibuf::MultiBufAllocator& DoGetWriteAllocator() override {
+    return pair_.allocator_;
+  }
+
+  Result<channel::WriteToken> DoWrite(multibuf::MultiBuf&& data) override;
+
+  async2::Poll<Result<channel::WriteToken>> DoPollFlush(
+      async2::Context&) override;
+
+  async2::Poll<Status> DoPollClose(async2::Context&) override;
+
+  // The two channels share one mutex. Lock safty analysis doesn't understand
+  // that, so has to be disabled for some functions.
+  ForwardingChannelPair<DataType::kDatagram>& pair_;
+  ForwardingChannel& sibling_;
+
+  // Could use a queue here.
+  std::optional<multibuf::MultiBuf> read_queue_ PW_GUARDED_BY(pair_.mutex_);
+  uint32_t write_token_ PW_GUARDED_BY(pair_.mutex_);
+  async2::Waker read_waker_ PW_GUARDED_BY(pair_.mutex_);
+  async2::Waker write_waker_ PW_GUARDED_BY(pair_.mutex_);
+};
+
+template <>
+class ForwardingChannel<DataType::kByte> : public ByteReaderWriter {
+ public:
+  ForwardingChannel(const ForwardingChannel&) = delete;
+  ForwardingChannel& operator=(const ForwardingChannel&) = delete;
+
+  ForwardingChannel(ForwardingChannel&&) = delete;
+  ForwardingChannel& operator=(ForwardingChannel&&) = delete;
+
+ private:
+  friend class ForwardingChannelPair<DataType::kByte>;
+
+  constexpr ForwardingChannel(ForwardingChannelPair<DataType::kByte>& pair,
+                              ForwardingChannel* sibling)
+      : pair_(pair), sibling_(*sibling), write_token_(0) {}
+
+  async2::Poll<Result<multibuf::MultiBuf>> DoPollRead(
+      async2::Context& cx) override;
+
+  async2::Poll<> DoPollReadyToWrite(async2::Context&) override {
+    return async2::Ready();
+  }
+
+  multibuf::MultiBufAllocator& DoGetWriteAllocator() override {
+    return pair_.allocator_;
+  }
+
+  Result<channel::WriteToken> DoWrite(multibuf::MultiBuf&& data) override;
+
+  async2::Poll<Result<channel::WriteToken>> DoPollFlush(
+      async2::Context&) override;
+
+  async2::Poll<Status> DoPollClose(async2::Context&) override;
+
+  ForwardingChannelPair<DataType::kByte>& pair_;
+  ForwardingChannel& sibling_;
+
+  multibuf::MultiBuf read_queue_ PW_GUARDED_BY(pair_.mutex_);
+  uint32_t write_token_ PW_GUARDED_BY(pair_.mutex_);
+  async2::Waker read_waker_ PW_GUARDED_BY(pair_.mutex_);
+};
+
+}  // namespace internal
+
+// Define the constructor out-of-line, after ForwardingChannel is defined.
+template <DataType kType>
+constexpr ForwardingChannelPair<kType>::ForwardingChannelPair(
+    multibuf::MultiBufAllocator& allocator)
+    : allocator_(allocator), first_(*this, &second_), second_(*this, &first_) {}
+
+}  // namespace pw::channel