blob: ee9a0d6cb54c5367f69436e5aabd4edde8438003 [file] [log] [blame]
// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
/// @file
/// This file defines types related to a multi-producer, single-consumer stream.
///
/// The single readers must be constructed in place, while writers can be moved.
/// A reader and writer may be connected using `CreateMpscStream()`. Additional
/// writers may be connected by copying a previously connected writer.
///
/// Example:
///
/// @code{.cpp}
/// void WriteThreadRoutine(void* arg) {
/// auto *writer = static_cast<MpscWriter *>(arg);
/// ConstByteSpan data = GenerateSomeData();
/// Status status = writer->Write(data);
/// ...
/// }
/// ...
/// MpscReader reader;
/// MpscWriter writer;
/// CreateMpscStream(reader, writer);
/// thread::Thread t(MakeThreadOptions(), WriteThreadRoutine, &writer);
/// std::byte buffer[kBufSize];
/// if (auto status = reader.Read(ByteSpan(buffer)); status.ok()) {
/// ProcessSomeData(buffer);
/// }
/// @endcode
///
/// See the `MpscReader::ReadAll()` for additional examples.
///
/// The types in the files are designed to be used across different threads,
/// but are not completely thread-safe. Data must only be written by an
/// MpscWriter using a single thread, and data must only be read by an
/// MpscReader using a single thread. In other words, multiple calls to
/// `Write()` must not be made concurrently, and multiple calls to `Read()` and
/// `ReadAll()` must not be made concurrently. Calls to other methods, e.g.
/// `Close()`, are thread-safe and may be made from any thread.
#include <cstddef>
#include "pw_bytes/span.h"
#include "pw_chrono/system_clock.h"
#include "pw_containers/intrusive_list.h"
#include "pw_function/function.h"
#include "pw_status/status.h"
#include "pw_status/status_with_size.h"
#include "pw_stream/stream.h"
#include "pw_sync/lock_annotations.h"
#include "pw_sync/mutex.h"
#include "pw_sync/timed_thread_notification.h"
namespace pw::stream {
// Forward declaration.
class MpscReader;
class MpscWriter;
/// Creates a multi-producer, single consumer stream.
///
/// This method creates a stream by associating a reader and writer. Both are
/// reset before being connected. This is the only way to connect a reader.
/// Additional writers may be connected by copying the given writer after it is
/// connected.
///
/// This method is thread-safe with respect to other MpscReader and MpscWriter
/// methods. It is not thread-safe with respect to itself, i.e. callers must
/// not make concurrent calls to `CreateMpscStream()` from different threads
/// with the same objects.
///
/// @param[out] reader The reader to connect.
/// @param[out] writer The writer to connect.
void CreateMpscStream(MpscReader& reader, MpscWriter& writer);
/// Writer for a multi-producer, single consumer stream.
///
/// This class has a default constructor that only produces disconnected
/// writers. To connect writers, use `CreateMpscStream()`. Additional connected
/// writers can be created by copying an existing one.
///
/// Each thread should have its own dedicated writer. This class is thread-safe
/// with respect to the reader, but not with respect to itself. In particular,
/// attempting to call `Write()` concurrently on different threads may cause
/// result in a failure.
class MpscWriter : public NonSeekableWriter,
public IntrusiveList<MpscWriter>::Item {
public:
using duration = std::optional<chrono::SystemClock::duration>;
/// A per-writer thread notification that can be added to a reader's list.
///
/// The reader maintains a list of outstanding requests to write data. As
/// data is read, and space to write data becomes available, it uses these
/// requests to signal the waiting the writers.
struct Request : public IntrusiveList<Request>::Item {
sync::TimedThreadNotification notification;
using IntrusiveList<Request>::Item::unlisted;
};
MpscWriter() = default;
MpscWriter(const MpscWriter& other);
MpscWriter& operator=(const MpscWriter& other);
MpscWriter(MpscWriter&& other);
MpscWriter& operator=(MpscWriter&& other);
~MpscWriter() override;
/// Returns whether this object is connected to a reader.
bool connected() const PW_LOCKS_EXCLUDED(mutex_);
/// Indicates how much data was sent in the last call to `Write()`.
size_t last_write() const PW_LOCKS_EXCLUDED(mutex_);
/// Returns the optional maximum time elapsed before a `Write()` fails.
const duration& timeout() const PW_LOCKS_EXCLUDED(mutex_);
/// Set the timeout for writing to this stream.
///
/// After setting a timeout, if the given duration elapses while making a call
/// to `Write()`, @pw_status{RESOURCE_EXHAUSTED} will be returned. If desired,
/// a timeout should be set before calling `Write()`. Setting a timeout when a
/// writer is awaiting notification from a reader will not affect the duration
/// of that wait.
///
/// Note that setting a write timeout makes partial writes possible. For
/// example, if a call to `Write()` of some length corresponds to 2 calls to
/// `Read()` of half that length with an sufficient delay between the calls
/// will result in the first half being written and read, but not the second.
/// This differs from `Stream::Write()` which stipulates that no data is
/// written on failure. If this happens, the length of the data written can be
/// retrieved using `last_write()`.
///
/// Generally, callers should use one of three approaches:
/// 1. Do not set a write timeout, and let writers block arbitrarily long
/// until space is available or the reader is disconnected.
/// 2. Use only a single writer, and use `last_write()` to resend data.
/// 3. Structure the data being sent so that the reader can always read
/// complete messages and avoid blocking or performing complex work
/// mid-message.
///
/// @param[in] timeout The duration to wait before returning an error.
void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);
/// Sets the maximum amount that can be written by this writer.
///
/// By default, writers can write an unlimited amount of data. This method can
/// be used to set a limit, or remove it by providing a value of
/// Stream::kUnlimited.
///
/// If a limit is set, the writer will automatically close once it has written
/// that much data. The current number of bytes remaining until the limit is
/// reached can be retrieved using `ConservativeWriteLimit()`.
///
/// @param[in] limit The maximum amount that can be written by this writer.
void SetLimit(size_t limit) PW_LOCKS_EXCLUDED(mutex_);
/// Disconnects this writer from its reader.
///
/// This method does nothing if the writer is not connected.
void Close() PW_LOCKS_EXCLUDED(mutex_);
private:
// The factory method is allowed to directly modify a writer to connect it
// to the reader.
friend void CreateMpscStream(MpscReader&, MpscWriter&);
/// @copydoc Stream::ConservativeLimit
size_t ConservativeLimit(LimitType type) const override;
/// @copydoc Stream::DoWrite
///
/// This method is *not* thread-safe with respect to itself. If multiple
/// threads attempt to write concurrently using the same writer, those calls
/// may fail. Instead, each thread should have its own writer.
///
/// @pre No other thread has called `Write()` on this object.
Status DoWrite(ConstByteSpan data) override;
/// Locked implementation of `Close()`.
void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
mutable sync::Mutex mutex_;
MpscReader* reader_ PW_GUARDED_BY(mutex_) = nullptr;
size_t limit_ PW_GUARDED_BY(mutex_) = kUnlimited;
Request write_request_;
duration timeout_ PW_GUARDED_BY(mutex_);
size_t last_write_ PW_GUARDED_BY(mutex_) = 0;
};
/// Reader of a multi-producer, single-consumer stream.
///
/// The reader manages 3 aspects of the stream:
/// * The storage used to hold written data that is to be read.
/// * The list of connected writers.
/// * Accounting for how much data has and can be written.
///
/// This class has a default constructor that can only produce a disconnected
/// reader. To connect a reader, use `CreateMpscStream()`.
class MpscReader : public NonSeekableReader {
public:
using duration = std::optional<chrono::SystemClock::duration>;
MpscReader();
~MpscReader() override;
/// Returns whether this object has any connected writers.
bool connected() const PW_LOCKS_EXCLUDED(mutex_);
/// Set the timeout for reading from this stream.
///
/// After setting a timeout, if the given duration elapses while making a call
/// to `Read()`, RESOURCE_EXHAUSTED will be returned. If desired, a timeout
/// should be set before calling `Read()` or `ReadAll()`. Setting a timeout
/// when a reader is awaiting notification from a writer will not affect the
/// duration of that wait. `ReadUntilClose()` ignores timeouts entirely.
///
/// @param[in] timeout The duration to wait before returning an error.
void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);
/// Associates the reader with storage to buffer written data to be read.
///
/// If desired, callers can use this method to buffer written data. This can
/// improve writer performance by allowing calls to `WriteData()` to avoid
/// waiting for the reader, albeit at the cost of increased memory. This can
/// be useful when the reader needs time to process the data it reads, or when
/// the volume of writes varies over time, i.e. is "bursty".
///
/// The reader does not take ownership of the storage, which must be valid
/// until a call to the destructor or another call to `SetBuffer()`.
///
/// @param[in] buffer A view to the storage.
void SetBuffer(ByteSpan buffer) PW_LOCKS_EXCLUDED(mutex_);
/// @fn ReadAll
/// Reads data in a loop and passes it to a provided callback.
///
/// This will read continuously until all connected writers close.
///
/// Example usage:
///
/// @code(.cpp}
/// MpscReader reader;
/// MpscWriter writer;
/// MpscStreamCreate(reader, writer);
/// thread::Thread t(MakeThreadOptions(), [] (void*arg) {
/// auto *writer = static_cast<MpscWriter *>(arg);
/// writer->Write(GenerateSomeData()).IgnoreError();
/// }, &writer);
/// auto status = reader.ReadAll([] (ConstByteSpan data) {
/// return ProcessSomeData();
/// });
/// t.join();
/// @endcode
///
/// @param[in] callback A callable object to invoke on data as it is read.
/// @retval OK Successfully read until writers closed.
/// @retval FAILED_PRECONDITION The object does not have a buffer.
/// @retval RESOURCE_EXHAUSTED Timed out when reading data. This can only
/// occur if a timeout has been set.
/// @retval Any other error as returned by the callback.
using ReadAllCallback = Function<Status(ConstByteSpan data)>;
Status ReadAll(ReadAllCallback callback) PW_LOCKS_EXCLUDED(mutex_);
/// Disconnects all writers and drops any unread data.
void Close() PW_LOCKS_EXCLUDED(mutex_);
private:
// The factory method is allowed to directly modify the reader to connect it
// to a writer.
friend void CreateMpscStream(MpscReader&, MpscWriter&);
// The writer is allowed to call directly into the reader to:
// * Add/remove itself to the reader's list of writer.
// * Request space to write data, and to write to that space.
friend class MpscWriter;
/// @fn IncreaseLimit
/// @fn IncreaseLimitLocked
/// Increases the number of remaining bytes to be written.
///
/// Used by `MpscWriter::SetLimit()` and `MpscWriter::WriteData()`.
///
/// @param[in] delta How much to increase the number of remaining bytes.
void IncreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
void IncreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// @fn DecreaseLimit
/// @fn DecreaseLimitLocked
/// Decreases the number of remaining bytes to be written.
///
/// Used by `MpscWriter::SetLimit()` and `MpscWriter::RemoveWriter()`.
///
/// @param[in] delta How much to decrease the number of remaining bytes.
void DecreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
void DecreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// @copydoc Stream::ConservativeLimit
size_t ConservativeLimit(Stream::LimitType type) const override
PW_LOCKS_EXCLUDED(mutex_);
/// Adds the write request to the reader's list of pending requests.
///
/// Used by `MpscWriter::WriteData()`.
///
/// @param[in] write_request A writer's request object.
void RequestWrite(MpscWriter::Request& write_request)
PW_LOCKS_EXCLUDED(mutex_);
/// Checks if a writer can write data, and signals it if so.
///
/// A reader may signal a writer because:
/// * Space to write data has become available.
/// * The queue of write requests has changed.
/// * The reader is closing. `WriteData()` will return OUT_OF_RANGE.
void CheckWriteableLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Adds data from a writer to the buffer to be read.
///
/// @param[in] data The data to be written.
/// @param[in] limit The writer's current write limit.
///
/// @retval OK Data was written to the buffer.
/// @retval RESOURCE_EXHAUSTED Buffer has insufficent space for data.
/// @retval OUT_OF_RANGE Stream is shut down or closed.
StatusWithSize WriteData(ConstByteSpan data, size_t limit)
PW_LOCKS_EXCLUDED(mutex_);
/// @fn CompleteWrite
/// @fn CompleteWriteLocked
/// Removes the write request from the reader's list of pending requests.
///
/// Used by `MpscWriter::WriteData()` and `MpscWriter::CloseLocked()`.
///
/// @param[in] write_request A writer's request object.
void CompleteWrite(MpscWriter::Request& write_request)
PW_LOCKS_EXCLUDED(mutex_);
void CompleteWriteLocked(MpscWriter::Request& write_request)
PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// @copydoc Stream::DoRead
StatusWithSize DoRead(ByteSpan destination) override
PW_LOCKS_EXCLUDED(mutex_);
// Locked implementations.
mutable sync::Mutex mutex_;
IntrusiveList<MpscWriter> writers_ PW_GUARDED_BY(mutex_);
IntrusiveList<MpscWriter::Request> write_requests_ PW_GUARDED_BY(mutex_);
IntrusiveList<MpscWriter::Request>::iterator last_request_
PW_GUARDED_BY(mutex_);
size_t num_unlimited_ PW_GUARDED_BY(mutex_) = 0;
size_t limit_ PW_GUARDED_BY(mutex_) = 0;
bool reading_ PW_GUARDED_BY(mutex_) = false;
sync::TimedThreadNotification readable_;
sync::ThreadNotification closeable_;
duration timeout_ PW_GUARDED_BY(mutex_);
ByteSpan destination_ PW_GUARDED_BY(mutex_);
size_t written_ PW_GUARDED_BY(mutex_) = 0;
ByteSpan buffer_ PW_GUARDED_BY(mutex_);
size_t offset_ PW_GUARDED_BY(mutex_) = 0;
size_t length_ PW_GUARDED_BY(mutex_) = 0;
};
/// Reader for a multi-producer, single consumer stream.
///
/// This class includes an explicitly-sized buffer. It has a default constructor
/// that can only produce a disconnected reader. To connect a reader, use
/// `CreateMpscStream()`.
template <size_t kCapacity>
class BufferedMpscReader : public MpscReader {
public:
BufferedMpscReader() { SetBuffer(buffer_); }
private:
std::array<std::byte, kCapacity> buffer_;
};
} // namespace pw::stream