// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once

#include <cinttypes>
#include <cstddef>
#include <limits>
#include <mutex>

#include "pw_assert/assert.h"
#include "pw_chrono/system_timer.h"
#include "pw_rpc/writer.h"
#include "pw_status/status.h"
#include "pw_stream/stream.h"
#include "pw_sync/interrupt_spin_lock.h"
#include "pw_sync/lock_annotations.h"
#include "pw_transfer/internal/chunk.h"
#include "pw_transfer/internal/chunk_data_buffer.h"
#include "pw_transfer/internal/config.h"
#include "pw_work_queue/work_queue.h"

namespace pw::transfer::internal {

// Value type bundling the two transfer parameters that the Context API accepts
// as `max_parameters`: a pending byte count and a maximum chunk size, both in
// bytes. Both values are required to be nonzero.
class TransferParameters {
 public:
  // Stores the two values. Asserts (via PW_ASSERT) that each one is greater
  // than zero.
  constexpr TransferParameters(uint32_t pending_bytes,
                               uint32_t max_chunk_size_bytes)
      : pending_bytes_(pending_bytes),
        max_chunk_size_bytes_(max_chunk_size_bytes) {
    PW_ASSERT(pending_bytes > 0);
    PW_ASSERT(max_chunk_size_bytes > 0);
  }

  // The accessors are constexpr, like the constructor, so that a constexpr
  // TransferParameters can also be read inside constant expressions.

  // Returns the pending byte count given at construction.
  constexpr uint32_t pending_bytes() const { return pending_bytes_; }

  // Returns the maximum chunk size, in bytes, given at construction.
  constexpr uint32_t max_chunk_size_bytes() const {
    return max_chunk_size_bytes_;
  }

 private:
  uint32_t pending_bytes_;
  uint32_t max_chunk_size_bytes_;
};

// Information about a single transfer.
class Context {
 public:
  enum Type : bool { kTransmit, kReceive };

  Context(const Context&) = delete;
  Context(Context&&) = delete;
  Context& operator=(const Context&) = delete;
  Context& operator=(Context&&) = delete;

  constexpr uint32_t transfer_id() const { return transfer_id_; }

  // True if the context has been used for a transfer (it has an ID).
  bool initialized() {
    state_lock_.lock();
    bool initialized = transfer_state_ != TransferState::kInactive;
    state_lock_.unlock();
    return initialized;
  }

  // True if the transfer is active.
  bool active() {
    state_lock_.lock();
    bool active = transfer_state_ >= TransferState::kData;
    state_lock_.unlock();
    return active;
  }

  // Starts a new transfer from an initialized context by sending the initial
  // transfer chunk. This is generally called from within a transfer client, as
  // it is unusual for a server to initiate a transfer.
  Status InitiateTransfer(const TransferParameters& max_parameters);

  // Extracts data from the provided chunk into the transfer context. This is
  // intended to be the immediate part of the transfer, run directly from within
  // the RPC message handler.
  //
  // Returns true if there is any deferred work required for this chunk (i.e.
  // ProcessChunk should be called to complete the operation).
  bool ReadChunkData(ChunkDataBuffer& buffer,
                     const TransferParameters& max_parameters,
                     const Chunk& chunk);

  // Handles the chunk from the previous invocation of ReadChunkData(). This
  // operation is intended to be deferred, running from a different context than
  // the RPC callback in which the chunk was received.
  void ProcessChunk(ChunkDataBuffer& buffer,
                    const TransferParameters& max_parameters) {
    if (type() == kTransmit) {
      ProcessTransmitChunk();
    } else {
      ProcessReceiveChunk(buffer, max_parameters);
    }

    if (active()) {
      timer_.InvokeAfter(chunk_timeout_);
    }
  }

 protected:
  using CompletionFunction = Status (*)(Context&, Status);

  Context(CompletionFunction on_completion)
      : transfer_id_(0),
        flags_(0),
        transfer_state_(TransferState::kInactive),
        retries_(0),
        max_retries_(0),
        stream_(nullptr),
        rpc_writer_(nullptr),
        offset_(0),
        pending_bytes_(0),
        max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()),
        last_chunk_offset_(0),
        timer_([this](chrono::SystemClock::time_point) { this->OnTimeout(); }),
        chunk_timeout_(chrono::SystemClock::duration::zero()),
        work_queue_(nullptr),
        on_completion_(on_completion) {}

  enum class TransferState : uint8_t {
    // This ServerContext has never been used for a transfer. It is available
    // for use for a transfer.
    kInactive,
    // A transfer completed and the final status chunk was sent. The Context is
    // available for use for a new transfer. A receive transfer uses this state
    // to allow a transmitter to retry its last chunk if the final status chunk
    // was dropped.
    kCompleted,
    // Sending or receiving data for an active transfer.
    kData,
    // Recovering after one or more chunks was dropped in an active transfer.
    kRecovery,
    // Hit a timeout and waiting for the timeout handler to run.
    kTimedOut,
  };

  constexpr Type type() const { return static_cast<Type>(flags_ & kFlagsType); }

  void set_transfer_state(TransferState state) {
    state_lock_.lock();
    transfer_state_ = state;
    state_lock_.unlock();
  }

  // Begins a new transmit transfer from this context.
  // Precondition: context is not active.
  void InitializeForTransmit(
      uint32_t transfer_id,
      work_queue::WorkQueue& work_queue,
      rpc::Writer& rpc_writer,
      stream::Reader& reader,
      chrono::SystemClock::duration chunk_timeout = cfg::kDefaultChunkTimeout,
      uint8_t max_retries = cfg::kDefaultMaxRetries) {
    Initialize(kTransmit,
               transfer_id,
               work_queue,
               rpc_writer,
               reader,
               chunk_timeout,
               max_retries);
  }

  // Begins a new receive transfer from this context.
  // Precondition: context is not active.
  void InitializeForReceive(
      uint32_t transfer_id,
      work_queue::WorkQueue& work_queue,
      rpc::Writer& rpc_writer,
      stream::Writer& writer,
      chrono::SystemClock::duration chunk_timeout = cfg::kDefaultChunkTimeout,
      uint8_t max_retries = cfg::kDefaultMaxRetries) {
    Initialize(kReceive,
               transfer_id,
               work_queue,
               rpc_writer,
               writer,
               chunk_timeout,
               max_retries);
  }

  // Calculates the maximum size of actual data that can be sent within a single
  // client write transfer chunk, accounting for the overhead of the transfer
  // protocol and RPC system.
  //
  // Note: This function relies on RPC protocol internals. This is generally a
  // *bad* idea, but is necessary here due to limitations of the RPC system and
  // its asymmetric ingress and egress paths.
  //
  // TODO(frolv): This should be investigated further and perhaps addressed
  // within the RPC system, at the least through a helper function.
  uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
                             uint32_t channel_id) const;

 private:
  void Initialize(Type type,
                  uint32_t transfer_id,
                  work_queue::WorkQueue& work_queue,
                  rpc::Writer& rpc_writer,
                  stream::Stream& stream,
                  chrono::SystemClock::duration chunk_timeout,
                  uint8_t max_retries);

  stream::Reader& reader() {
    PW_DASSERT(active() && type() == kTransmit);
    return static_cast<stream::Reader&>(*stream_);
  }

  stream::Writer& writer() {
    PW_DASSERT(active() && type() == kReceive);
    return static_cast<stream::Writer&>(*stream_);
  }

  // Sends the first chunk in a transmit transfer.
  Status SendInitialTransmitChunk();

  // Updates the context's current parameters based on the fields in a chunk.
  void UpdateParameters(const TransferParameters& max_parameters,
                        const Chunk& chunk);

  // Functions which extract relevant data from a chunk into the context.
  bool ReadTransmitChunk(const TransferParameters& max_parameters,
                         const Chunk& chunk);
  bool ReadReceiveChunk(ChunkDataBuffer& buffer,
                        const TransferParameters& max_parameters,
                        const Chunk& chunk);

  // Functions which handle the last received chunk.
  void ProcessTransmitChunk();
  void ProcessReceiveChunk(ChunkDataBuffer& buffer,
                           const TransferParameters& max_parameters);

  // In a transmit transfer, sends the next data chunk from the local stream.
  // Returns status indicating what to do next:
  //
  //    OK - continue
  //    OUT_OF_RANGE - done for now
  //    other errors - abort transfer with this error
  //
  Status SendNextDataChunk();

  // In a receive transfer, processes the fields from a data chunk and stages
  // the data for a deferred write. Returns true if there is a deferred
  // operation to complete.
  bool HandleDataChunk(ChunkDataBuffer& buffer,
                       const TransferParameters& max_parameters,
                       const Chunk& chunk);

  // In a receive transfer, sends a parameters chunk telling the transmitter how
  // much data they can send.
  Status SendTransferParameters();

  // Updates the current receive transfer parameters from the provided object,
  // then sends them.
  Status UpdateAndSendTransferParameters(
      const TransferParameters& max_parameters);

  void SendStatusChunk(Status status);
  void FinishAndSendStatus(Status status);

  void CancelTimer() {
    timer_.Cancel();
    retries_ = 0;
  }

  // Timeout function invoked from the timer context. This may occur in an
  // interrupt, so no real work can be done. Instead, sets state to timed out
  // and adds a job to run the timeout handler.
  void OnTimeout();

  // The acutal timeout handler, invoked from within the work queue.
  void HandleTimeout();

  static constexpr uint8_t kFlagsType = 1 << 0;
  static constexpr uint8_t kFlagsDataSent = 1 << 1;

  uint32_t transfer_id_;
  uint8_t flags_;
  TransferState transfer_state_ PW_GUARDED_BY(state_lock_);
  uint8_t retries_;
  uint8_t max_retries_;

  sync::InterruptSpinLock state_lock_;

  stream::Stream* stream_;
  rpc::Writer* rpc_writer_;

  size_t offset_;
  size_t pending_bytes_;
  size_t max_chunk_size_bytes_;

  union {
    Status status_;             // Used when state is kCompleted.
    size_t last_chunk_offset_;  // Used in states kData and kRecovery.
  };

  // Timer used to handle timeouts waiting for chunks.
  chrono::SystemTimer timer_;
  chrono::SystemClock::duration chunk_timeout_;
  work_queue::WorkQueue* work_queue_;

  CompletionFunction on_completion_;
};

}  // namespace pw::transfer::internal
