blob: 1a6ffb1d84d228ed7b0f2aae9950ac25b7b728b7 [file] [log] [blame]
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#define PW_LOG_MODULE_NAME "TRN"
#include "pw_transfer/client.h"
#include <algorithm>
#include <cstring>
#include <mutex>
#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_transfer/internal/chunk.h"
#include "pw_transfer/transfer.pwpb.h"
namespace pw::transfer {
Status Client::Read(uint32_t transfer_id,
stream::Writer& output,
CompletionFunc&& on_completion,
chrono::SystemClock::duration timeout) {
if (on_completion == nullptr) {
return Status::InvalidArgument();
}
if (!read_stream_.active()) {
read_stream_ =
client_.Read([this](ConstByteSpan chunk) { OnChunk(chunk, kRead); });
}
return StartNewTransfer(
transfer_id, kRead, output, std::move(on_completion), timeout);
}
Status Client::Write(uint32_t transfer_id,
stream::Reader& input,
CompletionFunc&& on_completion,
chrono::SystemClock::duration timeout) {
if (on_completion == nullptr) {
return Status::InvalidArgument();
}
if (!write_stream_.active()) {
write_stream_ =
client_.Write([this](ConstByteSpan chunk) { OnChunk(chunk, kWrite); });
}
return StartNewTransfer(
transfer_id, kWrite, input, std::move(on_completion), timeout);
}
Status Client::StartNewTransfer(uint32_t transfer_id,
Type type,
stream::Stream& stream,
CompletionFunc&& on_completion,
chrono::SystemClock::duration timeout) {
std::lock_guard lock(transfer_context_mutex_);
ClientContext* context = nullptr;
// Check the transfer ID is already being used. If not, find an available
// transfer slot.
for (ClientContext& ctx : transfer_contexts_) {
if (ctx.active()) {
if (ctx.transfer_id() == transfer_id) {
return Status::AlreadyExists();
}
} else {
context = &ctx;
}
}
if (context == nullptr) {
return Status::ResourceExhausted();
}
if (type == kWrite) {
PW_LOG_DEBUG("Starting new write transfer %u",
static_cast<unsigned>(transfer_id));
context->StartWrite(*this,
transfer_id,
work_queue_,
static_cast<stream::Reader&>(stream),
write_stream_,
std::move(on_completion),
timeout);
} else {
PW_LOG_DEBUG("Starting new read transfer %u",
static_cast<unsigned>(transfer_id));
context->StartRead(*this,
transfer_id,
work_queue_,
static_cast<stream::Writer&>(stream),
read_stream_,
std::move(on_completion),
timeout);
}
return context->InitiateTransfer(max_parameters_);
}
// TODO(pwbug/592): This function should be updated to only return active
// transfers. Calling ReadChunkData() / Finish() on inactive transfers is
// unintuitive and has led to several bugs where not all cases are handled.
Client::ClientContext* Client::GetTransferById(uint32_t transfer_id) {
std::lock_guard lock(transfer_context_mutex_);
auto it =
std::find_if(transfer_contexts_.begin(),
transfer_contexts_.end(),
[&transfer_id](ClientContext& c) {
return c.initialized() && c.transfer_id() == transfer_id;
});
if (it == transfer_contexts_.end()) {
return nullptr;
}
return it;
}
void Client::OnChunk(ConstByteSpan data, Type type) {
internal::Chunk chunk;
if (Status status = DecodeChunk(data, chunk); !status.ok()) {
// TODO(frolv): Handle this error case.
return;
}
ClientContext* ctx = GetTransferById(chunk.transfer_id);
if (ctx == nullptr) {
// TODO(frolv): Handle this error case.
return;
}
if (type == kRead && !ctx->is_read_transfer()) {
PW_LOG_ERROR(
"Received a read chunk for transfer %u, but it is a write transfer",
static_cast<unsigned>(ctx->transfer_id()));
if (ctx->active()) {
// TODO(pwbug/592): Remove the active() check.
ctx->Finish(Status::Internal());
}
return;
}
if (type == kWrite && !ctx->is_write_transfer()) {
PW_LOG_ERROR(
"Received a write chunk for transfer %u, but it is a read transfer",
static_cast<unsigned>(ctx->transfer_id()));
if (ctx->active()) {
// TODO(pwbug/592): Remove the active() check.
ctx->Finish(Status::Internal());
}
return;
}
if (chunk.status.has_value() && ctx->active()) {
// A status field indicates that the transfer has finished.
//
// TODO(frolv): This is invoked from the RPC client thread -- should it be
// run in the work queue instead?
//
// TODO(pwbug/592): Remove the active() check.
ctx->Finish(chunk.status.value());
return;
}
if (ctx->ReadChunkData(chunk_data_buffer_, max_parameters_, chunk)) {
// TODO(frolv): This should be run from work_queue_.
ctx->ProcessChunk(chunk_data_buffer_, max_parameters_);
}
// TODO(frolv): This silences the compiler. Actually use the work queue.
static_cast<void>(work_queue_);
}
} // namespace pw::transfer