Replace simulator StreamChannel with upstream
Change-Id: Iaaf3165d20e8b7f656c1cf6b368f874428d840d9
Reviewed-on: https://pigweed-internal-review.git.corp.google.com/c/pigweed/showcase/rp2/+/73133
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Taylor Cramer <cramertj@google.com>
diff --git a/targets/host/BUILD.bazel b/targets/host/BUILD.bazel
index 56f19e9..01edf9e 100644
--- a/targets/host/BUILD.bazel
+++ b/targets/host/BUILD.bazel
@@ -23,7 +23,6 @@
"system.cc",
],
implementation_deps = [
- ":stream_channel",
"//modules/air_sensor:air_sensor_fake",
"//modules/board:board_fake",
"//modules/led:monochrome_led_fake",
@@ -31,6 +30,7 @@
"//modules/light:fake_sensor",
"//modules/proximity:fake_sensor",
"@pigweed//pw_channel",
+ "@pigweed//pw_channel:stream_channel",
"@pigweed//pw_digital_io",
"@pigweed//pw_multibuf:simple_allocator",
"@pigweed//pw_system:async",
@@ -41,20 +41,6 @@
)
cc_library(
- name = "stream_channel",
- srcs = ["stream_channel.cc"],
- hdrs = ["stream_channel.h"],
- target_compatible_with = incompatible_with_mcu(),
- deps = [
- "@pigweed//pw_channel",
- "@pigweed//pw_log",
- "@pigweed//pw_stream",
- "@pigweed//pw_sync:interrupt_spin_lock",
- "@pigweed//pw_sync:thread_notification",
- ],
-)
-
-cc_library(
name = "production_app_threads",
srcs = ["production_app_threads.cc"],
implementation_deps = ["@pigweed//pw_thread_stl:thread"],
diff --git a/targets/host/stream_channel.cc b/targets/host/stream_channel.cc
deleted file mode 100644
index cd6f06a..0000000
--- a/targets/host/stream_channel.cc
+++ /dev/null
@@ -1,210 +0,0 @@
-// Copyright 2024 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-
-#include "targets/host/stream_channel.h"
-
-#include <thread>
-
-#include "pw_assert/check.h"
-#include "pw_async2/dispatcher_base.h"
-#include "pw_log/log.h"
-#include "pw_multibuf/multibuf.h"
-#include "pw_status/status.h"
-
-namespace sense::system {
-
-using pw::OkStatus;
-using pw::Result;
-using pw::Status;
-using pw::async2::Context;
-using pw::async2::Pending;
-using pw::async2::Poll;
-using pw::async2::Ready;
-using pw::async2::WaitReason;
-using pw::async2::Waker;
-using pw::channel::ByteReaderWriter;
-using pw::multibuf::MultiBuf;
-using pw::multibuf::MultiBufAllocationFuture;
-using pw::multibuf::MultiBufAllocator;
-using pw::multibuf::OwnedChunk;
-using pw::sync::InterruptSpinLock;
-using pw::sync::ThreadNotification;
-
-namespace internal {
-
-bool StreamChannelReadState::HasBufferToFill() {
- std::lock_guard lock(buffer_lock_);
- return !buffer_to_fill_.empty();
-}
-
-void StreamChannelReadState::ProvideBufferToFill(MultiBuf&& buf) {
- {
- std::lock_guard lock(buffer_lock_);
- buffer_to_fill_.PushSuffix(std::move(buf));
- }
- buffer_to_fill_available_.release();
-}
-
-Poll<Result<MultiBuf>> StreamChannelReadState::PendFilledBuffer(Context& cx) {
- std::lock_guard lock(buffer_lock_);
- if (!filled_buffer_.empty()) {
- return std::move(filled_buffer_);
- }
- // Return an error status only after pulling all the data.
- if (!status_.ok()) {
- return status_;
- }
- on_buffer_filled_ = cx.GetWaker(pw::async2::WaitReason::Unspecified());
- return Pending();
-}
-
-void StreamChannelReadState::ReadLoop(pw::stream::Reader& reader) {
- while (true) {
- OwnedChunk buffer = WaitForBufferToFillAndTakeFrontChunk();
- Result<pw::ByteSpan> read = reader.Read(buffer);
- if (!read.ok()) {
- PW_LOG_ERROR("Failed to read from stream in StreamChannel.");
- SetReadError(read.status());
- return;
- }
- buffer->Truncate(read->size());
- ProvideFilledBuffer(MultiBuf::FromChunk(std::move(buffer)));
- }
-}
-
-OwnedChunk StreamChannelReadState::WaitForBufferToFillAndTakeFrontChunk() {
- while (true) {
- {
- std::lock_guard lock(buffer_lock_);
- if (!buffer_to_fill_.empty()) {
- return buffer_to_fill_.TakeFrontChunk();
- }
- }
- buffer_to_fill_available_.acquire();
- }
- PW_UNREACHABLE;
-}
-
-void StreamChannelReadState::ProvideFilledBuffer(MultiBuf&& filled_buffer) {
- std::lock_guard lock(buffer_lock_);
- filled_buffer_.PushSuffix(std::move(filled_buffer));
- std::move(on_buffer_filled_).Wake();
-}
-
-void StreamChannelReadState::SetReadError(Status status) {
- std::lock_guard lock(buffer_lock_);
- status_ = status;
-}
-
-Status StreamChannelWriteState::SendData(MultiBuf&& buf) {
- {
- std::lock_guard lock(buffer_lock_);
- if (!status_.ok()) {
- return status_;
- }
- buffer_to_write_.PushSuffix(std::move(buf));
- }
- data_available_.release();
- return OkStatus();
-}
-
-void StreamChannelWriteState::WriteLoop(pw::stream::Writer& writer) {
- while (true) {
- data_available_.acquire();
- MultiBuf buffer;
- {
- std::lock_guard lock(buffer_lock_);
- if (buffer_to_write_.empty()) {
- continue;
- }
- buffer = std::move(buffer_to_write_);
- }
- for (const auto& chunk : buffer.Chunks()) {
- Status status = writer.Write(chunk);
- if (!status.ok()) {
- PW_LOG_ERROR("Failed to write to stream in StreamChannel.");
- std::lock_guard lock(buffer_lock_);
- status_ = status;
- return;
- }
- }
- }
-}
-
-} // namespace internal
-
-static constexpr size_t kMinimumReadSize = 64;
-static constexpr size_t kDesiredReadSize = 1024;
-
-StreamChannel::StreamChannel(MultiBufAllocator& allocator,
- pw::stream::Reader& reader,
- pw::stream::Writer& writer)
- : read_state_(),
- write_state_(),
- allocation_future_(std::nullopt),
- allocator_(&allocator),
- write_token_(0) {
- std::thread read_thread([this, &reader]() { read_state_.ReadLoop(reader); });
- read_thread.detach();
- std::thread write_thread(
- [this, &writer]() { write_state_.WriteLoop(writer); });
- write_thread.detach();
-}
-
-Status StreamChannel::ProvideBufferIfAvailable(Context& cx) {
- if (read_state_.HasBufferToFill()) {
- return OkStatus();
- }
-
- if (!allocation_future_.has_value()) {
- allocation_future_ =
- allocator_->AllocateContiguousAsync(kMinimumReadSize, kDesiredReadSize);
- }
- Poll<std::optional<MultiBuf>> maybe_multibuf = allocation_future_->Pend(cx);
-
- // If this is pending, we'll be awoken and this function will be re-run
- // when a buffer becomes available, allowing us to provide a buffer.
- if (maybe_multibuf.IsPending()) {
- return OkStatus();
- }
-
- allocation_future_ = std::nullopt;
-
- if (!maybe_multibuf->has_value()) {
- PW_LOG_ERROR("Failed to allocate multibuf for reading");
- return Status::ResourceExhausted();
- }
-
- read_state_.ProvideBufferToFill(std::move(**maybe_multibuf));
- return OkStatus();
-}
-
-Poll<Result<MultiBuf>> StreamChannel::DoPendRead(Context& cx) {
- Status status = ProvideBufferIfAvailable(cx);
- if (!status.ok()) {
- return status;
- }
- return read_state_.PendFilledBuffer(cx);
-}
-
-Poll<Status> StreamChannel::DoPendReadyToWrite(Context&) { return OkStatus(); }
-
-pw::Result<pw::channel::WriteToken> StreamChannel::DoWrite(
- pw::multibuf::MultiBuf&& data) {
- write_state_.SendData(std::move(data));
- const uint32_t token = write_token_++;
- return CreateWriteToken(token);
-}
-
-} // namespace sense::system
diff --git a/targets/host/stream_channel.h b/targets/host/stream_channel.h
deleted file mode 100644
index 31f7e09..0000000
--- a/targets/host/stream_channel.h
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright 2024 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#pragma once
-
-#include <memory>
-
-#include "pw_async2/dispatcher_base.h"
-#include "pw_channel/channel.h"
-#include "pw_multibuf/multibuf.h"
-#include "pw_status/status.h"
-#include "pw_stream/stream.h"
-#include "pw_sync/interrupt_spin_lock.h"
-#include "pw_sync/thread_notification.h"
-
-namespace sense::system {
-namespace internal {
-
-/// State for the stream-reading thread.
-class StreamChannelReadState {
- public:
- StreamChannelReadState() = default;
- StreamChannelReadState(const StreamChannelReadState&) = delete;
- StreamChannelReadState& operator=(const StreamChannelReadState&) = delete;
- StreamChannelReadState(StreamChannelReadState&&) = delete;
- StreamChannelReadState& operator=(StreamChannelReadState&&) = delete;
-
- /// Whether or not the `ReadLoop` already has a buffer available into which
- /// data can be read.
- bool HasBufferToFill();
-
- /// Provide a buffer for `ReadLoop` to read data into.
- void ProvideBufferToFill(pw::multibuf::MultiBuf&& buf);
-
- /// Receives any available data processed by `ReadLoop`.
- ///
- /// If no data is available, schedules a wakeup of the task in `cx` when
- /// new data arrives.
- pw::async2::Poll<pw::Result<pw::multibuf::MultiBuf>> PendFilledBuffer(
- pw::async2::Context& cx);
-
- /// A loop which reads data from `reader` into buffers provided by
- /// `ProvideBufferToFill` and then makes them available via
- /// `PendFilledBuffer`.
- ///
- /// This is blocking and is intended to be run on an independent thread.
- void ReadLoop(pw::stream::Reader& reader);
-
- private:
- pw::multibuf::OwnedChunk WaitForBufferToFillAndTakeFrontChunk();
- void ProvideFilledBuffer(pw::multibuf::MultiBuf&& filled_buffer);
- void SetReadError(pw::Status status);
-
- pw::sync::ThreadNotification buffer_to_fill_available_;
- pw::async2::Waker on_buffer_filled_;
- pw::sync::InterruptSpinLock buffer_lock_;
- pw::multibuf::MultiBuf buffer_to_fill_ PW_GUARDED_BY(buffer_lock_);
- pw::multibuf::MultiBuf filled_buffer_ PW_GUARDED_BY(buffer_lock_);
- pw::Status status_ PW_GUARDED_BY(buffer_lock_);
-};
-
-/// State for the stream-writing thread.
-class StreamChannelWriteState {
- public:
- StreamChannelWriteState() = default;
- StreamChannelWriteState(const StreamChannelWriteState&) = delete;
- StreamChannelWriteState& operator=(const StreamChannelWriteState&) = delete;
- StreamChannelWriteState(StreamChannelWriteState&&) = delete;
- StreamChannelWriteState& operator=(StreamChannelWriteState&&) = delete;
-
- /// Queues `buf` to be sent into `writer` via the `WriteLoop`.
- ///
- /// Returns a status indicating whether the `WriteLoop` has encountered
- /// errors writing into `writer`.
- pw::Status SendData(pw::multibuf::MultiBuf&& buf);
-
- /// A loop which writes the data sent via `SendData` into `writer`.
- ///
- /// This is blocking and is intended to be run on an independent thread.
- void WriteLoop(pw::stream::Writer& writer);
-
- private:
- pw::sync::ThreadNotification data_available_;
- pw::sync::InterruptSpinLock buffer_lock_;
- pw::multibuf::MultiBuf buffer_to_write_ PW_GUARDED_BY(buffer_lock_);
- pw::Status status_;
-};
-
-} // namespace internal
-
-/// A channel which delegates to an underlying reader and writer stream.
-///
-/// NOTE: this channel as well as its `reader` and `writer` must all continue to
-/// exist for the duration of the program, as they are referenced by other
-/// threads.
-///
-/// This restriction will be relaxed in the future.
-class StreamChannel final : public pw::channel::ByteReaderWriter {
- public:
- StreamChannel(pw::multibuf::MultiBufAllocator& allocator,
- pw::stream::Reader& reader,
- pw::stream::Writer& writer);
-
- // StreamChannel is referenced from other threads and is therefore not movable
- // or copyable.
- StreamChannel(const StreamChannel&) = delete;
- StreamChannel& operator=(const StreamChannel&) = delete;
- StreamChannel(StreamChannel&&) = delete;
- StreamChannel& operator=(StreamChannel&&) = delete;
-
- private:
- pw::Status ProvideBufferIfAvailable(pw::async2::Context& cx);
-
- pw::async2::Poll<pw::Result<pw::multibuf::MultiBuf>> DoPendRead(
- pw::async2::Context& cx) override;
-
- pw::async2::Poll<pw::Status> DoPendReadyToWrite(
- pw::async2::Context& cx) override;
-
- pw::multibuf::MultiBufAllocator& DoGetWriteAllocator() override {
- return *allocator_;
- }
-
- pw::Result<pw::channel::WriteToken> DoWrite(
- pw::multibuf::MultiBuf&& data) override;
-
- pw::async2::Poll<pw::Result<pw::channel::WriteToken>> DoPendFlush(
- pw::async2::Context&) override {
- return CreateWriteToken(write_token_);
- }
-
- pw::async2::Poll<pw::Status> DoPendClose(pw::async2::Context&) override {
- return pw::async2::Ready(pw::OkStatus());
- }
-
- internal::StreamChannelReadState read_state_;
- internal::StreamChannelWriteState write_state_;
- std::optional<pw::multibuf::MultiBufAllocationFuture> allocation_future_;
- pw::multibuf::MultiBufAllocator* allocator_;
- uint32_t write_token_;
-};
-
-} // namespace sense::system
\ No newline at end of file
diff --git a/targets/host/system.cc b/targets/host/system.cc
index b3beb59..585d6de 100644
--- a/targets/host/system.cc
+++ b/targets/host/system.cc
@@ -22,14 +22,16 @@
#include "modules/light/fake_sensor.h"
#include "modules/proximity/fake_sensor.h"
#include "pw_assert/check.h"
+#include "pw_channel/stream_channel.h"
#include "pw_digital_io/digital_io.h"
#include "pw_multibuf/simple_allocator.h"
#include "pw_system/io.h"
#include "pw_system/system.h"
-#include "targets/host/stream_channel.h"
+#include "pw_thread_stl/options.h"
-using pw::digital_io::DigitalIn;
-using pw::digital_io::State;
+using ::pw::channel::StreamChannel;
+using ::pw::digital_io::DigitalIn;
+using ::pw::digital_io::State;
extern "C" {
@@ -88,10 +90,14 @@
static std::byte channel_buffer[16384];
static pw::multibuf::SimpleAllocator multibuf_alloc(channel_buffer,
pw::System().allocator());
- static StreamChannel channel(
- multibuf_alloc, pw::system::GetReader(), pw::system::GetWriter());
+ static pw::NoDestructor<StreamChannel> channel(
+ multibuf_alloc,
+ pw::system::GetReader(),
+ pw::thread::stl::Options(),
+ pw::system::GetWriter(),
+ pw::thread::stl::Options());
- pw::SystemStart(channel);
+ pw::SystemStart(*channel);
PW_UNREACHABLE;
}