// blob: 7c7fec91bc47a46883f81c9b61acd26e566022c6 [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
#include <cinttypes>
#include <cstddef>
#include <limits>
#include <optional>
#include "pw_assert/assert.h"
#include "pw_chrono/system_clock.h"
#include "pw_rpc/writer.h"
#include "pw_status/status.h"
#include "pw_stream/stream.h"
#include "pw_transfer/internal/chunk.h"
#include "pw_transfer/internal/event.h"
#include "pw_transfer/internal/protocol.h"
#include "pw_transfer/rate_estimate.h"
namespace pw::transfer::internal {
// Forward declaration. This header only stores a pointer to a TransferThread
// (Context::thread_), so the complete type is not needed here.
class TransferThread;
// Bundles three numeric transfer settings: pending_bytes,
// max_chunk_size_bytes and extend_window_divisor.
//
// The constructor checks all three values. Of the setters, only
// set_extend_window_divisor() re-validates its argument (debug builds only);
// the two byte-count setters store whatever they are given.
class TransferParameters {
 public:
  // Stores the provided values. Asserts that both byte counts are nonzero and
  // that the divisor is greater than one.
  constexpr TransferParameters(uint32_t pending_bytes,
                               uint32_t max_chunk_size_bytes,
                               uint32_t extend_window_divisor)
      : pending_bytes_{pending_bytes},
        max_chunk_size_bytes_{max_chunk_size_bytes},
        extend_window_divisor_{extend_window_divisor} {
    PW_ASSERT(pending_bytes > 0);
    PW_ASSERT(max_chunk_size_bytes > 0);
    PW_ASSERT(extend_window_divisor > 1);
  }

  // Accessors for the currently stored values.
  uint32_t pending_bytes() const { return pending_bytes_; }
  uint32_t max_chunk_size_bytes() const { return max_chunk_size_bytes_; }
  uint32_t extend_window_divisor() const { return extend_window_divisor_; }

  // Overwrites the stored pending byte count. No validation is performed.
  void set_pending_bytes(uint32_t pending_bytes) {
    pending_bytes_ = pending_bytes;
  }

  // Overwrites the stored maximum chunk size. No validation is performed.
  void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) {
    max_chunk_size_bytes_ = max_chunk_size_bytes;
  }

  // Overwrites the stored divisor. Debug-asserts that the new value is greater
  // than one, mirroring the constructor's check.
  void set_extend_window_divisor(uint32_t extend_window_divisor) {
    PW_DASSERT(extend_window_divisor > 1);
    extend_window_divisor_ = extend_window_divisor;
  }

 private:
  uint32_t pending_bytes_;
  uint32_t max_chunk_size_bytes_;
  uint32_t extend_window_divisor_;
};
// Information about a single transfer.
class Context {
public:
static constexpr uint32_t kUnassignedSessionId = 0;
Context(const Context&) = delete;
Context(Context&&) = delete;
Context& operator=(const Context&) = delete;
Context& operator=(Context&&) = delete;
constexpr uint32_t session_id() const { return session_id_; }
constexpr uint32_t resource_id() const { return resource_id_; }
// True if the context has been used for a transfer (it has an ID).
bool initialized() const {
return transfer_state_ != TransferState::kInactive;
}
// True if the transfer is active.
bool active() const { return transfer_state_ >= TransferState::kInitiating; }
std::optional<chrono::SystemClock::time_point> timeout() const {
return active() && next_timeout_ != kNoTimeout
? std::optional(next_timeout_)
: std::nullopt;
}
// Returns true if the transfer's most recently set timeout has passed.
bool timed_out() const {
std::optional<chrono::SystemClock::time_point> next_timeout = timeout();
return next_timeout.has_value() &&
chrono::SystemClock::now() >= next_timeout.value();
}
// Processes an event for this transfer.
void HandleEvent(const Event& event);
protected:
~Context() = default;
constexpr Context()
: session_id_(kUnassignedSessionId),
resource_id_(0),
desired_protocol_version_(ProtocolVersion::kUnknown),
configured_protocol_version_(ProtocolVersion::kUnknown),
flags_(0),
transfer_state_(TransferState::kInactive),
retries_(0),
max_retries_(0),
stream_(nullptr),
rpc_writer_(nullptr),
offset_(0),
window_size_(0),
window_end_offset_(0),
max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()),
max_parameters_(nullptr),
thread_(nullptr),
last_chunk_sent_(Chunk::Type::kData),
last_chunk_offset_(0),
chunk_timeout_(chrono::SystemClock::duration::zero()),
interchunk_delay_(chrono::SystemClock::for_at_least(
std::chrono::microseconds(kDefaultChunkDelayMicroseconds))),
next_timeout_(kNoTimeout) {}
constexpr TransferType type() const {
return static_cast<TransferType>(flags_ & kFlagsType);
}
private:
enum class TransferState : uint8_t {
// The context is available for use for a new transfer.
kInactive,
// A transfer completed and the final status chunk was sent. The Context is
// available for use for a new transfer. A receive transfer uses this state
// to allow a transmitter to retry its last chunk if the final status chunk
// was dropped.
//
// Only used by the legacy protocol. Starting from version 2, transfer
// completions are acknowledged, for which the TERMINATING state is used.
kCompleted,
// Transfer is starting. The server and client are performing an initial
// handshake and negotiating protocol and feature flags.
kInitiating,
// Waiting for the other end to send a chunk.
kWaiting,
// Transmitting a window of data to a receiver.
kTransmitting,
// Recovering after one or more chunks was dropped in an active transfer.
kRecovery,
// Transfer has completed locally and is waiting for the peer to acknowledge
// its final status. Only entered by the terminating side of the transfer.
//
// The context remains in a TERMINATING state until it receives an
// acknowledgement from the peer or times out. Either way, the context
// transitions to INACTIVE afterwards, fully cleaning it up for reuse.
//
// Used instead of COMPLETED starting from version 2. Unlike COMPLETED,
// contexts in a TERMINATING state cannot be used to start new transfers.
kTerminating,
};
enum class TransmitAction {
// Start of a new transfer.
kBegin,
// Extend the current window length.
kExtend,
// Retransmit from a specified offset.
kRetransmit,
};
void set_transfer_state(TransferState state) { transfer_state_ = state; }
// The session ID as unsigned instead of uint32_t so it can be used with %u.
unsigned id_for_log() const {
static_assert(sizeof(unsigned) >= sizeof(session_id_));
return static_cast<unsigned>(session_id_);
}
stream::Reader& reader() {
PW_DASSERT(active() && type() == TransferType::kTransmit);
return static_cast<stream::Reader&>(*stream_);
}
stream::Writer& writer() {
PW_DASSERT(active() && type() == TransferType::kReceive);
return static_cast<stream::Writer&>(*stream_);
}
bool DataTransferComplete() const {
return transfer_state_ == TransferState::kTerminating ||
transfer_state_ == TransferState::kCompleted;
}
bool ShouldSkipCompletionHandshake() const {
// Completion handshakes are not part of the legacy protocol. Additionally,
// transfers which have not yet fully established should not handshake and
// simply time out.
return configured_protocol_version_ <= ProtocolVersion::kLegacy ||
transfer_state_ == TransferState::kInitiating;
}
// Calculates the maximum size of actual data that can be sent within a
// single client write transfer chunk, accounting for the overhead of the
// transfer protocol and RPC system.
//
// Note: This function relies on RPC protocol internals. This is generally a
// *bad* idea, but is necessary here due to limitations of the RPC system
// and its asymmetric ingress and egress paths.
//
// TODO(frolv): This should be investigated further and perhaps addressed
// within the RPC system, at the least through a helper function.
uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
uint32_t channel_id) const;
// Initializes a new transfer using new_transfer. The provided stream
// argument is used in place of the NewTransferEvent's stream. Only
// initializes state; no packets are sent.
//
// Precondition: context is not active.
void Initialize(const NewTransferEvent& new_transfer);
// Starts a new transfer from an initialized context by sending the initial
// transfer chunk. This is only used by transfer clients, as the transfer
// service cannot initiate transfers.
//
// Calls Finish(), which calls the on_completion callback, if initiating a
// transfer fails.
void InitiateTransferAsClient();
// Starts a new transfer on the server after receiving a request from a
// client.
bool StartTransferAsServer(const NewTransferEvent& new_transfer);
// Does final cleanup specific to the server or client. Returns whether the
// cleanup succeeded. An error in cleanup indicates that the transfer
// failed.
virtual Status FinalCleanup(Status status) = 0;
// Processes a chunk in either a transfer or receive transfer.
void HandleChunkEvent(const ChunkEvent& event);
// Runs the initial three-way handshake when starting a new transfer.
void PerformInitialHandshake(const Chunk& chunk);
void UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk);
// Processes a chunk in a transmit transfer.
void HandleTransmitChunk(const Chunk& chunk);
// Processes a transfer parameters update in a transmit transfer.
void HandleTransferParametersUpdate(const Chunk& chunk);
// Sends the next chunk in a transmit transfer, if any.
void TransmitNextChunk(bool retransmit_requested);
// Processes a chunk in a receive transfer.
void HandleReceiveChunk(const Chunk& chunk);
// Processes a data chunk in a received while in the kWaiting state.
void HandleReceivedData(const Chunk& chunk);
// Sends the first chunk in a transmit transfer.
void SendInitialTransmitChunk();
// Updates the current receive transfer parameters based on the context's
// configuration.
void UpdateTransferParameters();
// Populates the transfer parameters fields on a chunk object.
void SetTransferParameters(Chunk& parameters);
// In a receive transfer, sends a parameters chunk telling the transmitter
// how much data they can send.
void SendTransferParameters(TransmitAction action);
// Updates the current receive transfer parameters, then sends them.
void UpdateAndSendTransferParameters(TransmitAction action);
// Processes a chunk in a terminating state.
void HandleTerminatingChunk(const Chunk& chunk);
// Ends the transfer with the specified status, sending a completion chunk to
// the peer.
void TerminateTransfer(Status status);
// Ends a transfer following notification of completion from the peer.
void HandleTermination(Status status);
// Forcefully ends a transfer locally without contacting the peer.
void Abort(Status status) {
Finish(status);
set_transfer_state(TransferState::kCompleted);
}
// Sends a final status chunk of a completed transfer without updating the
// transfer. Sends status_, which MUST have been set by a previous Finish()
// call.
void SendFinalStatusChunk();
// Marks the transfer as completed and calls FinalCleanup(). Sets status_ to
// the final status for this transfer. The transfer MUST be active when this
// is called.
void Finish(Status status);
// Encodes the specified chunk to the encode buffer and sends it with the
// rpc_writer_. Calls Finish() with an error if the operation fails.
void EncodeAndSendChunk(const Chunk& chunk);
void SetTimeout(chrono::SystemClock::duration timeout);
void ClearTimeout() { next_timeout_ = kNoTimeout; }
// Called when the transfer's timeout expires.
void HandleTimeout();
// Resends the last packet or aborts the transfer if the maximum retries has
// been exceeded.
void Retry();
void RetryHandshake();
void LogTransferConfiguration();
static constexpr uint8_t kFlagsType = 1 << 0;
static constexpr uint8_t kFlagsDataSent = 1 << 1;
static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000;
// How long to wait for the other side to ACK a final transfer chunk before
// resetting the context so that it can be reused. During this time, the
// status chunk will be re-sent for every non-ACK chunk received,
// continually notifying the other end that the transfer is over.
static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout =
std::chrono::milliseconds(5000);
static constexpr chrono::SystemClock::time_point kNoTimeout =
chrono::SystemClock::time_point(chrono::SystemClock::duration(0));
uint32_t session_id_;
uint32_t resource_id_;
// The version of the transfer protocol that this node wants to run.
ProtocolVersion desired_protocol_version_;
// The version of the transfer protocol that the context is actually running,
// following negotiation with the transfer peer.
ProtocolVersion configured_protocol_version_;
uint8_t flags_;
TransferState transfer_state_;
uint8_t retries_;
uint8_t max_retries_;
// The stream from which to read or to which to write data.
stream::Stream* stream_;
rpc::Writer* rpc_writer_;
uint32_t offset_;
uint32_t window_size_;
uint32_t window_end_offset_;
uint32_t max_chunk_size_bytes_;
const TransferParameters* max_parameters_;
TransferThread* thread_;
Chunk::Type last_chunk_sent_;
union {
Status status_; // Used when state is kCompleted.
uint32_t last_chunk_offset_; // Used in states kWaiting and kRecovery.
};
// How long to wait for a chunk from the other end.
chrono::SystemClock::duration chunk_timeout_;
// How long to delay between transmitting subsequent data chunks within a
// window.
chrono::SystemClock::duration interchunk_delay_;
// Timestamp at which the transfer will next time out, or kNoTimeout.
chrono::SystemClock::time_point next_timeout_;
RateEstimate transfer_rate_;
};
} // namespace pw::transfer::internal