// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#pragma once

#include <algorithm>
#include <array>
#include <cstdint>
#include <limits>
#include <optional>
#include <string_view>

#include "pw_assert/assert.h"
#include "pw_bytes/span.h"
#include "pw_chrono/system_clock.h"
#include "pw_function/function.h"
#include "pw_log/proto/log.pwpb.h"
#include "pw_log_rpc/internal/config.h"
#include "pw_log_rpc/log_filter.h"
#include "pw_multisink/multisink.h"
#include "pw_protobuf/serialized_size.h"
#include "pw_result/result.h"
#include "pw_rpc/raw/server_reader_writer.h"
#include "pw_status/status.h"
#include "pw_sync/lock_annotations.h"
#include "pw_sync/mutex.h"

namespace pw::log_rpc {

// RpcLogDrain matches a MultiSink::Drain with with an RPC channel's writer. A
// RPC channel ID identifies this drain. The user must attach this drain
// to a MultiSink that returns a log::LogEntry, and provide a buffer large
// enough to hold the largest log::LogEntry transmittable. The user must call
// Flush(), which, on every call, packs as many log::LogEntry items as possible
// into a log::LogEntries message, writes the message to the provided writer,
// then repeats the process until there are no more entries in the MultiSink or
// the writer failed to write the outgoing package and error_handling is set to
// `kCloseStreamOnWriterError`. When error_handling is `kIgnoreWriterErrors` the
// drain will continue to retrieve log entries out of the MultiSink and attempt
// to send them out ignoring the writer errors without sending a drop count.
// Note: the error handling and drop count reporting might change in the future.
// Log filtering is done using the rules of the Filter provided if any.
class RpcLogDrain : public multisink::MultiSink::Drain {
 public:
  // Dictates how to handle server writer errors.
  enum class LogDrainErrorHandling {
    kIgnoreWriterErrors,
    kCloseStreamOnWriterError,
  };

  // The minimum buffer size, without the message payload or module sizes,
  // needed to retrieve a log::LogEntry from the attached MultiSink. The user
  // must account for the max message size to avoid log entry drops. The dropped
  // field is not accounted since a dropped message has all other fields unset.
  static constexpr size_t kMinEntrySizeWithoutPayload =
      protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MESSAGE, 0) +
      protobuf::SizeOfFieldUint32(log::LogEntry::Fields::LINE_LEVEL) +
      protobuf::SizeOfFieldUint32(log::LogEntry::Fields::FLAGS) +
      protobuf::SizeOfFieldInt64(log::LogEntry::Fields::TIMESTAMP) +
      protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MODULE, 0) +
      protobuf::SizeOfFieldBytes(log::LogEntry::Fields::FILE, 0) +
      protobuf::SizeOfFieldBytes(log::LogEntry::Fields::THREAD, 0);

  // Error messages sent when logs are dropped.
  static constexpr std::string_view kIngressErrorMessage{
      PW_LOG_RPC_INGRESS_ERROR_MSG};
  static constexpr std::string_view kSlowDrainErrorMessage{
      PW_LOG_RPC_SLOW_DRAIN_MSG};
  static constexpr std::string_view kSmallOutboundBufferErrorMessage{
      PW_LOG_RPC_SMALL_OUTBOUND_BUFFER_MSG};
  static constexpr std::string_view kSmallStackBufferErrorMessage{
      PW_LOG_RPC_SMALL_STACK_BUFFER_MSG};
  static constexpr std::string_view kWriterErrorMessage{
      PW_LOG_RPC_WRITER_ERROR_MSG};
  // The smallest entry buffer must fit the largest error message, or a typical
  // token size (4B), whichever is largest.
  static constexpr size_t kLargestErrorMessageOrTokenSize =
      std::max({size_t(4),
                kIngressErrorMessage.size(),
                kSlowDrainErrorMessage.size(),
                kSmallOutboundBufferErrorMessage.size(),
                kSmallStackBufferErrorMessage.size(),
                kWriterErrorMessage.size()});
  static constexpr size_t kMinEntryBufferSize =
      kMinEntrySizeWithoutPayload + sizeof(kLargestErrorMessageOrTokenSize);

  // When encoding LogEntry in LogEntries, there are kLogEntriesEncodeFrameSize
  // bytes added to the encoded LogEntry. This constant and kMinEntryBufferSize
  // can be used to calculate the minimum RPC ChannelOutput buffer size.
  static constexpr size_t kLogEntriesEncodeFrameSize =
      protobuf::TagSizeBytes(log::LogEntries::Fields::ENTRIES) +
      protobuf::kMaxSizeOfLength +
      protobuf::SizeOfFieldUint32(
          log::LogEntries::Fields::FIRST_ENTRY_SEQUENCE_ID);

  // Creates a closed log stream with a writer that can be set at a later time.
  // The provided buffer must be large enough to hold the largest transmittable
  // log::LogEntry or a drop count message at the very least. The user can
  // choose to provide a unique mutex for the drain, or share it to save RAM as
  // long as they are aware of contengency issues.
  RpcLogDrain(
      const uint32_t channel_id,
      ByteSpan log_entry_buffer,
      sync::Mutex& mutex,
      LogDrainErrorHandling error_handling,
      Filter* filter = nullptr,
      size_t max_bundles_per_trickle = std::numeric_limits<size_t>::max(),
      pw::chrono::SystemClock::duration trickle_delay =
          chrono::SystemClock::duration::zero())
      : channel_id_(channel_id),
        error_handling_(error_handling),
        server_writer_(),
        log_entry_buffer_(log_entry_buffer),
        drop_count_ingress_error_(0),
        drop_count_slow_drain_(0),
        drop_count_small_outbound_buffer_(0),
        drop_count_small_stack_buffer_(0),
        drop_count_writer_error_(0),
        mutex_(mutex),
        filter_(filter),
        sequence_id_(0),
        max_bundles_per_trickle_(max_bundles_per_trickle),
        trickle_delay_(trickle_delay),
        no_writes_until_(chrono::SystemClock::now()),
        on_open_callback_(nullptr) {
    PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
  }

  // Not copyable.
  RpcLogDrain(const RpcLogDrain&) = delete;
  RpcLogDrain& operator=(const RpcLogDrain&) = delete;

  // Configures the drain with a new open server writer if the current one is
  // not open.
  //
  // Return values:
  // OK - Successfully set the new open writer.
  // FAILED_PRECONDITION - The given writer is not open.
  // ALREADY_EXISTS - an open writer is already set.
  Status Open(rpc::RawServerWriter& writer) PW_LOCKS_EXCLUDED(mutex_);

  // Accesses log entries and sends them via the writer. Expected to be called
  // frequently to avoid log drops. If the writer fails to send a packet with
  // multiple log entries, the entries are dropped and a drop message with the
  // count is sent. When error_handling is kCloseStreamOnWriterError, the stream
  // will automatically be closed and Flush will return the writer error.
  //
  // Precondition: the drain must be attached to a MultiSink.
  //
  // Return values:
  // OK - all entries were consumed.
  // ABORTED - there was an error writing the packet, and error_handling equals
  // `kCloseStreamOnWriterError`.
  Status Flush(ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_);

  // Writes entries as dictated by this drain's rate limiting configuration.
  //
  // Returns:
  //   A minimum wait duration before Trickle() will be ready to write more logs
  // If no duration is returned, this drain is caught up.
  std::optional<pw::chrono::SystemClock::duration> Trickle(
      ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_);

  // Ends RPC log stream without flushing.
  //
  // Return values:
  // OK - successfully closed the server writer.
  // FAILED_PRECONDITION - The given writer is not open.
  // Errors from the underlying writer send packet.
  Status Close() PW_LOCKS_EXCLUDED(mutex_);

  uint32_t channel_id() const { return channel_id_; }

  size_t max_bundles_per_trickle() const { return max_bundles_per_trickle_; }
  void set_max_bundles_per_trickle(size_t max_num_entries) {
    max_bundles_per_trickle_ = max_num_entries;
  }

  chrono::SystemClock::duration trickle_delay() const { return trickle_delay_; }
  void set_trickle_delay(chrono::SystemClock::duration trickle_delay) {
    trickle_delay_ = trickle_delay;
  }

  // Stores a function that is called when Open() is successful. Pass nulltpr to
  // clear it. This is useful in cases where the owner of the drain needs to be
  // notified that the drain was opened.
  void set_on_open_callback(pw::Function<void()>&& callback) {
    on_open_callback_ = std::move(callback);
  }

 private:
  enum class LogDrainState {
    kCaughtUp,
    kMoreEntriesRemaining,
  };

  LogDrainState SendLogs(size_t max_num_bundles,
                         ByteSpan encoding_buffer,
                         Status& encoding_status) PW_LOCKS_EXCLUDED(mutex_);

  // Fills the outgoing buffer with as many entries as possible.
  LogDrainState EncodeOutgoingPacket(log::LogEntries::MemoryEncoder& encoder,
                                     uint32_t& packed_entry_count_out)
      PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);

  const uint32_t channel_id_;
  const LogDrainErrorHandling error_handling_;
  rpc::RawServerWriter server_writer_ PW_GUARDED_BY(mutex_);
  const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_);
  uint32_t drop_count_ingress_error_ PW_GUARDED_BY(mutex_);
  uint32_t drop_count_slow_drain_ PW_GUARDED_BY(mutex_);
  uint32_t drop_count_small_outbound_buffer_ PW_GUARDED_BY(mutex_);
  uint32_t drop_count_small_stack_buffer_ PW_GUARDED_BY(mutex_);
  uint32_t drop_count_writer_error_ PW_GUARDED_BY(mutex_);
  sync::Mutex& mutex_;
  Filter* filter_;
  uint32_t sequence_id_;
  size_t max_bundles_per_trickle_;
  pw::chrono::SystemClock::duration trickle_delay_;
  pw::chrono::SystemClock::time_point no_writes_until_;
  pw::Function<void()> on_open_callback_;
};

}  // namespace pw::log_rpc
