// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#pragma once

#include <cstddef>
#include <optional>
#include <span>

#include "pw_chrono/system_clock.h"
#include "pw_log_rpc/log_service.h"
#include "pw_log_rpc/rpc_log_drain_map.h"
#include "pw_multisink/multisink.h"
#include "pw_result/result.h"
#include "pw_rpc/raw/server_reader_writer.h"
#include "pw_status/status.h"
#include "pw_status/try.h"
#include "pw_sync/timed_thread_notification.h"
#include "pw_thread/thread_core.h"

namespace pw::log_rpc {

// RpcLogDrainThread is a single thread and single MultiSink::Listener that
// manages multiple log streams. It is a suitable option when a minimal
// thread count is desired but comes with the cost of individual log streams
// blocking each other's flushing.
class RpcLogDrainThread : public thread::ThreadCore,
                          public multisink::MultiSink::Listener {
 public:
  RpcLogDrainThread(multisink::MultiSink& multisink,
                    RpcLogDrainMap& drain_map,
                    std::span<std::byte> encoding_buffer)
      : drain_map_(drain_map),
        multisink_(multisink),
        encoding_buffer_(encoding_buffer) {}

  void OnNewEntryAvailable() override {
    ready_to_flush_notification_.release();
  }

  // Sequentially flushes each log stream.
  void Run() override {
    for (auto& drain : drain_map_.drains()) {
      multisink_.AttachDrain(drain);
      drain.set_on_open_callback(
          [this]() { this->ready_to_flush_notification_.release(); });
    }
    multisink_.AttachListener(*this);

    bool drains_pending = true;
    std::optional<chrono::SystemClock::duration> min_delay =
        chrono::SystemClock::duration::zero();
    while (true) {
      if (drains_pending && min_delay.has_value()) {
        ready_to_flush_notification_.try_acquire_for(min_delay.value());
      } else {
        ready_to_flush_notification_.acquire();
      }
      drains_pending = false;
      min_delay = std::nullopt;
      for (auto& drain : drain_map_.drains()) {
        std::optional<chrono::SystemClock::duration> drain_ready_in =
            drain.Trickle(encoding_buffer_);
        if (drain_ready_in.has_value()) {
          min_delay = std::min(drain_ready_in.value(),
                               min_delay.value_or(drain_ready_in.value()));
          drains_pending = true;
        }
      }
    }
  }

  // Opens a server writer to set up an unrequested log stream.
  Status OpenUnrequestedLogStream(uint32_t channel_id,
                                  rpc::Server& rpc_server,
                                  LogService& log_service) {
    rpc::RawServerWriter writer =
        rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
            rpc_server, channel_id, log_service);
    const Result<RpcLogDrain*> drain =
        drain_map_.GetDrainFromChannelId(channel_id);
    PW_TRY(drain.status());
    return drain.value()->Open(writer);
  }

 private:
  sync::TimedThreadNotification ready_to_flush_notification_;
  RpcLogDrainMap& drain_map_;
  multisink::MultiSink& multisink_;
  std::span<std::byte> encoding_buffer_;
};

template <size_t kEncodingBufferSizeBytes>
class RpcLogDrainThreadWithBuffer final : public RpcLogDrainThread {
 public:
  RpcLogDrainThreadWithBuffer(multisink::MultiSink& multisink,
                              RpcLogDrainMap& drain_map)
      : RpcLogDrainThread(multisink, drain_map, encoding_buffer_array_) {}

 private:
  static_assert(kEncodingBufferSizeBytes >=
                    RpcLogDrain::kLogEntriesEncodeFrameSize +
                        RpcLogDrain::kMinEntryBufferSize,
                "RpcLogDrainThread's encoding buffer must be large enough for "
                "at least one entry");

  std::byte encoding_buffer_array_[kEncodingBufferSizeBytes];
};

}  // namespace pw::log_rpc
