pw_channel: Introduce forwarding channels
A forwarding channel pair can be used to connect two subsystems that
communicate with channels without implementing a custom channel.
Change-Id: I296ee5cd4217f4a35b5abd6cc3e14a353c501020
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/197353
Presubmit-Verified: CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Taylor Cramer <cramertj@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed-service-accounts.iam.gserviceaccount.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
diff --git a/docs/BUILD.gn b/docs/BUILD.gn
index 09de184..d1e7248 100644
--- a/docs/BUILD.gn
+++ b/docs/BUILD.gn
@@ -159,6 +159,7 @@
"$dir_pw_bytes/public/pw_bytes/bit.h",
"$dir_pw_bytes/public/pw_bytes/byte_builder.h",
"$dir_pw_channel/public/pw_channel/channel.h",
+ "$dir_pw_channel/public/pw_channel/forwarding_channel.h",
"$dir_pw_chre/public/pw_chre/chre.h",
"$dir_pw_chre/public/pw_chre/host_link.h",
"$dir_pw_chrono/public/pw_chrono/system_clock.h",
diff --git a/pw_channel/BUILD.bazel b/pw_channel/BUILD.bazel
index 0831ebe..20d09ab 100644
--- a/pw_channel/BUILD.bazel
+++ b/pw_channel/BUILD.bazel
@@ -51,3 +51,27 @@
"//pw_unit_test",
],
)
+
+cc_library(
+ name = "forwarding_channel",
+ srcs = ["forwarding_channel.cc"],
+ hdrs = ["public/pw_channel/forwarding_channel.h"],
+ includes = ["public"],
+ deps = [
+ ":pw_channel",
+ "//pw_multibuf:allocator",
+ "//pw_sync:mutex",
+ ],
+)
+
+pw_cc_test(
+ name = "forwarding_channel_test",
+ srcs = ["forwarding_channel_test.cc"],
+ deps = [
+ ":forwarding_channel",
+ "//pw_allocator:testing",
+ "//pw_multibuf:header_chunk_region_tracker",
+ "//pw_multibuf:simple_allocator",
+ "//pw_unit_test",
+ ],
+)
diff --git a/pw_channel/BUILD.gn b/pw_channel/BUILD.gn
index 3349f6e..6029897 100644
--- a/pw_channel/BUILD.gn
+++ b/pw_channel/BUILD.gn
@@ -42,12 +42,37 @@
sources = [ "public/pw_channel/internal/channel_specializations.h" ]
}
+pw_source_set("forwarding_channel") {
+ public_configs = [ ":public_include_path" ]
+ public = [ "public/pw_channel/forwarding_channel.h" ]
+ sources = [ "forwarding_channel.cc" ]
+ public_deps = [
+ ":pw_channel",
+ "$dir_pw_multibuf:allocator",
+ "$dir_pw_sync:mutex",
+ ]
+}
+
+pw_test("forwarding_channel_test") {
+ sources = [ "forwarding_channel_test.cc" ]
+ deps = [
+ ":forwarding_channel",
+ "$dir_pw_allocator:testing",
+ "$dir_pw_multibuf:header_chunk_region_tracker",
+ "$dir_pw_multibuf:simple_allocator",
+ ]
+ enable_if = pw_async2_DISPATCHER_BACKEND != ""
+}
+
pw_doc_group("docs") {
sources = [ "docs.rst" ]
}
pw_test_group("tests") {
- tests = [ ":channel_test" ]
+ tests = [
+ ":channel_test",
+ ":forwarding_channel_test",
+ ]
}
pw_test("channel_test") {
diff --git a/pw_channel/docs.rst b/pw_channel/docs.rst
index ebc894d..7715cca 100644
--- a/pw_channel/docs.rst
+++ b/pw_channel/docs.rst
@@ -48,3 +48,9 @@
.. doxygengroup:: pw_channel
:content-only:
:members:
+
+Channel implementations
+=======================
+.. doxygengroup:: pw_channel_forwarding
+ :content-only:
+ :members:
diff --git a/pw_channel/forwarding_channel.cc b/pw_channel/forwarding_channel.cc
new file mode 100644
index 0000000..01b94d8
--- /dev/null
+++ b/pw_channel/forwarding_channel.cc
@@ -0,0 +1,105 @@
+// Copyright 2024 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_channel/forwarding_channel.h"
+
+namespace pw::channel::internal {
+
+async2::Poll<Result<multibuf::MultiBuf>>
+ForwardingChannel<DataType::kDatagram>::DoPollRead(async2::Context& cx)
+ PW_NO_LOCK_SAFETY_ANALYSIS {
+ std::lock_guard lock(pair_.mutex_);
+ if (!read_queue_.has_value()) {
+ read_waker_ = cx.GetWaker(async2::WaitReason::Unspecified());
+ return async2::Pending();
+ }
+ auto read_data = std::move(*read_queue_);
+ read_queue_.reset();
+ std::move(sibling_.write_waker_).Wake();
+ return read_data;
+}
+async2::Poll<> ForwardingChannel<DataType::kDatagram>::DoPollReadyToWrite(
+ async2::Context& cx) PW_NO_LOCK_SAFETY_ANALYSIS {
+ std::lock_guard lock(pair_.mutex_);
+ if (sibling_.read_queue_.has_value()) {
+ write_waker_ = cx.GetWaker(async2::WaitReason::Unspecified());
+ return async2::Pending();
+ }
+ return async2::Ready();
+}
+
+Result<channel::WriteToken> ForwardingChannel<DataType::kDatagram>::DoWrite(
+ multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
+ std::lock_guard lock(pair_.mutex_);
+ PW_DASSERT(!sibling_.read_queue_.has_value());
+ sibling_.read_queue_ = std::move(data);
+ const uint32_t token = ++write_token_;
+ std::move(sibling_.read_waker_).Wake();
+ return CreateWriteToken(token);
+}
+
+async2::Poll<Result<channel::WriteToken>>
+ForwardingChannel<DataType::kDatagram>::DoPollFlush(async2::Context&) {
+ std::lock_guard lock(pair_.mutex_);
+ return async2::Ready(CreateWriteToken(write_token_));
+}
+
+async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPollClose(
+ async2::Context&) {
+ std::lock_guard lock(pair_.mutex_);
+ read_queue_.reset();
+ std::move(read_waker_).Wake();
+ return OkStatus();
+}
+
+async2::Poll<Result<multibuf::MultiBuf>>
+ForwardingChannel<DataType::kByte>::DoPollRead(async2::Context& cx) {
+ std::lock_guard lock(pair_.mutex_);
+ if (read_queue_.empty()) {
+ read_waker_ = cx.GetWaker(async2::WaitReason::Unspecified());
+ return async2::Pending();
+ }
+
+ auto read_data = std::move(read_queue_);
+ read_queue_ = {};
+ return read_data;
+}
+
+Result<channel::WriteToken> ForwardingChannel<DataType::kByte>::DoWrite(
+ multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
+ std::lock_guard lock(pair_.mutex_);
+ if (data.empty()) {
+ return CreateWriteToken(write_token_); // no data, nothing to do
+ }
+
+ write_token_ += data.size();
+ sibling_.read_queue_.PushSuffix(std::move(data));
+ return CreateWriteToken(write_token_);
+}
+
+async2::Poll<Result<channel::WriteToken>>
+ForwardingChannel<DataType::kByte>::DoPollFlush(async2::Context&) {
+ std::lock_guard lock(pair_.mutex_);
+ return async2::Ready(CreateWriteToken(write_token_));
+}
+
+async2::Poll<Status> ForwardingChannel<DataType::kByte>::DoPollClose(
+ async2::Context&) {
+ std::lock_guard lock(pair_.mutex_);
+ read_queue_.Release();
+ std::move(read_waker_).Wake();
+ return OkStatus();
+}
+
+} // namespace pw::channel::internal
diff --git a/pw_channel/forwarding_channel_test.cc b/pw_channel/forwarding_channel_test.cc
new file mode 100644
index 0000000..1298a1b
--- /dev/null
+++ b/pw_channel/forwarding_channel_test.cc
@@ -0,0 +1,323 @@
+// Copyright 2024 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_channel/forwarding_channel.h"
+
+#include <algorithm>
+#include <array>
+
+#include "pw_allocator/testing.h"
+#include "pw_multibuf/header_chunk_region_tracker.h"
+#include "pw_multibuf/simple_allocator.h"
+#include "pw_string/string.h"
+#include "pw_unit_test/framework.h"
+
+namespace {
+
+// Creates and initializes a MultiBuf to the specified value.
+class InitializedMultiBuf {
+ public:
+ InitializedMultiBuf(std::string_view contents) {
+ std::optional<pw::multibuf::OwnedChunk> chunk =
+ pw::multibuf::HeaderChunkRegionTracker::AllocateRegionAsChunk(
+ allocator_, contents.size());
+ std::memcpy(chunk.value().data(), contents.data(), contents.size());
+ buf_.PushFrontChunk(std::move(*chunk));
+ }
+
+ pw::multibuf::MultiBuf Take() { return std::move(buf_); }
+
+ private:
+ pw::allocator::test::AllocatorForTest<2048> allocator_;
+ pw::multibuf::MultiBuf buf_;
+};
+
+pw::InlineString<128> CopyToString(const pw::multibuf::MultiBuf& mb) {
+ pw::InlineString<128> contents(mb.size(), '\0');
+ std::copy(
+ mb.begin(), mb.end(), reinterpret_cast<std::byte*>(contents.data()));
+ return contents;
+}
+
+template <pw::channel::DataType kType,
+ size_t kDataSize = 128,
+ size_t kMetaSize = 128>
+class TestChannelPair {
+ public:
+ TestChannelPair()
+ : simple_allocator_(data_area_, meta_alloc_), pair_(simple_allocator_) {}
+
+ pw::channel::ForwardingChannelPair<kType>* operator->() { return &pair_; }
+
+ private:
+ std::array<std::byte, kDataSize> data_area_;
+ pw::allocator::test::AllocatorForTest<kMetaSize> meta_alloc_;
+ pw::multibuf::SimpleAllocator simple_allocator_;
+
+ pw::channel::ForwardingChannelPair<kType> pair_;
+};
+
+// TODO: b/330788671 - Have the test tasks run in multiple stages to ensure that
+// wakers are stored / woken properly by ForwardingChannel.
+TEST(ForwardingDatagramChannel, ForwardsEmptyDatagrams) {
+ pw::async2::Dispatcher dispatcher;
+
+ class : public pw::async2::Task {
+ public:
+ TestChannelPair<pw::channel::DataType::kDatagram> pair;
+
+ int test_completed = 0;
+
+ private:
+ pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+ // No data yet
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ // Send datagram first->second
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ auto result = pair->first().Write({}); // Write empty datagram
+ EXPECT_EQ(pw::OkStatus(), result.status());
+
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+ auto empty_chunk_result = pair->second().PollRead(cx);
+ EXPECT_TRUE(empty_chunk_result.IsReady());
+ EXPECT_TRUE(empty_chunk_result->ok());
+ EXPECT_EQ((*empty_chunk_result)->size(), 0u);
+
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ // Send datagram second->first
+ EXPECT_EQ(pw::async2::Ready(), pair->second().PollReadyToWrite(cx));
+ result = pair->second().Write({}); // Write empty datagram
+ EXPECT_EQ(pw::OkStatus(), result.status());
+
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ empty_chunk_result = pair->first().PollRead(cx);
+ EXPECT_TRUE(empty_chunk_result.IsReady());
+ EXPECT_TRUE(empty_chunk_result->ok());
+ EXPECT_EQ((*empty_chunk_result)->size(), 0u);
+
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+ test_completed += 1;
+ return pw::async2::Ready();
+ }
+ } test_task;
+
+ dispatcher.Post(test_task);
+
+ EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+ EXPECT_EQ(test_task.test_completed, 1);
+}
+
+TEST(ForwardingDatagramChannel, ForwardsNonEmptyDatagrams) {
+ pw::async2::Dispatcher dispatcher;
+
+ class : public pw::async2::Task {
+ public:
+ TestChannelPair<pw::channel::DataType::kDatagram> pair;
+
+ int test_completed = 0;
+
+ private:
+ pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+ InitializedMultiBuf b1("Hello");
+ InitializedMultiBuf b2("world!");
+
+ // Send datagram first->second
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::OkStatus(), pair->first().Write(b1.Take()).status());
+
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollReadyToWrite(cx));
+
+ EXPECT_EQ(CopyToString(pair->second().PollRead(cx).value().value()),
+ "Hello");
+
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ EXPECT_EQ(pw::OkStatus(), pair->first().Write(b2.Take()).status());
+ EXPECT_EQ(CopyToString(pair->second().PollRead(cx).value().value()),
+ "world!");
+
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+
+ test_completed += 1;
+ return pw::async2::Ready();
+ }
+ } test_task;
+
+ dispatcher.Post(test_task);
+
+ EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+ EXPECT_EQ(test_task.test_completed, 1);
+}
+
+TEST(ForwardingDatagramChannel, ForwardsDatagrams) {
+ pw::async2::Dispatcher dispatcher;
+
+ class : public pw::async2::Task {
+ public:
+ TestChannelPair<pw::channel::DataType::kDatagram> pair;
+
+ int test_completed = 0;
+
+ private:
+ pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+ // No data yet
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ // Send datagram first->second
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ auto result = pair->first().Write({}); // Write empty datagram
+ EXPECT_EQ(pw::OkStatus(), result.status());
+
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+ auto empty_chunk_result = pair->second().PollRead(cx);
+ EXPECT_TRUE(empty_chunk_result.IsReady());
+ EXPECT_TRUE(empty_chunk_result->ok());
+ EXPECT_EQ((*empty_chunk_result)->size(), 0u);
+
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ // Send datagram second->first
+ EXPECT_EQ(pw::async2::Ready(), pair->second().PollReadyToWrite(cx));
+ result = pair->second().Write({}); // Write empty datagram
+ EXPECT_EQ(pw::OkStatus(), result.status());
+
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ empty_chunk_result = pair->first().PollRead(cx);
+ EXPECT_TRUE(empty_chunk_result.IsReady());
+ EXPECT_TRUE(empty_chunk_result->ok());
+ EXPECT_EQ((*empty_chunk_result)->size(), 0u);
+
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+ test_completed += 1;
+ return pw::async2::Ready();
+ }
+ } test_task;
+
+ dispatcher.Post(test_task);
+
+ EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+ EXPECT_EQ(test_task.test_completed, 1);
+}
+TEST(ForwardingByteChannel, IgnoresEmptyWrites) {
+ pw::async2::Dispatcher dispatcher;
+
+ class : public pw::async2::Task {
+ public:
+ TestChannelPair<pw::channel::DataType::kByte> pair;
+
+ int test_completed = 0;
+
+ private:
+ pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+ // No data yet
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ // Send nothing first->second
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());
+
+ // Still no data
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ // Send nothing second->first
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());
+
+ // Still no data
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ test_completed += 1;
+ return pw::async2::Ready();
+ }
+ } test_task;
+
+ dispatcher.Post(test_task);
+
+ EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+ EXPECT_EQ(test_task.test_completed, 1);
+}
+
+TEST(ForwardingByteChannel, WriteData) {
+ pw::async2::Dispatcher dispatcher;
+
+ class : public pw::async2::Task {
+ public:
+ TestChannelPair<pw::channel::DataType::kByte> pair;
+
+ int test_completed = 0;
+
+ private:
+ pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
+ // No data yet
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ InitializedMultiBuf b1("hello");
+ InitializedMultiBuf b2(" ");
+ InitializedMultiBuf b3("world");
+
+ // Send "hello world" first->second
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::OkStatus(), pair->first().Write(b1.Take()).status());
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::OkStatus(), pair->first().Write(b2.Take()).status());
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::OkStatus(), pair->first().Write(b3.Take()).status());
+
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+
+ auto hello_world_result = pair->second().PollRead(cx);
+ EXPECT_TRUE(hello_world_result.IsReady());
+
+ EXPECT_EQ(CopyToString(hello_world_result->value()), "hello world");
+
+ // Send nothing second->first
+ EXPECT_EQ(pw::async2::Ready(), pair->first().PollReadyToWrite(cx));
+ EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());
+
+ // Still no data
+ EXPECT_EQ(pw::async2::Pending(), pair->first().PollRead(cx));
+ EXPECT_EQ(pw::async2::Pending(), pair->second().PollRead(cx));
+
+ test_completed += 1;
+ return pw::async2::Ready();
+ }
+ } test_task;
+
+ dispatcher.Post(test_task);
+
+ EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
+ EXPECT_EQ(test_task.test_completed, 1);
+}
+
+} // namespace
diff --git a/pw_channel/public/pw_channel/forwarding_channel.h b/pw_channel/public/pw_channel/forwarding_channel.h
new file mode 100644
index 0000000..44a5bea
--- /dev/null
+++ b/pw_channel/public/pw_channel/forwarding_channel.h
@@ -0,0 +1,190 @@
+// Copyright 2024 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <cstdint>
+#include <mutex>
+#include <optional>
+
+#include "pw_async2/dispatcher.h"
+#include "pw_async2/poll.h"
+#include "pw_channel/channel.h"
+#include "pw_sync/lock_annotations.h"
+#include "pw_sync/mutex.h"
+
+namespace pw::channel {
+namespace internal {
+
+// Internal Channel implementation for use with ForwardingChannelPair. It is
+// specialized for kDatagram and kByte.
+template <DataType kType>
+class ForwardingChannel;
+
+} // namespace internal
+
+/// @defgroup pw_channel_forwarding
+/// @{
+
+/// Forwards datagrams between two channels. Writes to the first channel appear
+/// as reads on the second, and vice versa.
+///
+/// `ForwardingChannelPair` enables connecting two subsystems that
+/// communicate with datagram channels without implementing a custom channel.
+template <DataType kType>
+class ForwardingChannelPair {
+ public:
+ explicit constexpr ForwardingChannelPair(
+ multibuf::MultiBufAllocator& allocator);
+
+ ForwardingChannelPair(const ForwardingChannelPair&) = delete;
+ ForwardingChannelPair& operator=(const ForwardingChannelPair&) = delete;
+
+ ForwardingChannelPair(ForwardingChannelPair&&) = delete;
+ ForwardingChannelPair& operator=(ForwardingChannelPair&&) = delete;
+
+ /// Returns the first channel in the pair.
+ Channel<kType, kReliable, kReadable, kWritable>& first() { return first_; }
+
+ /// Returns a const reference to the first channel in the pair.
+ const Channel<kType, kReliable, kReadable, kWritable>& first() const {
+ return first_;
+ }
+
+ /// Returns the second channel in the pair.
+ Channel<kType, kReliable, kReadable, kWritable>& second() { return second_; }
+
+ /// Returns a const reference to the second channel in the pair.
+ const Channel<kType, kReliable, kReadable, kWritable>& second() const {
+ return second_;
+ }
+
+ private:
+ template <DataType>
+ friend class internal::ForwardingChannel;
+
+ sync::Mutex mutex_;
+ multibuf::MultiBufAllocator& allocator_;
+
+ // These channels refer to each other, so their lifetimes must match.
+ internal::ForwardingChannel<kType> first_;
+ internal::ForwardingChannel<kType> second_;
+};
+
+/// Alias for a pair of forwarding datagram channels.
+using ForwardingDatagramChannelPair =
+ ForwardingChannelPair<DataType::kDatagram>;
+
+/// Alias for a pair of forwarding byte channels.
+using ForwardingByteChannelPair = ForwardingChannelPair<DataType::kByte>;
+
+/// @}
+
+namespace internal {
+
+template <>
+class ForwardingChannel<DataType::kDatagram>
+ : public ReliableDatagramReaderWriter {
+ public:
+ ForwardingChannel(const ForwardingChannel&) = delete;
+ ForwardingChannel& operator=(const ForwardingChannel&) = delete;
+
+ ForwardingChannel(ForwardingChannel&&) = delete;
+ ForwardingChannel& operator=(ForwardingChannel&&) = delete;
+
+ private:
+ friend class ForwardingChannelPair<DataType::kDatagram>;
+
+ constexpr ForwardingChannel(ForwardingChannelPair<DataType::kDatagram>& pair,
+ ForwardingChannel* sibling)
+ : pair_(pair), sibling_(*sibling), write_token_(0) {}
+
+ async2::Poll<Result<multibuf::MultiBuf>> DoPollRead(
+ async2::Context& cx) override;
+
+ async2::Poll<> DoPollReadyToWrite(async2::Context& cx) override;
+
+ multibuf::MultiBufAllocator& DoGetWriteAllocator() override {
+ return pair_.allocator_;
+ }
+
+ Result<channel::WriteToken> DoWrite(multibuf::MultiBuf&& data) override;
+
+ async2::Poll<Result<channel::WriteToken>> DoPollFlush(
+ async2::Context&) override;
+
+ async2::Poll<Status> DoPollClose(async2::Context&) override;
+
+ // The two channels share one mutex. Lock safty analysis doesn't understand
+ // that, so has to be disabled for some functions.
+ ForwardingChannelPair<DataType::kDatagram>& pair_;
+ ForwardingChannel& sibling_;
+
+ // Could use a queue here.
+ std::optional<multibuf::MultiBuf> read_queue_ PW_GUARDED_BY(pair_.mutex_);
+ uint32_t write_token_ PW_GUARDED_BY(pair_.mutex_);
+ async2::Waker read_waker_ PW_GUARDED_BY(pair_.mutex_);
+ async2::Waker write_waker_ PW_GUARDED_BY(pair_.mutex_);
+};
+
+template <>
+class ForwardingChannel<DataType::kByte> : public ByteReaderWriter {
+ public:
+ ForwardingChannel(const ForwardingChannel&) = delete;
+ ForwardingChannel& operator=(const ForwardingChannel&) = delete;
+
+ ForwardingChannel(ForwardingChannel&&) = delete;
+ ForwardingChannel& operator=(ForwardingChannel&&) = delete;
+
+ private:
+ friend class ForwardingChannelPair<DataType::kByte>;
+
+ constexpr ForwardingChannel(ForwardingChannelPair<DataType::kByte>& pair,
+ ForwardingChannel* sibling)
+ : pair_(pair), sibling_(*sibling), write_token_(0) {}
+
+ async2::Poll<Result<multibuf::MultiBuf>> DoPollRead(
+ async2::Context& cx) override;
+
+ async2::Poll<> DoPollReadyToWrite(async2::Context&) override {
+ return async2::Ready();
+ }
+
+ multibuf::MultiBufAllocator& DoGetWriteAllocator() override {
+ return pair_.allocator_;
+ }
+
+ Result<channel::WriteToken> DoWrite(multibuf::MultiBuf&& data) override;
+
+ async2::Poll<Result<channel::WriteToken>> DoPollFlush(
+ async2::Context&) override;
+
+ async2::Poll<Status> DoPollClose(async2::Context&) override;
+
+ ForwardingChannelPair<DataType::kByte>& pair_;
+ ForwardingChannel& sibling_;
+
+ multibuf::MultiBuf read_queue_ PW_GUARDED_BY(pair_.mutex_);
+ uint32_t write_token_ PW_GUARDED_BY(pair_.mutex_);
+ async2::Waker read_waker_ PW_GUARDED_BY(pair_.mutex_);
+};
+
+} // namespace internal
+
+// Define the constructor out-of-line, after ForwardingChannel is defined.
+template <DataType kType>
+constexpr ForwardingChannelPair<kType>::ForwardingChannelPair(
+ multibuf::MultiBufAllocator& allocator)
+ : allocator_(allocator), first_(*this, &second_), second_(*this, &first_) {}
+
+} // namespace pw::channel