blob: b5ce399f53a7db263698f111b6fc818387d52626 [file] [log] [blame]
// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
// __ ___ ___ _ _ ___ _ _ ___
// \ \ / /_\ | _ \ \| |_ _| \| |/ __|
// \ \/\/ / _ \| / .` || || .` | (_ |
// \_/\_/_/ \_\_|_\_|\_|___|_|\_|\___|
// _____ _____ ___ ___ ___ __ __ ___ _ _ _____ _ _
// | __\ \/ / _ \ __| _ \_ _| \/ | __| \| |_ _/_\ | |
// | _| > <| _/ _|| /| || |\/| | _|| .` | | |/ _ \| |__
// |___/_/\_\_| |___|_|_\___|_| |_|___|_|\_| |_/_/ \_\____|
//
// This module is in an early, experimental state. The APIs are in flux and may
// change without notice. Please do not rely on it in production code, but feel
// free to explore and share feedback with the Pigweed team!
#include <cstddef>
#include <cstdint>
#include <limits>
#include "pw_async2/dispatcher.h"
#include "pw_async2/poll.h"
#include "pw_bytes/span.h"
#include "pw_multibuf/allocator.h"
#include "pw_multibuf/multibuf.h"
#include "pw_result/result.h"
#include "pw_status/status.h"
namespace pw::channel {
/// @defgroup pw_channel
/// @{
/// Basic properties of a `Channel`. A `Channel` type can convert to any other
/// `Channel` for which it supports the required properties. For example, a
/// `kReadable` and `kWritable` channel may be passed to an API that only
/// requires `kReadable`.
enum Property : uint8_t {
/// All data is guaranteed to be delivered in order. The channel is closed if
/// data is lost.
kReliable = 1 << 0,
/// The channel supports reading.
kReadable = 1 << 1,
/// The channel supports writing.
kWritable = 1 << 2,
/// The channel supports seeking (changing the read/write position).
kSeekable = 1 << 3,
};
/// The type of data exchanged in `Channel` read and write calls. Unlike
/// `Property`, `Channels` with different `DataType`s cannot be used
/// interchangeably.
enum class DataType : uint8_t { kByte = 0, kDatagram = 1 };
/// Positions from which to seek.
enum Whence : uint8_t {
/// Seek from the beginning of the channel. The offset is a direct offset
/// into the data.
kBeginning,
/// Seek from the current position in the channel. The offset is added to
/// the current position. Use a negative offset to seek backwards.
///
/// Implementations may only support seeking within a limited range from the
/// current position.
kCurrent,
/// Seek from the end of the channel. The offset is added to the end
/// position. Use a negative offset to seek backwards from the end.
kEnd,
};
/// Represents a write operation. `WriteToken` can be used to track whether a
/// particular write has been flushed.
class [[nodiscard]] WriteToken {
public:
constexpr WriteToken() : token_(0) {}
constexpr WriteToken(const WriteToken&) = default;
constexpr WriteToken& operator=(const WriteToken&) = default;
constexpr bool operator==(const WriteToken& other) const {
return token_ == other.token_;
}
constexpr bool operator!=(const WriteToken& other) const {
return token_ != other.token_;
}
constexpr bool operator<(const WriteToken& other) const {
return token_ < other.token_;
}
constexpr bool operator>(const WriteToken& other) const {
return token_ > other.token_;
}
constexpr bool operator<=(const WriteToken& other) const {
return token_ <= other.token_;
}
constexpr bool operator>=(const WriteToken& other) const {
return token_ >= other.token_;
}
private:
friend class AnyChannel;
constexpr WriteToken(uint32_t value) : token_(value) {}
uint32_t token_;
};
/// A generic data channel that may support reading or writing bytes or
/// datagrams.
///
/// Note that this channel should be used from only one ``pw::async::Task``
/// at a time, as the ``Pend`` methods are only required to remember the
/// latest ``pw::async2::Context`` that was provided.
class AnyChannel {
public:
virtual ~AnyChannel() = default;
// Returned by Position() if getting the position is not supported.
// TODO: b/323622630 - `Seek` and `Position` are not yet implemented.
// static constexpr size_t kUnknownPosition =
// std::numeric_limits<size_t>::max();
// Channel properties
[[nodiscard]] constexpr DataType data_type() const { return data_type_; }
[[nodiscard]] constexpr bool reliable() const {
return (properties_ & Property::kReliable) != 0;
}
[[nodiscard]] constexpr bool seekable() const {
return (properties_ & Property::kSeekable) != 0;
}
[[nodiscard]] constexpr bool readable() const {
return (properties_ & Property::kReadable) != 0;
}
[[nodiscard]] constexpr bool writable() const {
return (properties_ & Property::kWritable) != 0;
}
[[nodiscard]] constexpr bool is_read_open() const { return read_open_; }
[[nodiscard]] constexpr bool is_write_open() const { return write_open_; }
[[nodiscard]] constexpr bool is_read_or_write_open() const {
return read_open_ || write_open_;
}
/// Read API
/// Returns a `pw::multibuf::MultiBuf` with read data, if available. If data
/// is not available, invokes `cx.waker()` when it becomes available.
///
/// For datagram channels, each successful read yields one complete
/// datagram, which may contain zero or more bytes. For byte stream channels,
/// each successful read yields one or more bytes.
///
/// Channels only support one read operation / waker at a time.
///
/// @returns @rst
///
/// .. pw-status-codes::
///
/// OK: Data was read into a MultiBuf.
///
/// UNIMPLEMENTED: The channel does not support reading.
///
/// FAILED_PRECONDITION: The channel is closed.
///
/// OUT_OF_RANGE: The end of the stream was reached. This may be though
/// of as reaching the end of a file. Future reads may succeed after
/// ``Seek`` ing backwards, but no more new data will be produced. The
/// channel is still open; writes and seeks may succeed.
///
/// @endrst
async2::Poll<Result<multibuf::MultiBuf>> PendRead(async2::Context& cx) {
if (!is_read_open()) {
return Status::FailedPrecondition();
}
async2::Poll<Result<multibuf::MultiBuf>> result = DoPendRead(cx);
if (result.IsReady() && result->status().IsFailedPrecondition()) {
set_read_closed();
}
return result;
}
/// Write API
/// Checks whether a writeable channel is *currently* writeable.
///
/// This should be called before attempting to ``Write``, and may be called
/// before allocating a write buffer if trying to reduce memory pressure.
///
/// This method will return:
///
/// * Ready(OK) - The channel is currently writeable, and a single caller
/// may proceed to ``Write``.
/// * Ready(UNIMPLEMENTED) - The channel does not support writing.
/// * Ready(FAILED_PRECONDITION) - The channel is closed for writing.
/// * Pending - ``cx`` will be awoken when the channel becomes writeable
/// again.
///
/// Note: this method will always return ``Ready`` for non-writeable
/// channels.
async2::Poll<Status> PendReadyToWrite(pw::async2::Context& cx) {
if (!is_write_open()) {
return Status::FailedPrecondition();
}
async2::Poll<Status> result = DoPendReadyToWrite(cx);
if (result.IsReady() && result->IsFailedPrecondition()) {
set_write_closed();
}
return result;
}
/// Gives access to an allocator for write buffers. The MultiBufAllocator
/// provides an asynchronous API for obtaining a buffer.
///
/// This allocator must *only* be used to allocate the next argument to
/// ``Write``. The allocator must be used at most once per call to
/// ``Write``, and the returned ``MultiBuf`` must not be combined with
/// any other ``MultiBuf`` s or ``Chunk`` s.
///
/// This method must not be called on channels which do not support writing.
multibuf::MultiBufAllocator& GetWriteAllocator() {
return DoGetWriteAllocator();
}
/// Writes using a previously allocated MultiBuf. Returns a token that
/// refers to this write. These tokens are monotonically increasing, and
/// PendFlush() returns the value of the latest token it has flushed.
///
/// The ``MultiBuf`` argument to ``Write`` may consist of either:
/// (1) A single ``MultiBuf`` allocated by ``GetWriteAllocator()``
/// that has not been combined with any other ``MultiBuf`` s
/// or ``Chunk``s OR
/// (2) A ``MultiBuf`` containing any combination of buffers from sources
/// other than ``GetWriteAllocator``.
///
/// This requirement allows for more efficient use of memory in case (1).
/// For example, a ring-buffer implementation of a ``Channel`` may
/// specialize ``GetWriteAllocator`` to return the next section of the
/// buffer available for writing.
///
/// @returns @rst
/// May fail with the following error codes:
///
/// .. pw-status-codes::
///
/// OK: Data was accepted by the channel.
///
/// UNIMPLEMENTED: The channel does not support writing.
///
/// UNAVAILABLE: The write failed due to a transient error (only applies
/// to unreliable channels).
///
/// FAILED_PRECONDITION: The channel is closed.
///
/// @endrst
Result<WriteToken> Write(multibuf::MultiBuf&& data) {
if (!is_write_open()) {
return Status::FailedPrecondition();
}
Result<WriteToken> result = DoWrite(std::move(data));
if (result.status().IsFailedPrecondition()) {
set_write_closed();
}
return result;
}
/// Flushes pending writes.
///
/// Returns a ``async2::Poll`` indicating whether or not flushing has
/// completed.
///
/// * Ready(OK) - All data has been successfully flushed.
/// * Ready(UNIMPLEMENTED) - The channel does not support writing.
/// * Ready(FAILED_PRECONDITION) - The channel is closed.
/// * Pending - Data remains to be flushed.
async2::Poll<Result<WriteToken>> PendFlush(async2::Context& cx) {
if (!is_write_open()) {
return Status::FailedPrecondition();
}
async2::Poll<Result<WriteToken>> result = DoPendFlush(cx);
if (result.IsReady() && result->status().IsFailedPrecondition()) {
set_write_closed();
}
return result;
}
/// Seek changes the position in the stream.
///
/// TODO: b/323622630 - `Seek` and `Position` are not yet implemented.
///
/// Any ``PendRead`` or ``Write`` calls following a call to ``Seek`` will be
/// relative to the new position. Already-written data still being flushed
/// will be output relative to the old position.
///
/// @returns @rst
///
/// .. pw-status-codes::
///
/// OK: The current position was successfully changed.
///
/// UNIMPLEMENTED: The channel does not support seeking.
///
/// FAILED_PRECONDITION: The channel is closed.
///
/// NOT_FOUND: The seek was to a valid position, but the channel is no
/// longer capable of seeking to this position (partially seekable
/// channels only).
///
/// OUT_OF_RANGE: The seek went beyond the end of the stream.
///
/// @endrst
Status Seek(async2::Context& cx, ptrdiff_t position, Whence whence);
/// Returns the current position in the stream, or `kUnknownPosition` if
/// unsupported.
///
/// TODO: b/323622630 - `Seek` and `Position` are not yet implemented.
size_t Position() const;
/// Closes the channel, flushing any data.
///
/// @returns @rst
///
/// .. pw-status-codes::
///
/// OK: The channel was closed and all data was sent successfully.
///
/// DATA_LOSS: The channel was closed, but not all previously written
/// data was delivered.
///
/// FAILED_PRECONDITION: Channel was already closed, which can happen
/// out-of-band due to errors.
///
/// @endrst
async2::Poll<pw::Status> PendClose(async2::Context& cx) {
if (!is_read_or_write_open()) {
return Status::FailedPrecondition();
}
auto result = DoPendClose(cx);
if (result.IsReady()) {
set_read_closed();
set_write_closed();
}
return result;
}
protected:
static constexpr WriteToken CreateWriteToken(uint32_t value) {
return WriteToken(value);
}
// Marks the channel as closed for reading, but does nothing else.
//
// PendClose() always marks the channel closed when DoPendClose() returns
// Ready(), regardless of the status.
void set_read_closed() { read_open_ = false; }
// Marks the channel as closed for writing, but does nothing else.
//
// PendClose() always marks the channel closed when DoPendClose() returns
// Ready(), regardless of the status.
void set_write_closed() { write_open_ = false; }
private:
template <DataType, Property...>
friend class Channel;
template <Property kLhs, Property kRhs, Property... kProperties>
static constexpr bool PropertiesAreInOrderWithoutDuplicates() {
return (kLhs < kRhs) &&
PropertiesAreInOrderWithoutDuplicates<kRhs, kProperties...>();
}
template <Property>
static constexpr bool PropertiesAreInOrderWithoutDuplicates() {
return true;
}
template <Property... kProperties>
static constexpr uint8_t GetProperties() {
return (static_cast<uint8_t>(kProperties) | ...);
}
template <Property... kProperties>
static constexpr bool PropertiesAreValid() {
static_assert(((kProperties != kSeekable) && ...),
"Seekable channels are not yet implemented; see b/323624921");
static_assert(((kProperties == kReadable) || ...) ||
((kProperties == kWritable) || ...),
"At least one of kReadable or kWritable must be provided");
static_assert(sizeof...(kProperties) <= 4,
"Too many properties given; no more than 4 may be specified "
"(kReliable, kReadable, kWritable, kSeekable)");
static_assert(
PropertiesAreInOrderWithoutDuplicates<kProperties...>(),
"Properties must be specified in the following order, without "
"duplicates: kReliable, kReadable, kWritable, kSeekable");
return true;
}
// `AnyChannel` may only be constructed by deriving from `Channel`.
explicit constexpr AnyChannel(DataType type, uint8_t properties)
: read_open_(true),
write_open_(true),
data_type_(type),
properties_(properties) {}
// Virtual interface
// Read functions
// The max_bytes argument is ignored for datagram-oriented channels.
virtual async2::Poll<Result<multibuf::MultiBuf>> DoPendRead(
async2::Context& cx) = 0;
// Write functions
virtual multibuf::MultiBufAllocator& DoGetWriteAllocator() = 0;
virtual pw::async2::Poll<Status> DoPendReadyToWrite(async2::Context& cx) = 0;
virtual Result<WriteToken> DoWrite(multibuf::MultiBuf&& buffer) = 0;
virtual pw::async2::Poll<Result<WriteToken>> DoPendFlush(
async2::Context& cx) = 0;
// Seek functions
/// TODO: b/323622630 - `Seek` and `Position` are not yet implemented.
// virtual Status DoSeek(ptrdiff_t position, Whence whence) = 0;
// virtual size_t DoPosition() const = 0;
// Common functions
virtual async2::Poll<Status> DoPendClose(async2::Context& cx) = 0;
bool read_open_;
bool write_open_;
DataType data_type_;
uint8_t properties_;
};
/// The basic `Channel` type. Unlike `AnyChannel`, the `Channel`'s properties
/// are expressed in template parameters and thus reflected in the type.
///
/// Properties must be specified in order (`kDatagram`, `kReliable`,
/// `kReadable`, `kWritable`, `kSeekable`) and without duplicates.
template <DataType kDataType, Property... kProperties>
class Channel : public AnyChannel {
static_assert(PropertiesAreValid<kProperties...>());
};
/// A `ByteChannel` exchanges data as a stream of bytes.
template <Property... kProperties>
using ByteChannel = Channel<DataType::kByte, kProperties...>;
/// A `DatagramChannel` exchanges data as a series of datagrams.
template <Property... kProperties>
using DatagramChannel = Channel<DataType::kDatagram, kProperties...>;
/// Unreliable byte-oriented `Channel` that supports reading.
using ByteReader = ByteChannel<kReadable>;
/// Unreliable byte-oriented `Channel` that supports writing.
using ByteWriter = ByteChannel<kWritable>;
/// Unreliable byte-oriented `Channel` that supports reading and writing.
using ByteReaderWriter = ByteChannel<kReadable, kWritable>;
/// Reliable byte-oriented `Channel` that supports reading.
using ReliableByteReader = ByteChannel<kReliable, kReadable>;
/// Reliable byte-oriented `Channel` that supports writing.
using ReliableByteWriter = ByteChannel<kReliable, kWritable>;
/// Reliable byte-oriented `Channel` that supports reading and writing.
using ReliableByteReaderWriter = ByteChannel<kReliable, kReadable, kWritable>;
/// Unreliable datagram-oriented `Channel` that supports reading.
using DatagramReader = DatagramChannel<kReadable>;
/// Unreliable datagram-oriented `Channel` that supports writing.
using DatagramWriter = DatagramChannel<kWritable>;
/// Unreliable datagram-oriented `Channel` that supports reading and writing.
using DatagramReaderWriter = DatagramChannel<kReadable, kWritable>;
/// Reliable datagram-oriented `Channel` that supports reading.
using ReliableDatagramReader = DatagramChannel<kReliable, kReadable>;
/// Reliable datagram-oriented `Channel` that supports writing.
using ReliableDatagramWriter = DatagramChannel<kReliable, kWritable>;
/// Reliable datagram-oriented `Channel` that supports reading and writing.
using ReliableDatagramReaderWriter =
DatagramChannel<kReliable, kReadable, kWritable>;
/// @}
} // namespace pw::channel
// Include specializations for supported Channel types.
#include "pw_channel/internal/channel_specializations.h"