blob: 9d68d58e6738745c3e10bc8854b938e5fcf0549b [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#include "pw_transfer/transfer.h"
#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_status/try.h"
#include "pw_transfer/transfer.pwpb.h"
#include "pw_transfer_private/chunk.h"
namespace pw::transfer {
namespace internal {
Status Context::Start(Type type, Handler& handler) {
PW_DCHECK(!active());
if (type == kRead) {
PW_TRY(handler.PrepareRead());
} else {
PW_TRY(handler.PrepareWrite());
}
type_ = type;
handler_ = &handler;
offset_ = 0;
return OkStatus();
}
void Context::Finish(Status status) {
PW_DCHECK(active());
if (type_ == kRead) {
handler_->FinalizeRead(status);
} else {
handler_->FinalizeWrite(status)
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
}
handler_ = nullptr;
}
} // namespace internal
void TransferService::Read(ServerContext&,
RawServerReaderWriter& reader_writer) {
read_stream_ = std::move(reader_writer);
read_stream_.set_on_next(
[this](ConstByteSpan message) { OnReadMessage(message); });
}
void TransferService::Write(ServerContext&,
RawServerReaderWriter& reader_writer) {
// TODO(frolv): Implement server-side write transfers.
reader_writer.Finish(Status::Unimplemented())
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
}
void TransferService::SendStatusChunk(RawServerReaderWriter& stream,
uint32_t transfer_id,
Status status) {
internal::Chunk chunk = {};
chunk.transfer_id = transfer_id;
chunk.status = status.code();
Result<ConstByteSpan> result =
internal::EncodeChunk(chunk, stream.PayloadBuffer());
if (result.ok()) {
stream.Write(result.value())
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
}
}
bool TransferService::SendNextReadChunk(internal::Context& context) {
if (context.pending_bytes() == 0) {
return false;
}
ByteSpan buffer = read_stream_.PayloadBuffer();
// Begin by doing a partial encode of all the metadata fields, leaving the
// buffer with usable space for the chunk data at the end.
Chunk::MemoryEncoder encoder(buffer);
encoder.WriteTransferId(context.transfer_id())
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
encoder.WriteOffset(context.offset())
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
// Reserve space for the data proto field overhead and use the remainder of
// the buffer for the chunk data.
size_t reserved_size = encoder.size() + 1 /* data key */ + 5 /* data size */;
ByteSpan data_buffer = buffer.subspan(reserved_size);
size_t max_bytes_to_send =
std::min(context.pending_bytes(), context.max_chunk_size_bytes());
if (max_bytes_to_send < data_buffer.size()) {
data_buffer = data_buffer.first(max_bytes_to_send);
}
Result<ByteSpan> data = context.reader().Read(data_buffer);
if (data.status().IsOutOfRange()) {
// No more data to read.
encoder.WriteRemainingBytes(0)
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
context.set_pending_bytes(0);
} else if (!data.ok()) {
read_stream_.ReleaseBuffer();
return false;
} else {
encoder.WriteData(data.value())
.IgnoreError(); // TODO(pwbug/387): Handle Status properly
context.set_offset(context.offset() + data.value().size());
context.set_pending_bytes(context.pending_bytes() - data.value().size());
}
return read_stream_.Write(encoder).ok();
}
Result<internal::Context*> TransferService::GetOrStartReadTransfer(
uint32_t id) {
internal::Context* new_transfer = nullptr;
// Check if the ID belongs to an active transfer. If not, pick an inactive
// slot to start a new transfer.
for (internal::Context& transfer : read_transfers_) {
if (transfer.active()) {
if (transfer.transfer_id() == id) {
return &transfer;
}
} else {
new_transfer = &transfer;
}
}
if (!new_transfer) {
return Status::ResourceExhausted();
}
// Try to start the new transfer by checking if a handler for it exists.
auto handler = std::find_if(handlers_.begin(), handlers_.end(), [&](auto& h) {
return h.id() == id;
});
if (handler == handlers_.end()) {
return Status::NotFound();
}
PW_TRY(new_transfer->Start(internal::Context::kRead, *handler));
return new_transfer;
}
void TransferService::OnReadMessage(ConstByteSpan message) {
// All incoming chunks in a client read transfer are transfer parameter
// updates, except for the final chunk, which is an acknowledgement of
// completion.
//
// Transfer parameters may contain the following fields:
//
// - transfer_id (required)
// - pending_bytes (required)
// - offset (required)
// - max_chunk_size_bytes
// - min_delay_microseconds (not yet supported)
//
internal::Chunk parameters;
if (Status status = internal::DecodeChunk(message, parameters);
!status.ok()) {
// No special handling required here. The client will retransmit the chunk
// when no response is received.
PW_LOG_ERROR("Failed to decode incoming read transfer chunk");
return;
}
Result<internal::Context*> result =
GetOrStartReadTransfer(parameters.transfer_id);
if (!result.ok()) {
PW_LOG_ERROR("Error handling read transfer %u: %d",
static_cast<unsigned>(parameters.transfer_id),
static_cast<int>(result.status().code()));
SendStatusChunk(read_stream_, parameters.transfer_id, result.status());
return;
}
internal::Context& transfer = *result.value();
if (parameters.status.has_value()) {
// Transfer has been terminated (successfully or not).
Status status = parameters.status.value();
if (!status.ok()) {
PW_LOG_ERROR("Transfer %u failed with status %d",
static_cast<unsigned>(parameters.transfer_id),
static_cast<int>(status.code()));
}
transfer.Finish(status);
return;
}
if (!parameters.pending_bytes.has_value()) {
// Malformed chunk.
SendStatusChunk(
read_stream_, parameters.transfer_id, Status::InvalidArgument());
transfer.Finish(Status::InvalidArgument());
return;
}
// Update local transfer fields based on the received chunk.
if (transfer.offset() != parameters.offset) {
// TODO(frolv): pw_stream does not yet support seeking, so this temporarily
// cancels the transfer. Once seeking is added, this should be updated.
//
// transfer.set_offset(parameters.offset.value());
// transfer.Seek(transfer.offset());
//
SendStatusChunk(
read_stream_, parameters.transfer_id, Status::Unimplemented());
transfer.Finish(Status::Unimplemented());
return;
}
if (parameters.max_chunk_size_bytes.has_value()) {
transfer.set_max_chunk_size_bytes(parameters.max_chunk_size_bytes.value());
}
transfer.set_pending_bytes(parameters.pending_bytes.value());
while (SendNextReadChunk(transfer)) {
// Empty.
}
}
} // namespace pw::transfer