// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#define PW_LOG_MODULE_NAME "TRN"

#include "pw_transfer/internal/context.h"

#include <mutex>

#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_status/try.h"
#include "pw_transfer/transfer.pwpb.h"
#include "pw_varint/varint.h"

namespace pw::transfer::internal {

Status Context::InitiateTransfer(const TransferParameters& max_parameters) {
  PW_DCHECK(active());

  if (type() == kReceive) {
    // A receiver begins a new transfer with a parameters chunk telling the
    // transmitter what to send.
    PW_TRY(UpdateAndSendTransferParameters(max_parameters));
  } else {
    PW_TRY(SendInitialTransmitChunk());
  }

  timer_.InvokeAfter(chunk_timeout_);
  return OkStatus();
}

bool Context::ReadChunkData(ChunkDataBuffer& buffer,
                            const TransferParameters& max_parameters,
                            const Chunk& chunk) {
  CancelTimer();

  if (type() == kTransmit) {
    return ReadTransmitChunk(max_parameters, chunk);
  }
  return ReadReceiveChunk(buffer, max_parameters, chunk);
}

Status Context::SendInitialTransmitChunk() {
  // A transmitter begins a transfer by just sending its ID.
  internal::Chunk chunk = {};
  chunk.transfer_id = transfer_id_;

  Result<ConstByteSpan> result =
      EncodeChunk(chunk, rpc_writer_->PayloadBuffer());
  if (!result.ok()) {
    rpc_writer_->ReleasePayloadBuffer();
    return result.status();
  }

  return rpc_writer_->Write(*result);
}

void Context::UpdateParameters(const TransferParameters& max_parameters,
                               const Chunk& chunk) {
  offset_ = chunk.offset;

  if (chunk.pending_bytes.has_value()) {
    pending_bytes_ = chunk.pending_bytes.value();
  }

  if (chunk.max_chunk_size_bytes.has_value()) {
    max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes.value(),
                                     max_parameters.max_chunk_size_bytes());
  }
}

bool Context::ReadTransmitChunk(const TransferParameters& max_parameters,
                                const Chunk& chunk) {
  {
    std::lock_guard lock(state_lock_);

    switch (transfer_state_) {
      case TransferState::kInactive:
        PW_CRASH("Never should handle chunk while in kInactive state");

      case TransferState::kRecovery:
        PW_CRASH("Transmit transfer should not enter recovery state.");

      case TransferState::kCompleted:
        // Transfer is not pending; notify the sender.
        SendStatusChunk(Status::FailedPrecondition());
        transfer_state_ = TransferState::kInactive;
        return false;

      case TransferState::kData:
        // Continue with reading the chunk.
        break;

      case TransferState::kTimedOut:
        // Drop the chunk to let the timeout handler run.
        return false;
    }
  }

  if (!chunk.pending_bytes.has_value()) {
    // Malformed chunk.
    FinishAndSendStatus(Status::InvalidArgument());
    return false;
  }

  if (chunk.pending_bytes == 0u) {
    PW_LOG_ERROR("Transfer %d receiver requested 0 bytes (invalid); aborting",
                 static_cast<unsigned>(transfer_id_));
    FinishAndSendStatus(Status::Internal());
    return false;
  }

  // If the offsets don't match, attempt to seek on the reader. Not all readers
  // support seeking; abort with UNIMPLEMENTED if this handler doesn't.
  if (offset_ != chunk.offset) {
    if (Status seek_status = reader().Seek(chunk.offset); !seek_status.ok()) {
      PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
                  static_cast<unsigned>(transfer_id_),
                  static_cast<unsigned>(chunk.offset),
                  seek_status.code());

      // Remap status codes to return one of the following:
      //
      //   INTERNAL: invalid seek, never should happen
      //   DATA_LOSS: the reader is in a bad state
      //   UNIMPLEMENTED: seeking is not supported
      //
      if (seek_status.IsOutOfRange()) {
        seek_status = Status::Internal();
      } else if (!seek_status.IsUnimplemented()) {
        seek_status = Status::DataLoss();
      }

      FinishAndSendStatus(seek_status);
      return false;
    }
  }

  UpdateParameters(max_parameters, chunk);
  return true;
}

void Context::ProcessTransmitChunk() {
  Status status;

  while ((status = SendNextDataChunk()).ok()) {
    // Continue until all requested bytes are sent.
  }

  // If all bytes are successfully sent, SendNextChunk will return OUT_OF_RANGE.
  if (!status.IsOutOfRange()) {
    FinishAndSendStatus(status);
  }
}

bool Context::ReadReceiveChunk(ChunkDataBuffer& buffer,
                               const TransferParameters& max_parameters,
                               const Chunk& chunk) {
  state_lock_.lock();

  switch (transfer_state_) {
    case TransferState::kInactive:
      PW_CRASH("Never should handle chunk while in kInactive state");

    case TransferState::kTimedOut:
      // Drop the chunk to let the timeout handler run.
      state_lock_.unlock();
      return false;

    case TransferState::kCompleted: {
      // If the chunk is a repeat of the final chunk, resend the status chunk,
      // which apparently was lost. Otherwise, send FAILED_PRECONDITION since
      // this is for a non-pending transfer.
      Status response = status_;
      if (!chunk.IsFinalTransmitChunk()) {
        response = Status::FailedPrecondition();
        // Sender should only should retry with final chunk
        transfer_state_ = TransferState::kInactive;
      }
      state_lock_.unlock();
      SendStatusChunk(response);
      return false;
    }

    case TransferState::kData:
      state_lock_.unlock();
      if (!HandleDataChunk(buffer, max_parameters, chunk)) {
        return false;
      }
      break;

    case TransferState::kRecovery:
      if (chunk.offset != offset_) {
        state_lock_.unlock();

        if (last_chunk_offset_ == chunk.offset) {
          PW_LOG_DEBUG(
              "Transfer %u received repeated offset %u; retry detected, "
              "resending transfer parameters",
              static_cast<unsigned>(transfer_id_),
              static_cast<unsigned>(chunk.offset));
          if (!UpdateAndSendTransferParameters(max_parameters).ok()) {
            return false;
          }
        } else {
          PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
                       static_cast<unsigned>(transfer_id_),
                       static_cast<unsigned>(offset_),
                       static_cast<unsigned>(chunk.offset));
        }

        last_chunk_offset_ = chunk.offset;
        return false;
      }

      PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
                   static_cast<unsigned>(transfer_id_),
                   static_cast<unsigned>(offset_));

      transfer_state_ = TransferState::kData;
      state_lock_.unlock();

      if (!HandleDataChunk(buffer, max_parameters, chunk)) {
        return false;
      }
      break;
  }

  // Update the last offset seen so that retries can be detected.
  last_chunk_offset_ = chunk.offset;
  return true;
}

void Context::ProcessReceiveChunk(ChunkDataBuffer& buffer,
                                  const TransferParameters& max_parameters) {
  // Write staged data from the buffer to the stream.
  if (!buffer.empty()) {
    if (Status status = writer().Write(buffer); !status.ok()) {
      PW_LOG_ERROR(
          "Transfer %u write of %u B chunk failed with status %u; aborting "
          "with DATA_LOSS",
          static_cast<unsigned>(transfer_id_),
          static_cast<unsigned>(buffer.size()),
          status.code());
      FinishAndSendStatus(Status::DataLoss());
      return;
    }
  }

  // When the client sets remaining_bytes to 0, it indicates completion of the
  // transfer. Acknowledge the completion through a status chunk and clean up.
  if (buffer.last_chunk()) {
    FinishAndSendStatus(OkStatus());
    return;
  }

  // Update the transfer state.
  offset_ += buffer.size();
  pending_bytes_ -= buffer.size();

  // TODO(frolv): Release the buffer.

  if (pending_bytes_ == 0u) {
    // All pending data has been received. Send a new parameters chunk to start
    // the next batch.
    UpdateAndSendTransferParameters(max_parameters);
  }
}

Status Context::SendNextDataChunk() {
  if (pending_bytes_ == 0) {
    return Status::OutOfRange();
  }

  ByteSpan buffer = rpc_writer_->PayloadBuffer();

  // Begin by doing a partial encode of all the metadata fields, leaving the
  // buffer with usable space for the chunk data at the end.
  transfer::Chunk::MemoryEncoder encoder{buffer};
  encoder.WriteTransferId(transfer_id_).IgnoreError();
  encoder.WriteOffset(offset_).IgnoreError();

  // Reserve space for the data proto field overhead and use the remainder of
  // the buffer for the chunk data.
  size_t reserved_size = encoder.size() + 1 /* data key */ + 5 /* data size */;

  ByteSpan data_buffer = buffer.subspan(reserved_size);
  size_t max_bytes_to_send = std::min(pending_bytes_, max_chunk_size_bytes_);

  if (max_bytes_to_send < data_buffer.size()) {
    data_buffer = data_buffer.first(max_bytes_to_send);
  }

  Result<ByteSpan> data = reader().Read(data_buffer);
  if (data.status().IsOutOfRange()) {
    // No more data to read.
    encoder.WriteRemainingBytes(0).IgnoreError();
    pending_bytes_ = 0;
  } else if (data.ok()) {
    encoder.WriteData(data.value()).IgnoreError();
    last_chunk_offset_ = offset_;
    offset_ += data.value().size();
    pending_bytes_ -= data.value().size();
  } else {
    PW_LOG_ERROR("Transfer %u Read() failed with status %u",
                 static_cast<unsigned>(transfer_id_),
                 data.status().code());
    rpc_writer_->ReleasePayloadBuffer();
    return Status::DataLoss();
  }

  if (!encoder.status().ok()) {
    PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
                 static_cast<unsigned>(transfer_id_));
    rpc_writer_->ReleasePayloadBuffer();
    return Status::Internal();
  }

  if (const Status status = rpc_writer_->Write(encoder); !status.ok()) {
    PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
                 static_cast<unsigned>(transfer_id_),
                 status.code());
    return Status::DataLoss();
  }

  flags_ |= kFlagsDataSent;
  return data.status();
}

bool Context::HandleDataChunk(ChunkDataBuffer& buffer,
                              const TransferParameters& max_parameters,
                              const Chunk& chunk) {
  if (chunk.data.size() > pending_bytes_) {
    // End the transfer, as this indcates a bug with the client implementation
    // where it doesn't respect pending_bytes. Trying to recover from here
    // could potentially result in an infinite transfer loop.
    PW_LOG_ERROR(
        "Received more data than what was requested; terminating transfer.");
    FinishAndSendStatus(Status::Internal());
    return false;
  }

  if (chunk.offset != offset_) {
    // Bad offset; reset pending_bytes to send another parameters chunk.
    PW_LOG_DEBUG(
        "Transfer %u expected offset %u, received %u; entering recovery state",
        static_cast<unsigned>(transfer_id_),
        static_cast<unsigned>(offset_),
        static_cast<unsigned>(chunk.offset));
    UpdateAndSendTransferParameters(max_parameters);
    set_transfer_state(TransferState::kRecovery);

    // Return false as there is no immediate deferred work to complete. The
    // transfer must wait for the next data chunk to be sent by the transmitter.
    return false;
  }

  // Write the chunk data to the buffer to be processed later. If the chunk has
  // no data, this will clear the buffer.
  buffer.Write(chunk.data, chunk.IsFinalTransmitChunk());
  return true;
}

Status Context::SendTransferParameters() {
  const internal::Chunk parameters = {
      .transfer_id = transfer_id_,
      .pending_bytes = pending_bytes_,
      .max_chunk_size_bytes = max_chunk_size_bytes_,
      .offset = static_cast<uint32_t>(offset_),
  };

  PW_LOG_DEBUG(
      "Transfer %u sending transfer parameters: "
      "offset=%u, pending_bytes=%u, chunk_size=%u",
      static_cast<unsigned>(transfer_id_),
      static_cast<unsigned>(offset_),
      static_cast<unsigned>(pending_bytes_),
      static_cast<unsigned>(max_chunk_size_bytes_));

  // If the parameters can't be encoded or sent, it most likely indicates a
  // transport-layer issue, so there isn't much that can be done by the transfer
  // service. The client will time out and can try to restart the transfer.
  Result<ConstByteSpan> data =
      internal::EncodeChunk(parameters, rpc_writer_->PayloadBuffer());
  if (!data.ok()) {
    PW_LOG_ERROR("Failed to encode parameters for transfer %u: %d",
                 static_cast<unsigned>(parameters.transfer_id),
                 data.status().code());
    rpc_writer_->ReleasePayloadBuffer();
    FinishAndSendStatus(Status::Internal());
    return Status::Internal();
  }

  if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
    PW_LOG_ERROR("Failed to write parameters for transfer %u: %d",
                 static_cast<unsigned>(parameters.transfer_id),
                 status.code());
    if (on_completion_) {
      on_completion_(*this, Status::Internal());
    }
    return Status::Internal();
  }

  return OkStatus();
}

Status Context::UpdateAndSendTransferParameters(
    const TransferParameters& max_parameters) {
  const size_t write_limit = writer().ConservativeWriteLimit();
  if (write_limit == 0) {
    PW_LOG_WARN(
        "Transfer %u writer returned 0 from ConservativeWriteLimit(); cannot "
        "continue, aborting with RESOURCE_EXHAUSTED",
        static_cast<unsigned>(transfer_id_));
    FinishAndSendStatus(Status::ResourceExhausted());
    return Status::ResourceExhausted();
  }

  pending_bytes_ = std::min(max_parameters.pending_bytes(),
                            static_cast<uint32_t>(write_limit));

  max_chunk_size_bytes_ = MaxWriteChunkSize(
      max_parameters.max_chunk_size_bytes(), rpc_writer_->channel_id());

  return SendTransferParameters();
}

void Context::Initialize(Type type,
                         uint32_t transfer_id,
                         work_queue::WorkQueue& work_queue,
                         rpc::Writer& rpc_writer,
                         stream::Stream& stream,
                         chrono::SystemClock::duration chunk_timeout,
                         uint8_t max_retries) {
  PW_DCHECK(!active());
  PW_CHECK(state_lock_.try_lock());

  transfer_id_ = transfer_id;
  flags_ = static_cast<uint8_t>(type);
  transfer_state_ = TransferState::kData;
  retries_ = 0;
  max_retries_ = max_retries;

  rpc_writer_ = &rpc_writer;
  stream_ = &stream;

  offset_ = 0;
  pending_bytes_ = 0;
  max_chunk_size_bytes_ = std::numeric_limits<uint32_t>::max();

  last_chunk_offset_ = 0;
  chunk_timeout_ = chunk_timeout;
  work_queue_ = &work_queue;

  state_lock_.unlock();
}

void Context::SendStatusChunk(Status status) {
  internal::Chunk chunk = {};
  chunk.transfer_id = transfer_id_;
  chunk.status = status.code();

  Result<ConstByteSpan> result =
      internal::EncodeChunk(chunk, rpc_writer_->PayloadBuffer());

  if (!result.ok()) {
    PW_LOG_ERROR("Failed to encode final chunk for transfer %u",
                 static_cast<unsigned>(transfer_id_));
    rpc_writer_->ReleasePayloadBuffer();
    return;
  }

  if (!rpc_writer_->Write(result.value()).ok()) {
    PW_LOG_ERROR("Failed to send final chunk for transfer %u",
                 static_cast<unsigned>(transfer_id_));
    return;
  }
}

void Context::FinishAndSendStatus(Status status) {
  CancelTimer();

  PW_LOG_INFO("Transfer %u completed with status %u; sending final chunk",
              static_cast<unsigned>(transfer_id_),
              status.code());

  status_ = status;

  if (on_completion_ != nullptr) {
    status.Update(on_completion_(*this, status));
  }

  SendStatusChunk(status);
  set_transfer_state(TransferState::kCompleted);
}

void Context::OnTimeout() {
  std::lock_guard lock(state_lock_);
  transfer_state_ = TransferState::kTimedOut;

  const Status status = work_queue_->PushWork([this]() {
    HandleTimeout();
    if (active()) {
      timer_.InvokeAfter(chunk_timeout_);
    }
  });

  if (!status.ok()) {
    PW_LOG_ERROR("Transfer %u failed to push timeout handler to work queue",
                 static_cast<unsigned>(transfer_id_));

    // If the work queue is full, there is no way to keep the transfer alive or
    // notify the other end of the failure. Simply end the transfer; if any more
    // chunks are received, an error will be sent then.
    status_ = Status::DeadlineExceeded();
    transfer_state_ = TransferState::kCompleted;
  }
}

void Context::HandleTimeout() {
  state_lock_.lock();
  PW_DCHECK(transfer_state_ == TransferState::kTimedOut);

  if (retries_ == max_retries_) {
    PW_LOG_ERROR("Transfer %u failed to receive a chunk after %u retries.",
                 static_cast<unsigned>(transfer_id_),
                 static_cast<unsigned>(retries_));
    PW_LOG_ERROR("Canceling transfer.");
    state_lock_.unlock();
    FinishAndSendStatus(Status::DeadlineExceeded());
    return;
  }

  ++retries_;
  transfer_state_ = TransferState::kData;
  state_lock_.unlock();

  if (type() == kReceive) {
    // Resend the most recent transfer parameters. SendTransferParameters()
    // internally handles failures, so its status can be ignored.
    PW_LOG_DEBUG(
        "Receive transfer %u timed out waiting for chunk; resending parameters",
        static_cast<unsigned>(transfer_id_));
    SendTransferParameters().IgnoreError();
    return;
  }

  // In a transmit, if a data chunk has not yet been sent, the initial transfer
  // parameters did not arrive from the receiver. Resend the initial chunk.
  if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
    PW_LOG_DEBUG(
        "Transmit transfer %u timed out waiting for initial parameters",
        static_cast<unsigned>(transfer_id_));
    SendInitialTransmitChunk();
    return;
  }

  // Otherwise, resend the most recent chunk. If the reader doesn't support
  // seeking, this isn't possible, so just terminate the transfer immediately.
  if (!reader().Seek(last_chunk_offset_).ok()) {
    PW_LOG_ERROR("Transmit transfer %d timed out waiting for new parameters.",
                 static_cast<unsigned>(transfer_id_));
    PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
    FinishAndSendStatus(Status::DeadlineExceeded());
    return;
  }

  // Rewind the transfer position and resend the chunk.
  size_t last_size_sent = offset_ - last_chunk_offset_;
  offset_ = last_chunk_offset_;
  pending_bytes_ += last_size_sent;

  ProcessTransmitChunk();
}

uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
                                    uint32_t channel_id) const {
  // Start with the user-provided maximum chunk size, which should be the usable
  // payload length on the RPC ingress path after any transport overhead.
  ssize_t max_size = max_chunk_size_bytes;

  // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
  //
  //   type:       1 byte key, 1 byte value (CLIENT_STREAM)
  //   channel_id: 1 byte key, varint value (calculate from stream)
  //   service_id: 1 byte key, 4 byte value
  //   method_id:  1 byte key, 4 byte value
  //   payload:    1 byte key, varint length (remaining space)
  //   status:     0 bytes (not set in stream packets)
  //
  //   TOTAL: 14 bytes + encoded channel_id size + encoded payload length
  //
  max_size -= 14;
  max_size -= varint::EncodedSize(channel_id);
  max_size -= varint::EncodedSize(max_size);

  // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
  // overhead calculation will be moved into an RPC helper to avoid having
  // pw_transfer depend on RPC internals.
  max_size -= 5;

  // Subtract the transfer service overhead for a client write chunk
  // (pw_transfer/transfer.proto).
  //
  //   transfer_id: 1 byte key, varint value (calculate)
  //   offset:      1 byte key, varint value (calculate)
  //   data:        1 byte key, varint length (remaining space)
  //
  //   TOTAL: 3 + encoded transfer_id + encoded offset + encoded data length
  //
  size_t max_offset_in_window = offset_ + pending_bytes_;
  max_size -= 3;
  max_size -= varint::EncodedSize(transfer_id_);
  max_size -= varint::EncodedSize(max_offset_in_window);
  max_size -= varint::EncodedSize(max_size);

  // A resulting value of zero (or less) renders write transfers unusable, as
  // there is no space to send any payload. This should be considered a
  // programmer error in the transfer service setup.
  PW_CHECK_INT_GT(
      max_size,
      0,
      "Transfer service maximum chunk size is too small to fit a payload. "
      "Increase max_chunk_size_bytes to support write transfers.");

  return max_size;
}

}  // namespace pw::transfer::internal
