blob: 395c19c59f0d0f9c2a201b5deeec8af4a915070e [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package dev.pigweed.pw_transfer;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE;
import com.google.common.util.concurrent.SettableFuture;
import dev.pigweed.pw_log.Logger;
import dev.pigweed.pw_rpc.Status;
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
/**
 * Base class for tracking the state of a read or write transfer.
 *
 * <p>A transfer is driven by events ({@link #handleChunk}, {@link
 * #handleTimeoutIfDeadlineExceeded}, {@link #handleCancellation}, {@link #handleTermination},
 * {@link #handleDisconnection}) that are dispatched to the current {@link State}. Subclasses
 * supply the chunk to (re)send, the state entered after the first chunk is sent, and the value
 * used to complete the future on success.
 *
 * @param <T> type of the result produced by the transfer's future
 */
abstract class Transfer<T> {
  private static final Logger logger = Logger.forClass(Transfer.class);

  // Largest nanosecond instant. Used to block indefinitely when no transfers are pending.
  static final Instant NO_TIMEOUT = Instant.ofEpochSecond(0, Long.MAX_VALUE);

  private final int resourceId;
  private final int sessionId;
  private final TransferEventHandler.TransferInterface eventHandler;
  private final SettableFuture<T> future;
  private final int maxRetries;
  private final int timeoutMillis;
  private final int initialTimeoutMillis;
  private final Consumer<TransferProgress> progressCallback;
  private final BooleanSupplier shouldAbortCallback;
  private final Instant startTime;

  private State state = new Inactive();
  private Instant deadline = NO_TIMEOUT;
  private boolean isCleanedUp = false;

  // The number of times this transfer has retried due to an RPC disconnection. Limit this to
  // maxRetries to prevent repeated crashes if reading from / writing to a particular transfer is
  // causing crashes.
  private int disconnectionRetries = 0;

  /**
   * Creates a new read or write transfer.
   *
   * @param resourceId The resource ID of the transfer
   * @param eventHandler Interface to use to send a chunk.
   * @param timeoutMillis Maximum time to wait for a chunk from the server.
   * @param initialTimeoutMillis Maximum time to wait for the first chunk from the server.
   * @param maxRetries Number of times to retry due to a timeout or RPC error.
   * @param progressCallback Called each time a packet is sent.
   * @param shouldAbortCallback BooleanSupplier that returns true if a transfer should be aborted.
   */
  Transfer(int resourceId,
      TransferEventHandler.TransferInterface eventHandler,
      int timeoutMillis,
      int initialTimeoutMillis,
      int maxRetries,
      Consumer<TransferProgress> progressCallback,
      BooleanSupplier shouldAbortCallback) {
    this.resourceId = resourceId;
    this.sessionId = resourceId; // TODO(b/227211777): Distinguish between resource & session ID
    this.eventHandler = eventHandler;

    this.future = SettableFuture.create();
    this.timeoutMillis = timeoutMillis;
    this.initialTimeoutMillis = initialTimeoutMillis;
    this.maxRetries = maxRetries;
    this.progressCallback = progressCallback;
    this.shouldAbortCallback = shouldAbortCallback;

    // If the future is cancelled, tell the TransferEventHandler to cancel the transfer.
    future.addListener(() -> {
      if (future.isCancelled()) {
        eventHandler.cancelTransfer(this);
      }
    }, directExecutor());

    startTime = Instant.now();
  }

  /** Returns the ID of the resource being transferred. */
  final int getResourceId() {
    return resourceId;
  }

  /** Returns the session ID; currently always equal to the resource ID. */
  final int getSessionId() {
    return sessionId;
  }

  /** Returns the instant after which this transfer is considered timed out. */
  final Instant getDeadline() {
    return deadline;
  }

  /** Arms the deadline using the regular per-chunk timeout. */
  final void setNextChunkTimeout() {
    deadline = Instant.now().plusMillis(timeoutMillis);
  }

  /** Arms the deadline using the timeout for the first chunk from the server. */
  private void setInitialTimeout() {
    deadline = Instant.now().plusMillis(initialTimeoutMillis);
  }

  /** Arms the deadline {@code timeoutMicros} microseconds from now. */
  final void setTimeoutMicros(int timeoutMicros) {
    // Widen before multiplying so large microsecond values cannot overflow an int.
    deadline = Instant.now().plusNanos((long) timeoutMicros * 1000);
  }

  /** Returns the future that completes when the transfer finishes or fails. */
  final SettableFuture<T> getFuture() {
    return future;
  }

  /**
   * Sends the first chunk and, if that succeeds, enters the waiting-for-data state with the
   * initial timeout armed. If sending fails the transfer has already been cleaned up.
   */
  final void start() {
    logger.atFine().log(
        "Transfer %d for resource %d starting with parameters: default timeout %d ms, "
            + "initial timeout %d ms, %d max retries",
        getSessionId(),
        getResourceId(),
        timeoutMillis,
        initialTimeoutMillis,
        maxRetries);

    if (!sendChunk(getChunkForRetry())) {
      return; // Sending failed, transfer is cancelled
    }

    setState(getWaitingForDataState());
    setInitialTimeout();
  }

  /** Processes an incoming chunk from the server. */
  final void handleChunk(Chunk chunk) {
    // Since a packet has been received, don't allow retries on disconnection; abort instead.
    disconnectionRetries = Integer.MAX_VALUE;

    if (chunk.hasStatus()) {
      logger.atFinest().log("Event: handle final chunk");
      state.handleFinalChunk(chunk);
    } else {
      logger.atFinest().log("Event: handle data chunk");
      state.handleDataChunk(chunk);
    }
  }

  /** Dispatches a timeout event to the current state if the deadline has passed. */
  final void handleTimeoutIfDeadlineExceeded() {
    // Read the clock once so the logged time is the one that was actually compared.
    Instant now = Instant.now();
    if (now.isAfter(deadline)) {
      logger.atFinest().log("Event: handleTimeout since %s is after %s", now, deadline);
      state.handleTimeout();
    }
  }

  /** Dispatches a termination event (transfer thread shutting down) to the current state. */
  final void handleTermination() {
    state.handleTermination();
  }

  /** Dispatches a user cancellation event to the current state. */
  final void handleCancellation() {
    state.handleCancellation();
  }

  /** Restarts a transfer after an RPC disconnection. */
  final void handleDisconnection() {
    // disconnectionRetries is set to Int.MAX_VALUE when a packet is received to prevent retries
    // after the initial packet. Only increment inside the retry branch: incrementing the
    // MAX_VALUE sentinel would wrap to a negative value and re-enable retries.
    if (disconnectionRetries < maxRetries) {
      disconnectionRetries += 1;
      logger.atFine().log("Restarting the pw_transfer RPC for transfer %d (attempt %d/%d)",
          sessionId,
          disconnectionRetries,
          maxRetries);
      if (sendChunk(getChunkForRetry())) {
        setInitialTimeout();
      }
    } else {
      cleanUp(new TransferError(
          "Transfer " + sessionId + " restarted " + maxRetries + " times, aborting",
          Status.INTERNAL));
    }
  }

  /** Returns the State to enter immediately after sending the first packet. */
  abstract State getWaitingForDataState();

  /**
   * Returns the chunk to send for a retry. Returns the initial chunk if no chunks have been sent.
   */
  abstract Chunk getChunkForRetry();

  /** Sets the result for the future after a successful transfer. */
  abstract void setFutureResult();

  /** Creates a chunk builder of the given type, pre-populated with this transfer's ID. */
  final Chunk.Builder newChunk(Chunk.Type type) {
    // TODO(frolv): Properly set the session ID after it is configured by the server.
    return Chunk.newBuilder().setTransferId(getSessionId()).setType(type);
  }

  /** Switches to {@code newState}, logging the transition when the state object changes. */
  final void setState(State newState) {
    if (newState != state) {
      logger.atFinest().log(
          "Transfer %d state %s -> %s", getSessionId(), state.getName(), newState.getName());
    }
    state = newState;
  }

  /** Sends a chunk. Returns true if sent, false if sending failed and the transfer was aborted. */
  final boolean sendChunk(Chunk chunk) {
    if (shouldAbortCallback.getAsBoolean()) {
      logger.atWarning().log("Abort signal received.");
      cleanUp(new TransferError(sessionId, Status.ABORTED));
      return false;
    }

    try {
      eventHandler.sendChunk(chunk);
      return true;
    } catch (TransferError transferError) {
      cleanUp(transferError);
      return false;
    }
  }

  /** Builds and sends a chunk; see {@link #sendChunk(Chunk)}. */
  final boolean sendChunk(Chunk.Builder chunk) {
    return sendChunk(chunk.build());
  }

  /** Performs final cleanup of a completed transfer. No packets are sent to the server. */
  private void cleanUp(Status status) {
    eventHandler.unregisterTransfer(sessionId);

    logger.atInfo().log("Transfer %d completed with status %s", sessionId, status);
    if (status.ok()) {
      setFutureResult();
    } else {
      future.setException(new TransferError(sessionId, status));
    }
    isCleanedUp = true;
  }

  /** Finishes the transfer due to an exception. No packets are sent to the server. */
  final void cleanUp(TransferError exception) {
    eventHandler.unregisterTransfer(sessionId);

    logger.atWarning().withCause(exception).log("Transfer %d terminated with exception", sessionId);
    future.setException(exception);
    isCleanedUp = true;
  }

  /** Sends a status chunk to the server and finishes the transfer. */
  final void sendFinalChunk(Status status) {
    logger.atFine().log("Sending final chunk for transfer %d with status %s", sessionId, status);

    // If the transfer was completed due to concurrently handling chunks, don't send.
    if (isCleanedUp) {
      logger.atFine().log("Skipping sending final chunk on already-completed transfer");
      return;
    }

    // Only call cleanUp() if the sendChunk was successful. If it wasn't, the exception would have
    // already terminated the transfer.
    if (sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status.code()))) {
      cleanUp(status);
    }
  }

  /** Invokes the transfer progress callback and logs the progress. */
  final void updateProgress(long bytesSent, long bytesConfirmedReceived, long totalSizeBytes) {
    TransferProgress progress =
        TransferProgress.create(bytesSent, bytesConfirmedReceived, totalSizeBytes);
    progressCallback.accept(progress);

    long durationNanos = Duration.between(startTime, Instant.now()).toNanos();
    // Compute the rate in floating point: bytesSent * 1_000_000_000 overflows a long once more
    // than ~9.2e9 bytes have been sent. A non-positive duration (no time elapsed, or the wall
    // clock stepped backwards) is reported as a rate of 0.
    long totalRate = durationNanos <= 0 ? 0 : (long) (bytesSent * 1e9 / durationNanos);

    logger.atFine().log("Transfer %d progress: "
            + "%5.1f%% (%d B sent, %d B confirmed received of %s B total) at %d B/s",
        sessionId,
        progress.percentReceived(),
        bytesSent,
        bytesConfirmedReceived,
        totalSizeBytes == UNKNOWN_TRANSFER_SIZE ? "unknown" : totalSizeBytes,
        totalRate);
  }

  /** Represents a state in the transfer state machine. */
  abstract class State {
    /** Returns the simple class name of this state, used in log messages. */
    private String getName() {
      return getClass().getSimpleName();
    }

    /** Called to handle a non-final chunk for this transfer. */
    void handleDataChunk(Chunk chunk) {
      logger.atFine().log(
          "Transfer %d [%s state]: Received unexpected data chunk", getSessionId(), getName());
    }

    /** Called to handle the final chunk for this transfer. */
    void handleFinalChunk(Chunk chunk) {
      Status status = Status.fromCode(chunk.getStatus());

      if (status != null) {
        cleanUp(status);
      } else {
        logger.atWarning().log("Received invalid status value %d", chunk.getStatus());
        sendFinalChunk(Status.INVALID_ARGUMENT);
      }

      setState(new Completed());
    }

    /** Called when this transfer's deadline expires. */
    void handleTimeout() {
      logger.atFine().log("Transfer %d [%s state]: Ignoring timeout", getSessionId(), getName());
    }

    /** Called if the transfer is cancelled by the user. */
    void handleCancellation() {
      sendFinalChunk(Status.CANCELLED);
      setState(new Completed());
    }

    /** Called when the transfer thread is shutting down. */
    void handleTermination() {
      sendFinalChunk(Status.ABORTED);
      setState(new Completed());
    }
  }

  /** State of a transfer that has been created but has not yet entered another state. */
  private class Inactive extends State {}

  /** Recovering from an expired timeout. */
  class Recovery extends State {
    // Number of retry chunks successfully sent while in this state.
    private int retries;

    @Override
    void handleDataChunk(Chunk chunk) {
      // A chunk arrived, so recovery is over: resume normal handling in the waiting state.
      setState(getWaitingForDataState());
      state.handleDataChunk(chunk);
    }

    @Override
    void handleTimeout() {
      if (retries < maxRetries) {
        logger.atFiner().log("Transfer %d received no chunks for %d ms; retrying %d/%d",
            getSessionId(),
            timeoutMillis,
            retries,
            maxRetries);
        if (sendChunk(getChunkForRetry())) {
          retries += 1;
          setNextChunkTimeout();
        } else {
          // sendChunk() already cleaned up the transfer; don't count a retry or re-arm the
          // deadline for it. Entering Completed also resets the deadline to NO_TIMEOUT.
          setState(new Completed());
        }
        return;
      }

      sendFinalChunk(Status.DEADLINE_EXCEEDED);
      setState(new Completed());
    }
  }

  /** Transfer completed. Do nothing if the transfer is terminated or cancelled. */
  class Completed extends State {
    Completed() {
      // A completed transfer never times out.
      deadline = NO_TIMEOUT;
    }

    @Override
    void handleTermination() {}

    @Override
    void handleCancellation() {}
  }
}