// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

package dev.pigweed.pw_transfer;

import static java.lang.Math.min;

import com.google.protobuf.ByteString;
import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Status;
import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

class WriteTransfer extends Transfer<Void> {
  private static final Logger logger = Logger.forClass(WriteTransfer.class);

  // Short chunk delays often turn into much longer delays. Ignore delays <10ms to avoid impacting
  // performance.
  private static final int MIN_CHUNK_DELAY_TO_SLEEP_MICROS = 10000;

  private int maxChunkSizeBytes = 0;
  private int minChunkDelayMicros = 0;
  private int sentOffset;
  private long totalDroppedBytes;

  private final byte[] data;

  protected WriteTransfer(int resourceId,
      ProtocolVersion desiredProtocolVersion,
      TransferInterface transferManager,
      TransferTimeoutSettings timeoutSettings,
      byte[] data,
      Consumer<TransferProgress> progressCallback,
      BooleanSupplier shouldAbortCallback) {
    super(resourceId,
        desiredProtocolVersion,
        transferManager,
        timeoutSettings,
        progressCallback,
        shouldAbortCallback);
    this.data = data;
  }

  @Override
  void prepareInitialChunk(VersionedChunk.Builder chunk) {
    chunk.setRemainingBytes(data.length);
  }

  @Override
  State getWaitingForDataState() {
    return new WaitingForTransferParameters();
  }

  private class WaitingForTransferParameters extends ActiveState {
    @Override
    public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
      updateTransferParameters(chunk);
    }
  }

  /** Transmitting a transfer window. */
  private class Transmitting extends ActiveState {
    private final int windowStartOffset;
    private final int windowEndOffset;

    Transmitting(int windowStartOffset, int windowEndOffset) {
      this.windowStartOffset = windowStartOffset;
      this.windowEndOffset = windowEndOffset;
    }

    @Override
    public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
      updateTransferParameters(chunk);
    }

    @Override
    public void handleTimeout() throws TransferAbortedException {
      ByteString chunkData = ByteString.copyFrom(
          data, sentOffset, min(windowEndOffset - sentOffset, maxChunkSizeBytes));

      if (VERBOSE_LOGGING) {
        logger.atFinest().log("%s sending bytes %d-%d (%d B chunk, max size %d B)",
            WriteTransfer.this,
            sentOffset,
            sentOffset + chunkData.size() - 1,
            chunkData.size(),
            maxChunkSizeBytes);
      }

      sendChunk(buildDataChunk(chunkData));

      sentOffset += chunkData.size();
      updateProgress(sentOffset, windowStartOffset, data.length);

      if (sentOffset < windowEndOffset) {
        setTimeoutMicros(minChunkDelayMicros);
        return; // Keep transmitting packets
      }
      setNextChunkTimeout();
      changeState(new WaitingForTransferParameters());
    }
  }

  @Override
  VersionedChunk getChunkForRetry() {
    // The service should resend transfer parameters if there was a timeout. In case the service
    // doesn't support timeouts and to avoid unnecessary waits, resend the last chunk. If there
    // were drops, this will trigger a transfer parameters update.
    return getLastChunkSent();
  }

  @Override
  void setFutureResult() {
    updateProgress(data.length, data.length, data.length);
    getFuture().set(null);
  }

  private void updateTransferParameters(VersionedChunk chunk) throws TransferAbortedException {
    logger.atFiner().log("%s received new chunk %s", this, chunk);

    if (chunk.offset() > data.length) {
      setStateTerminatingAndSendFinalChunk(Status.OUT_OF_RANGE);
      return;
    }

    int windowEndOffset = min(chunk.windowEndOffset(), data.length);
    if (chunk.requestsTransmissionFromOffset()) {
      long droppedBytes = sentOffset - chunk.offset();
      if (droppedBytes > 0) {
        totalDroppedBytes += droppedBytes;
        logger.atFine().log("%s retransmitting %d B (%d retransmitted of %d sent)",
            this,
            droppedBytes,
            totalDroppedBytes,
            sentOffset);
      }
      sentOffset = chunk.offset();
    } else if (windowEndOffset <= sentOffset) {
      logger.atFiner().log("%s ignoring old rolling window packet", this);
      setNextChunkTimeout();
      return; // Received an old rolling window packet, ignore it.
    }

    // Update transfer parameters if they're set.
    chunk.maxChunkSizeBytes().ifPresent(size -> maxChunkSizeBytes = size);
    chunk.minDelayMicroseconds().ifPresent(delay -> {
      if (delay > MIN_CHUNK_DELAY_TO_SLEEP_MICROS) {
        minChunkDelayMicros = delay;
      }
    });

    if (maxChunkSizeBytes == 0) {
      if (windowEndOffset == sentOffset) {
        logger.atWarning().log("%s server requested 0 bytes; aborting", this);
        setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT);
        return;
      }
      // Default to sending the entire window if the max chunk size is not specified (or is 0).
      maxChunkSizeBytes = windowEndOffset - sentOffset;
    }

    // Enter the transmitting state and immediately send the first packet
    changeState(new Transmitting(chunk.offset(), windowEndOffset)).handleTimeout();
  }

  private VersionedChunk buildDataChunk(ByteString chunkData) {
    VersionedChunk.Builder chunk =
        newChunk(Chunk.Type.DATA).setOffset(sentOffset).setData(chunkData);

    // If this is the last data chunk, setRemainingBytes to 0.
    if (sentOffset + chunkData.size() == data.length) {
      logger.atFiner().log("%s sending final chunk with %d B", this, chunkData.size());
      chunk.setRemainingBytes(0);
    }
    return chunk.build();
  }
}
