blob: e61c71c7c2c762efbe7af5142d2e3d629214c8cc [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
#include <cstddef>
#include <optional>
#include "pw_chrono/system_clock.h"
#include "pw_log_rpc/log_service.h"
#include "pw_log_rpc/rpc_log_drain_map.h"
#include "pw_multisink/multisink.h"
#include "pw_result/result.h"
#include "pw_rpc/raw/server_reader_writer.h"
#include "pw_span/span.h"
#include "pw_status/status.h"
#include "pw_status/try.h"
#include "pw_sync/timed_thread_notification.h"
#include "pw_thread/thread_core.h"
namespace pw::log_rpc {
// RpcLogDrainThread is a single thread and single MultiSink::Listener that
// manages multiple log streams. It is a suitable option when a minimal
// thread count is desired but comes with the cost of individual log streams
// blocking each other's flushing.
class RpcLogDrainThread : public thread::ThreadCore,
public multisink::MultiSink::Listener {
public:
RpcLogDrainThread(multisink::MultiSink& multisink,
RpcLogDrainMap& drain_map,
span<std::byte> encoding_buffer)
: drain_map_(drain_map),
multisink_(multisink),
encoding_buffer_(encoding_buffer) {}
void OnNewEntryAvailable() override {
ready_to_flush_notification_.release();
}
// Sequentially flushes each log stream.
void Run() override {
for (auto& drain : drain_map_.drains()) {
multisink_.AttachDrain(drain);
drain.set_on_open_callback(
[this]() { this->ready_to_flush_notification_.release(); });
}
multisink_.AttachListener(*this);
bool drains_pending = true;
std::optional<chrono::SystemClock::duration> min_delay =
chrono::SystemClock::duration::zero();
while (true) {
if (drains_pending && min_delay.has_value()) {
ready_to_flush_notification_.try_acquire_for(min_delay.value());
} else {
ready_to_flush_notification_.acquire();
}
drains_pending = false;
min_delay = std::nullopt;
for (auto& drain : drain_map_.drains()) {
std::optional<chrono::SystemClock::duration> drain_ready_in =
drain.Trickle(encoding_buffer_);
if (drain_ready_in.has_value()) {
min_delay = std::min(drain_ready_in.value(),
min_delay.value_or(drain_ready_in.value()));
drains_pending = true;
}
}
}
}
// Opens a server writer to set up an unrequested log stream.
Status OpenUnrequestedLogStream(uint32_t channel_id,
rpc::Server& rpc_server,
LogService& log_service) {
rpc::RawServerWriter writer =
rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
rpc_server, channel_id, log_service);
const Result<RpcLogDrain*> drain =
drain_map_.GetDrainFromChannelId(channel_id);
PW_TRY(drain.status());
return drain.value()->Open(writer);
}
private:
sync::TimedThreadNotification ready_to_flush_notification_;
RpcLogDrainMap& drain_map_;
multisink::MultiSink& multisink_;
span<std::byte> encoding_buffer_;
};
template <size_t kEncodingBufferSizeBytes>
class RpcLogDrainThreadWithBuffer final : public RpcLogDrainThread {
public:
RpcLogDrainThreadWithBuffer(multisink::MultiSink& multisink,
RpcLogDrainMap& drain_map)
: RpcLogDrainThread(multisink, drain_map, encoding_buffer_array_) {}
private:
static_assert(kEncodingBufferSizeBytes >=
RpcLogDrain::kLogEntriesEncodeFrameSize +
RpcLogDrain::kMinEntryBufferSize,
"RpcLogDrainThread's encoding buffer must be large enough for "
"at least one entry");
std::byte encoding_buffer_array_[kEncodingBufferSizeBytes];
};
} // namespace pw::log_rpc