blob: 32a05b419a2e619de291d06e7134b58d56668e39 [file] [log] [blame]
// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#include "pw_channel/forwarding_channel.h"
#include <algorithm>
#include <array>
#include "pw_allocator/testing.h"
#include "pw_multibuf/header_chunk_region_tracker.h"
#include "pw_multibuf/simple_allocator.h"
#include "pw_string/string.h"
#include "pw_unit_test/framework.h"
namespace {
using ::pw::Result;
using ::pw::async2::Context;
using ::pw::async2::Pending;
using ::pw::async2::Poll;
using ::pw::async2::Ready;
using ::pw::async2::Task;
using ::pw::async2::Waker;
using ::pw::channel::ByteReader;
using ::pw::channel::ByteWriter;
using ::pw::channel::DatagramReader;
using ::pw::channel::DatagramWriter;
using ::pw::multibuf::MultiBuf;
// Creates and initializes a MultiBuf to the specified value.
class InitializedMultiBuf {
public:
InitializedMultiBuf(std::string_view contents) {
std::optional<pw::multibuf::OwnedChunk> chunk =
pw::multibuf::HeaderChunkRegionTracker::AllocateRegionAsChunk(
allocator_, contents.size());
std::memcpy(chunk.value().data(), contents.data(), contents.size());
buf_.PushFrontChunk(std::move(*chunk));
}
pw::multibuf::MultiBuf Take() { return std::move(buf_); }
private:
pw::allocator::test::AllocatorForTest<2048> allocator_;
pw::multibuf::MultiBuf buf_;
};
pw::InlineString<128> CopyToString(const pw::multibuf::MultiBuf& mb) {
pw::InlineString<128> contents(mb.size(), '\0');
std::copy(
mb.begin(), mb.end(), reinterpret_cast<std::byte*>(contents.data()));
return contents;
}
template <pw::channel::DataType kType,
size_t kDataSize = 128,
size_t kMetaSize = 128>
class TestChannelPair {
public:
TestChannelPair()
: simple_allocator_(data_area_, meta_alloc_), pair_(simple_allocator_) {}
pw::channel::ForwardingChannelPair<kType>* operator->() { return &pair_; }
private:
std::array<std::byte, kDataSize> data_area_;
pw::allocator::test::AllocatorForTest<kMetaSize> meta_alloc_;
pw::multibuf::SimpleAllocator simple_allocator_;
pw::channel::ForwardingChannelPair<kType> pair_;
};
// TODO: b/330788671 - Have the test tasks run in multiple stages to ensure that
// wakers are stored / woken properly by ForwardingChannel.
TEST(ForwardingDatagramChannel, ForwardsEmptyDatagrams) {
pw::async2::Dispatcher dispatcher;
class : public pw::async2::Task {
public:
TestChannelPair<pw::channel::DataType::kDatagram> pair;
int test_completed = 0;
private:
pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
// No data yet
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
// Send datagram first->second
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
auto result = pair->first().Write({}); // Write empty datagram
EXPECT_EQ(pw::OkStatus(), result.status());
EXPECT_EQ(pw::async2::Pending(), pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
auto empty_chunk_result = pair->second().PendRead(cx);
EXPECT_TRUE(empty_chunk_result.IsReady());
EXPECT_TRUE(empty_chunk_result->ok());
EXPECT_EQ((*empty_chunk_result)->size(), 0u);
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
// Send datagram second->first
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->second().PendReadyToWrite(cx));
result = pair->second().Write({}); // Write empty datagram
EXPECT_EQ(pw::OkStatus(), result.status());
EXPECT_EQ(pw::async2::Pending(), pair->second().PendReadyToWrite(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
empty_chunk_result = pair->first().PendRead(cx);
EXPECT_TRUE(empty_chunk_result.IsReady());
EXPECT_TRUE(empty_chunk_result->ok());
EXPECT_EQ((*empty_chunk_result)->size(), 0u);
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
test_completed += 1;
return pw::async2::Ready();
}
} test_task;
dispatcher.Post(test_task);
EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
EXPECT_EQ(test_task.test_completed, 1);
}
TEST(ForwardingDatagramChannel, ForwardsNonEmptyDatagrams) {
pw::async2::Dispatcher dispatcher;
class : public pw::async2::Task {
public:
TestChannelPair<pw::channel::DataType::kDatagram> pair;
int test_completed = 0;
private:
pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
InitializedMultiBuf b1("Hello");
InitializedMultiBuf b2("world!");
// Send datagram first->second
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::OkStatus(), pair->first().Write(b1.Take()).status());
EXPECT_EQ(pw::async2::Pending(), pair->first().PendReadyToWrite(cx));
EXPECT_EQ(CopyToString(pair->second().PendRead(cx).value().value()),
"Hello");
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
EXPECT_EQ(pw::OkStatus(), pair->first().Write(b2.Take()).status());
EXPECT_EQ(CopyToString(pair->second().PendRead(cx).value().value()),
"world!");
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
test_completed += 1;
return pw::async2::Ready();
}
} test_task;
dispatcher.Post(test_task);
EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
EXPECT_EQ(test_task.test_completed, 1);
}
TEST(ForwardingDatagramChannel, ForwardsDatagrams) {
pw::async2::Dispatcher dispatcher;
class : public pw::async2::Task {
public:
TestChannelPair<pw::channel::DataType::kDatagram> pair;
int test_completed = 0;
private:
pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
// No data yet
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
// Send datagram first->second
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
auto result = pair->first().Write({}); // Write empty datagram
EXPECT_EQ(pw::OkStatus(), result.status());
EXPECT_EQ(pw::async2::Pending(), pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
auto empty_chunk_result = pair->second().PendRead(cx);
EXPECT_TRUE(empty_chunk_result.IsReady());
EXPECT_TRUE(empty_chunk_result->ok());
EXPECT_EQ((*empty_chunk_result)->size(), 0u);
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
// Send datagram second->first
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->second().PendReadyToWrite(cx));
result = pair->second().Write({}); // Write empty datagram
EXPECT_EQ(pw::OkStatus(), result.status());
EXPECT_EQ(pw::async2::Pending(), pair->second().PendReadyToWrite(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
empty_chunk_result = pair->first().PendRead(cx);
EXPECT_TRUE(empty_chunk_result.IsReady());
EXPECT_TRUE(empty_chunk_result->ok());
EXPECT_EQ((*empty_chunk_result)->size(), 0u);
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
test_completed += 1;
return pw::async2::Ready();
}
} test_task;
dispatcher.Post(test_task);
EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
EXPECT_EQ(test_task.test_completed, 1);
}
TEST(ForwardingDatagramchannel, PendCloseAwakensAndClosesPeer) {
class TryToReadUntilClosed : public Task {
public:
TryToReadUntilClosed(DatagramReader& reader) : reader_(reader) {}
private:
pw::async2::Poll<> DoPend(Context& cx) final {
Poll<Result<MultiBuf>> read = reader_.PendRead(cx);
if (read.IsPending()) {
return Pending();
}
EXPECT_EQ(read->status(), pw::Status::FailedPrecondition());
return Ready();
}
DatagramReader& reader_;
};
pw::async2::Dispatcher dispatcher;
TestChannelPair<pw::channel::DataType::kDatagram> pair;
TryToReadUntilClosed read_task(pair->first());
dispatcher.Post(read_task);
EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
Waker empty_waker;
Context empty_cx(dispatcher, empty_waker);
EXPECT_EQ(pair->second().PendClose(empty_cx), Ready(pw::OkStatus()));
EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
}
TEST(ForwardingByteChannel, IgnoresEmptyWrites) {
pw::async2::Dispatcher dispatcher;
class : public pw::async2::Task {
public:
TestChannelPair<pw::channel::DataType::kByte> pair;
int test_completed = 0;
private:
pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
// No data yet
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
// Send nothing first->second
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());
// Still no data
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
// Send nothing second->first
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());
// Still no data
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
test_completed += 1;
return pw::async2::Ready();
}
} test_task;
dispatcher.Post(test_task);
EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
EXPECT_EQ(test_task.test_completed, 1);
}
TEST(ForwardingByteChannel, WriteData) {
pw::async2::Dispatcher dispatcher;
class : public pw::async2::Task {
public:
TestChannelPair<pw::channel::DataType::kByte> pair;
int test_completed = 0;
private:
pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
// No data yet
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
InitializedMultiBuf b1("hello");
InitializedMultiBuf b2(" ");
InitializedMultiBuf b3("world");
// Send "hello world" first->second
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::OkStatus(), pair->first().Write(b1.Take()).status());
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::OkStatus(), pair->first().Write(b2.Take()).status());
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::OkStatus(), pair->first().Write(b3.Take()).status());
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
auto hello_world_result = pair->second().PendRead(cx);
EXPECT_TRUE(hello_world_result.IsReady());
EXPECT_EQ(CopyToString(hello_world_result->value()), "hello world");
// Send nothing second->first
EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
pair->first().PendReadyToWrite(cx));
EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());
// Still no data
EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
test_completed += 1;
return pw::async2::Ready();
}
} test_task;
dispatcher.Post(test_task);
EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
EXPECT_EQ(test_task.test_completed, 1);
}
TEST(ForwardingByteChannel, PendCloseAwakensAndClosesPeer) {
class TryToReadUntilClosed : public Task {
public:
TryToReadUntilClosed(ByteReader& reader) : reader_(reader) {}
private:
pw::async2::Poll<> DoPend(Context& cx) final {
Poll<Result<MultiBuf>> read = reader_.PendRead(cx);
if (read.IsPending()) {
return Pending();
}
EXPECT_EQ(read->status(), pw::Status::FailedPrecondition());
return Ready();
}
ByteReader& reader_;
};
pw::async2::Dispatcher dispatcher;
TestChannelPair<pw::channel::DataType::kByte> pair;
TryToReadUntilClosed read_task(pair->first());
dispatcher.Post(read_task);
EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
Waker empty_waker;
Context empty_cx(dispatcher, empty_waker);
EXPECT_EQ(pair->second().PendClose(empty_cx), Ready(pw::OkStatus()));
EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
}
} // namespace