Replace simulator StreamChannel with upstream

Change-Id: Iaaf3165d20e8b7f656c1cf6b368f874428d840d9
Reviewed-on: https://pigweed-internal-review.git.corp.google.com/c/pigweed/showcase/rp2/+/73133
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Taylor Cramer <cramertj@google.com>
diff --git a/targets/host/BUILD.bazel b/targets/host/BUILD.bazel
index 56f19e9..01edf9e 100644
--- a/targets/host/BUILD.bazel
+++ b/targets/host/BUILD.bazel
@@ -23,7 +23,6 @@
         "system.cc",
     ],
     implementation_deps = [
-        ":stream_channel",
         "//modules/air_sensor:air_sensor_fake",
         "//modules/board:board_fake",
         "//modules/led:monochrome_led_fake",
@@ -31,6 +30,7 @@
         "//modules/light:fake_sensor",
         "//modules/proximity:fake_sensor",
         "@pigweed//pw_channel",
+        "@pigweed//pw_channel:stream_channel",
         "@pigweed//pw_digital_io",
         "@pigweed//pw_multibuf:simple_allocator",
         "@pigweed//pw_system:async",
@@ -41,20 +41,6 @@
 )
 
 cc_library(
-    name = "stream_channel",
-    srcs = ["stream_channel.cc"],
-    hdrs = ["stream_channel.h"],
-    target_compatible_with = incompatible_with_mcu(),
-    deps = [
-        "@pigweed//pw_channel",
-        "@pigweed//pw_log",
-        "@pigweed//pw_stream",
-        "@pigweed//pw_sync:interrupt_spin_lock",
-        "@pigweed//pw_sync:thread_notification",
-    ],
-)
-
-cc_library(
     name = "production_app_threads",
     srcs = ["production_app_threads.cc"],
     implementation_deps = ["@pigweed//pw_thread_stl:thread"],
diff --git a/targets/host/stream_channel.cc b/targets/host/stream_channel.cc
deleted file mode 100644
index cd6f06a..0000000
--- a/targets/host/stream_channel.cc
+++ /dev/null
@@ -1,210 +0,0 @@
-// Copyright 2024 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-//     https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-
-#include "targets/host/stream_channel.h"
-
-#include <thread>
-
-#include "pw_assert/check.h"
-#include "pw_async2/dispatcher_base.h"
-#include "pw_log/log.h"
-#include "pw_multibuf/multibuf.h"
-#include "pw_status/status.h"
-
-namespace sense::system {
-
-using pw::OkStatus;
-using pw::Result;
-using pw::Status;
-using pw::async2::Context;
-using pw::async2::Pending;
-using pw::async2::Poll;
-using pw::async2::Ready;
-using pw::async2::WaitReason;
-using pw::async2::Waker;
-using pw::channel::ByteReaderWriter;
-using pw::multibuf::MultiBuf;
-using pw::multibuf::MultiBufAllocationFuture;
-using pw::multibuf::MultiBufAllocator;
-using pw::multibuf::OwnedChunk;
-using pw::sync::InterruptSpinLock;
-using pw::sync::ThreadNotification;
-
-namespace internal {
-
-bool StreamChannelReadState::HasBufferToFill() {
-  std::lock_guard lock(buffer_lock_);
-  return !buffer_to_fill_.empty();
-}
-
-void StreamChannelReadState::ProvideBufferToFill(MultiBuf&& buf) {
-  {
-    std::lock_guard lock(buffer_lock_);
-    buffer_to_fill_.PushSuffix(std::move(buf));
-  }
-  buffer_to_fill_available_.release();
-}
-
-Poll<Result<MultiBuf>> StreamChannelReadState::PendFilledBuffer(Context& cx) {
-  std::lock_guard lock(buffer_lock_);
-  if (!filled_buffer_.empty()) {
-    return std::move(filled_buffer_);
-  }
-  // Return an error status only after pulling all the data.
-  if (!status_.ok()) {
-    return status_;
-  }
-  on_buffer_filled_ = cx.GetWaker(pw::async2::WaitReason::Unspecified());
-  return Pending();
-}
-
-void StreamChannelReadState::ReadLoop(pw::stream::Reader& reader) {
-  while (true) {
-    OwnedChunk buffer = WaitForBufferToFillAndTakeFrontChunk();
-    Result<pw::ByteSpan> read = reader.Read(buffer);
-    if (!read.ok()) {
-      PW_LOG_ERROR("Failed to read from stream in StreamChannel.");
-      SetReadError(read.status());
-      return;
-    }
-    buffer->Truncate(read->size());
-    ProvideFilledBuffer(MultiBuf::FromChunk(std::move(buffer)));
-  }
-}
-
-OwnedChunk StreamChannelReadState::WaitForBufferToFillAndTakeFrontChunk() {
-  while (true) {
-    {
-      std::lock_guard lock(buffer_lock_);
-      if (!buffer_to_fill_.empty()) {
-        return buffer_to_fill_.TakeFrontChunk();
-      }
-    }
-    buffer_to_fill_available_.acquire();
-  }
-  PW_UNREACHABLE;
-}
-
-void StreamChannelReadState::ProvideFilledBuffer(MultiBuf&& filled_buffer) {
-  std::lock_guard lock(buffer_lock_);
-  filled_buffer_.PushSuffix(std::move(filled_buffer));
-  std::move(on_buffer_filled_).Wake();
-}
-
-void StreamChannelReadState::SetReadError(Status status) {
-  std::lock_guard lock(buffer_lock_);
-  status_ = status;
-}
-
-Status StreamChannelWriteState::SendData(MultiBuf&& buf) {
-  {
-    std::lock_guard lock(buffer_lock_);
-    if (!status_.ok()) {
-      return status_;
-    }
-    buffer_to_write_.PushSuffix(std::move(buf));
-  }
-  data_available_.release();
-  return OkStatus();
-}
-
-void StreamChannelWriteState::WriteLoop(pw::stream::Writer& writer) {
-  while (true) {
-    data_available_.acquire();
-    MultiBuf buffer;
-    {
-      std::lock_guard lock(buffer_lock_);
-      if (buffer_to_write_.empty()) {
-        continue;
-      }
-      buffer = std::move(buffer_to_write_);
-    }
-    for (const auto& chunk : buffer.Chunks()) {
-      Status status = writer.Write(chunk);
-      if (!status.ok()) {
-        PW_LOG_ERROR("Failed to write to stream in StreamChannel.");
-        std::lock_guard lock(buffer_lock_);
-        status_ = status;
-        return;
-      }
-    }
-  }
-}
-
-}  // namespace internal
-
-static constexpr size_t kMinimumReadSize = 64;
-static constexpr size_t kDesiredReadSize = 1024;
-
-StreamChannel::StreamChannel(MultiBufAllocator& allocator,
-                             pw::stream::Reader& reader,
-                             pw::stream::Writer& writer)
-    : read_state_(),
-      write_state_(),
-      allocation_future_(std::nullopt),
-      allocator_(&allocator),
-      write_token_(0) {
-  std::thread read_thread([this, &reader]() { read_state_.ReadLoop(reader); });
-  read_thread.detach();
-  std::thread write_thread(
-      [this, &writer]() { write_state_.WriteLoop(writer); });
-  write_thread.detach();
-}
-
-Status StreamChannel::ProvideBufferIfAvailable(Context& cx) {
-  if (read_state_.HasBufferToFill()) {
-    return OkStatus();
-  }
-
-  if (!allocation_future_.has_value()) {
-    allocation_future_ =
-        allocator_->AllocateContiguousAsync(kMinimumReadSize, kDesiredReadSize);
-  }
-  Poll<std::optional<MultiBuf>> maybe_multibuf = allocation_future_->Pend(cx);
-
-  // If this is pending, we'll be awoken and this function will be re-run
-  // when a buffer becomes available, allowing us to provide a buffer.
-  if (maybe_multibuf.IsPending()) {
-    return OkStatus();
-  }
-
-  allocation_future_ = std::nullopt;
-
-  if (!maybe_multibuf->has_value()) {
-    PW_LOG_ERROR("Failed to allocate multibuf for reading");
-    return Status::ResourceExhausted();
-  }
-
-  read_state_.ProvideBufferToFill(std::move(**maybe_multibuf));
-  return OkStatus();
-}
-
-Poll<Result<MultiBuf>> StreamChannel::DoPendRead(Context& cx) {
-  Status status = ProvideBufferIfAvailable(cx);
-  if (!status.ok()) {
-    return status;
-  }
-  return read_state_.PendFilledBuffer(cx);
-}
-
-Poll<Status> StreamChannel::DoPendReadyToWrite(Context&) { return OkStatus(); }
-
-pw::Result<pw::channel::WriteToken> StreamChannel::DoWrite(
-    pw::multibuf::MultiBuf&& data) {
-  write_state_.SendData(std::move(data));
-  const uint32_t token = write_token_++;
-  return CreateWriteToken(token);
-}
-
-}  // namespace sense::system
diff --git a/targets/host/stream_channel.h b/targets/host/stream_channel.h
deleted file mode 100644
index 31f7e09..0000000
--- a/targets/host/stream_channel.h
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright 2024 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-//     https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#pragma once
-
-#include <memory>
-
-#include "pw_async2/dispatcher_base.h"
-#include "pw_channel/channel.h"
-#include "pw_multibuf/multibuf.h"
-#include "pw_status/status.h"
-#include "pw_stream/stream.h"
-#include "pw_sync/interrupt_spin_lock.h"
-#include "pw_sync/thread_notification.h"
-
-namespace sense::system {
-namespace internal {
-
-/// State for the stream-reading thread.
-class StreamChannelReadState {
- public:
-  StreamChannelReadState() = default;
-  StreamChannelReadState(const StreamChannelReadState&) = delete;
-  StreamChannelReadState& operator=(const StreamChannelReadState&) = delete;
-  StreamChannelReadState(StreamChannelReadState&&) = delete;
-  StreamChannelReadState& operator=(StreamChannelReadState&&) = delete;
-
-  /// Whether or not the `ReadLoop` already has a buffer available into which
-  /// data can be read.
-  bool HasBufferToFill();
-
-  /// Provide a buffer for `ReadLoop` to read data into.
-  void ProvideBufferToFill(pw::multibuf::MultiBuf&& buf);
-
-  /// Receives any available data processed by `ReadLoop`.
-  ///
-  /// If no data is available, schedules a wakeup of the task in `cx` when
-  /// new data arrives.
-  pw::async2::Poll<pw::Result<pw::multibuf::MultiBuf>> PendFilledBuffer(
-      pw::async2::Context& cx);
-
-  /// A loop which reads data from `reader` into buffers provided by
-  /// `ProvideBufferToFill` and then makes them available via
-  /// `PendFilledBuffer`.
-  ///
-  /// This is blocking and is intended to be run on an independent thread.
-  void ReadLoop(pw::stream::Reader& reader);
-
- private:
-  pw::multibuf::OwnedChunk WaitForBufferToFillAndTakeFrontChunk();
-  void ProvideFilledBuffer(pw::multibuf::MultiBuf&& filled_buffer);
-  void SetReadError(pw::Status status);
-
-  pw::sync::ThreadNotification buffer_to_fill_available_;
-  pw::async2::Waker on_buffer_filled_;
-  pw::sync::InterruptSpinLock buffer_lock_;
-  pw::multibuf::MultiBuf buffer_to_fill_ PW_GUARDED_BY(buffer_lock_);
-  pw::multibuf::MultiBuf filled_buffer_ PW_GUARDED_BY(buffer_lock_);
-  pw::Status status_ PW_GUARDED_BY(buffer_lock_);
-};
-
-/// State for the stream-writing thread.
-class StreamChannelWriteState {
- public:
-  StreamChannelWriteState() = default;
-  StreamChannelWriteState(const StreamChannelWriteState&) = delete;
-  StreamChannelWriteState& operator=(const StreamChannelWriteState&) = delete;
-  StreamChannelWriteState(StreamChannelWriteState&&) = delete;
-  StreamChannelWriteState& operator=(StreamChannelWriteState&&) = delete;
-
-  /// Queues `buf` to be sent into `writer` via the `WriteLoop`.
-  ///
-  /// Returns a status indicating whether the `WriteLoop` has encountered
-  /// errors writing into `writer`.
-  pw::Status SendData(pw::multibuf::MultiBuf&& buf);
-
-  /// A loop which writes the data sent via `SendData` into `writer`.
-  ///
-  /// This is blocking and is intended to be run on an independent thread.
-  void WriteLoop(pw::stream::Writer& writer);
-
- private:
-  pw::sync::ThreadNotification data_available_;
-  pw::sync::InterruptSpinLock buffer_lock_;
-  pw::multibuf::MultiBuf buffer_to_write_ PW_GUARDED_BY(buffer_lock_);
-  pw::Status status_;
-};
-
-}  // namespace internal
-
-/// A channel which delegates to an underlying reader and writer stream.
-///
-/// NOTE: this channel as well as its `reader` and `writer` must all continue to
-/// exist for the duration of the program, as they are referenced by other
-/// threads.
-///
-/// This restriction will be relaxed in the future.
-class StreamChannel final : public pw::channel::ByteReaderWriter {
- public:
-  StreamChannel(pw::multibuf::MultiBufAllocator& allocator,
-                pw::stream::Reader& reader,
-                pw::stream::Writer& writer);
-
-  // StreamChannel is referenced from other threads and is therefore not movable
-  // or copyable.
-  StreamChannel(const StreamChannel&) = delete;
-  StreamChannel& operator=(const StreamChannel&) = delete;
-  StreamChannel(StreamChannel&&) = delete;
-  StreamChannel& operator=(StreamChannel&&) = delete;
-
- private:
-  pw::Status ProvideBufferIfAvailable(pw::async2::Context& cx);
-
-  pw::async2::Poll<pw::Result<pw::multibuf::MultiBuf>> DoPendRead(
-      pw::async2::Context& cx) override;
-
-  pw::async2::Poll<pw::Status> DoPendReadyToWrite(
-      pw::async2::Context& cx) override;
-
-  pw::multibuf::MultiBufAllocator& DoGetWriteAllocator() override {
-    return *allocator_;
-  }
-
-  pw::Result<pw::channel::WriteToken> DoWrite(
-      pw::multibuf::MultiBuf&& data) override;
-
-  pw::async2::Poll<pw::Result<pw::channel::WriteToken>> DoPendFlush(
-      pw::async2::Context&) override {
-    return CreateWriteToken(write_token_);
-  }
-
-  pw::async2::Poll<pw::Status> DoPendClose(pw::async2::Context&) override {
-    return pw::async2::Ready(pw::OkStatus());
-  }
-
-  internal::StreamChannelReadState read_state_;
-  internal::StreamChannelWriteState write_state_;
-  std::optional<pw::multibuf::MultiBufAllocationFuture> allocation_future_;
-  pw::multibuf::MultiBufAllocator* allocator_;
-  uint32_t write_token_;
-};
-
-}  // namespace sense::system
\ No newline at end of file
diff --git a/targets/host/system.cc b/targets/host/system.cc
index b3beb59..585d6de 100644
--- a/targets/host/system.cc
+++ b/targets/host/system.cc
@@ -22,14 +22,16 @@
 #include "modules/light/fake_sensor.h"
 #include "modules/proximity/fake_sensor.h"
 #include "pw_assert/check.h"
+#include "pw_channel/stream_channel.h"
 #include "pw_digital_io/digital_io.h"
 #include "pw_multibuf/simple_allocator.h"
 #include "pw_system/io.h"
 #include "pw_system/system.h"
-#include "targets/host/stream_channel.h"
+#include "pw_thread_stl/options.h"
 
-using pw::digital_io::DigitalIn;
-using pw::digital_io::State;
+using ::pw::channel::StreamChannel;
+using ::pw::digital_io::DigitalIn;
+using ::pw::digital_io::State;
 
 extern "C" {
 
@@ -88,10 +90,14 @@
   static std::byte channel_buffer[16384];
   static pw::multibuf::SimpleAllocator multibuf_alloc(channel_buffer,
                                                       pw::System().allocator());
-  static StreamChannel channel(
-      multibuf_alloc, pw::system::GetReader(), pw::system::GetWriter());
+  static pw::NoDestructor<StreamChannel> channel(
+      multibuf_alloc,
+      pw::system::GetReader(),
+      pw::thread::stl::Options(),
+      pw::system::GetWriter(),
+      pw::thread::stl::Options());
 
-  pw::SystemStart(channel);
+  pw::SystemStart(*channel);
   PW_UNREACHABLE;
 }