blob: 152ddc7d96237b0fd238786ba9a19246ad7b438e [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#define PW_LOG_MODULE_NAME "TRN"
#include "pw_transfer/transfer.h"
#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_status/try.h"
#include "pw_transfer/internal/chunk.h"
namespace pw::transfer {
void TransferService::HandleChunk(ConstByteSpan message,
internal::TransferType type) {
// All incoming chunks in a client read transfer are transfer parameter
// updates, except for the final chunk, which is an acknowledgement of
// completion.
//
// Transfer parameters may contain the following fields:
//
// - transfer_id (required)
// - pending_bytes (required)
// - offset (required)
// - max_chunk_size_bytes
// - min_delay_microseconds (not yet supported)
//
internal::Chunk chunk;
if (Status status = internal::DecodeChunk(message, chunk); !status.ok()) {
// No special handling required here. The client will retransmit the chunk
// when no response is received.
PW_LOG_ERROR("Failed to decode incoming transfer chunk");
return;
}
internal::ServerContextPool& pool =
type == internal::kRead ? read_transfers_ : write_transfers_;
rpc::RawServerReaderWriter& stream =
type == internal::kRead ? client_.read_stream() : client_.write_stream();
Result<internal::ServerContext*> result =
chunk.IsInitialChunk() ? pool.StartTransfer(chunk.transfer_id,
work_queue_,
stream,
chunk_timeout_,
max_retries_)
: pool.GetPendingTransfer(chunk.transfer_id);
if (!result.ok()) {
client_.SendStatusChunk(type, chunk.transfer_id, result.status());
PW_LOG_ERROR("Error handling chunk for transfer %u: %d",
static_cast<unsigned>(chunk.transfer_id),
result.status().code());
return;
}
internal::ServerContext& transfer = *result.value();
if (chunk.status.has_value()) {
// Transfer has been terminated (successfully or not).
const Status status = chunk.status.value();
if (!status.ok()) {
PW_LOG_ERROR("Receiver terminated transfer %u with status %d",
static_cast<unsigned>(chunk.transfer_id),
static_cast<int>(status.code()));
}
if (transfer.active()) {
transfer.Finish(status).IgnoreError();
}
return;
}
if (transfer.ReadChunkData(
chunk_data_buffer_, client_.max_parameters(), chunk)) {
// Call this synchronously for now. Later, this will be deferred within a
// work queue.
transfer.ProcessChunk(chunk_data_buffer_, client_.max_parameters());
}
}
} // namespace pw::transfer