blob: 6933690bc89138f901a16fe1373ca7bac5bc7fc1 [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
#include <array>
#include <cstdint>
#include "pw_assert/assert.h"
#include "pw_bytes/span.h"
#include "pw_log/proto/log.pwpb.h"
#include "pw_multisink/multisink.h"
#include "pw_protobuf/serialized_size.h"
#include "pw_result/result.h"
#include "pw_rpc/raw/server_reader_writer.h"
#include "pw_status/status.h"
#include "pw_sync/lock_annotations.h"
#include "pw_sync/mutex.h"
namespace pw::log_rpc {
// RpcLogDrain matches a MultiSink::Drain with an RPC channel's writer. An
// RPC channel ID identifies this drain. The user must attach this drain
// to a MultiSink that returns a log::LogEntry, and provide a buffer large
// enough to hold the largest log::LogEntry transmittable. The user must call
// Flush(), which, on every call, packs as many log::LogEntry items as possible
// into a log::LogEntries message, writes the message to the provided writer,
// then repeats the process until there are no more entries in the MultiSink or
// the writer failed to write the outgoing package, in which case the RPC on
// the writer is closed. When close_stream_on_writer_error is false the drain
// will continue to retrieve log entries out of the MultiSink and attempt to
// send them out ignoring the writer errors. Note: this behavior might change or
// be removed in the future.
class RpcLogDrain : public multisink::MultiSink::Drain {
// The minimum buffer size, without the message payload, needed to retrieve a
// log::LogEntry from the attached MultiSink. The user must account for the
// max message size to avoid log entry drops.
//
// Every field contributes its key plus the worst-case encoded size of its
// value. The line_level and flags values are varint-encoded 32-bit integers,
// so each one is budgeted at protobuf::kMaxSizeBytesUint32; leaving those
// terms out would under-size the buffer for entries carrying large values.
static constexpr size_t kMinEntrySizeWithoutPayload =
    // message
    protobuf::SizeOfFieldKey(1) +
    1  // Assume minimum varint length, skip the payload bytes.
    // line_level
    + protobuf::SizeOfFieldKey(2) + protobuf::kMaxSizeBytesUint32
    // flags
    + protobuf::SizeOfFieldKey(3) + protobuf::kMaxSizeBytesUint32
    // timestamp or time_since_last_entry
    + protobuf::SizeOfFieldKey(4) + protobuf::kMaxSizeBytesInt64;
// Format string used to report the drop count; %u is replaced by the count.
static constexpr char kDropMessageFormatString[] = "Dropped %u";
// Upper bound on the formatted drop message length: the 8-character
// "Dropped " prefix plus at most 10 decimal digits for a uint32_t count.
static constexpr size_t kMaxDropMessageSize = 8 + 10;
// Lower bound for the entry buffer: the fixed per-entry overhead plus enough
// room for the longest possible drop message.
static constexpr size_t kMinEntryBufferSize =
    kMinEntrySizeWithoutPayload + kMaxDropMessageSize;
// Creates a log stream with the provided open writer. Useful for streaming
// logs without a request.
// The provided buffer must be large enough to hold the largest transmittable
// log::LogEntry or a drop count message at the very least. The user can
// choose to provide a unique mutex for the drain, or share it to save RAM as
// long as they are aware of contention issues.
// Asserts that log_entry_buffer holds at least kMinEntryBufferSize bytes.
RpcLogDrain(uint32_t channel_id,
ByteSpan log_entry_buffer,
rpc::RawServerWriter writer,
sync::Mutex& mutex,
bool close_stream_on_writer_error)
: channel_id_(channel_id),
mutex_(mutex) {
// A buffer smaller than the minimum could not even carry a drop message.
PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
// Creates a closed log stream with a writer that can be set at a later time.
// The provided buffer must be large enough to hold the largest transmittable
// log::LogEntry or a drop count message at the very least. The user can
// choose to provide a unique mutex for the drain, or share it to save RAM as
// long as they are aware of contention issues.
// Asserts that log_entry_buffer holds at least kMinEntryBufferSize bytes.
RpcLogDrain(uint32_t channel_id,
ByteSpan log_entry_buffer,
sync::Mutex& mutex,
bool close_stream_on_writer_error)
: channel_id_(channel_id),
mutex_(mutex) {
// A buffer smaller than the minimum could not even carry a drop message.
PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
// Not copyable: both copy construction and copy assignment are deleted.
RpcLogDrain(const RpcLogDrain&) = delete;
RpcLogDrain& operator=(const RpcLogDrain&) = delete;
// Configures the drain with a new open server writer if the current one is
// not open. Must be called without mutex_ held (see PW_LOCKS_EXCLUDED).
// Return values:
// OK - Successfully set the new open writer.
// FAILED_PRECONDITION - The given writer is not open.
// ALREADY_EXISTS - An open writer is already set.
Status Open(rpc::RawServerWriter& writer) PW_LOCKS_EXCLUDED(mutex_);
// Accesses log entries and sends them via the writer. Expected to be called
// frequently to avoid log drops. If the writer fails to send a packet with
// multiple log entries, the entries are dropped and a drop message with the
// count is sent. When close_stream_on_writer_error is set, the stream will
// automatically be closed and Flush will return the writer error.
// Must be called without mutex_ held (see PW_LOCKS_EXCLUDED).
// Precondition: the drain must be attached to a MultiSink.
// Return values:
// OK - All entries were consumed.
// ABORTED - There was an error writing the packet, and
// close_stream_on_writer_error is true.
Status Flush() PW_LOCKS_EXCLUDED(mutex_);
// Ends the RPC log stream without flushing. Must be called without mutex_
// held (see PW_LOCKS_EXCLUDED).
// Return values:
// OK - Successfully closed the server writer.
// FAILED_PRECONDITION - The given writer is not open.
// Errors from the underlying writer send packet.
Status Close() PW_LOCKS_EXCLUDED(mutex_);
// Returns the RPC channel ID that identifies this drain.
uint32_t channel_id() const {
  return channel_id_;
}
// State returned by EncodeOutgoingPacket().
enum class LogDrainState {
// Fills the outgoing buffer with as many entries as possible.
// NOTE(review): packed_entry_count_out is named like an output parameter but
// is taken by value here - confirm whether a reference was intended.
LogDrainState EncodeOutgoingPacket(log::LogEntries::MemoryEncoder& encoder,
uint32_t packed_entry_count_out)
// RPC channel ID given at construction; exposed through channel_id().
const uint32_t channel_id_;
// Whether a writer error closes the stream (see Flush()).
const bool close_stream_on_writer_error_;
// Writer the log entries are sent through; guarded by mutex_.
rpc::RawServerWriter server_writer_ PW_GUARDED_BY(mutex_);
// Presumably the entry buffer supplied at construction - TODO confirm.
// Guarded by mutex_.
const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_);
// Presumably the number of dropped entries still to be reported - TODO
// confirm. Guarded by mutex_.
uint32_t committed_entry_drop_count_ PW_GUARDED_BY(mutex_);
// Mutex given at construction; may be shared with other drains.
sync::Mutex& mutex_;
} // namespace pw::log_rpc