blob: b0afc0bb9cfec3bf9c0d23855ab574b34edc25bc [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
// Simple RPC server with the transfer service registered. Reads HDLC frames
// with RPC packets through a socket. This server has a single resource ID that
// is available, and data must be written to the server before data can be read
// from the resource ID.
//
// Usage:
//
// integration_test_server 3300 <<< "resource_id: 12 file: '/tmp/gotbytes'"
#include <sys/socket.h>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <variant>
#include <vector>
#include "google/protobuf/text_format.h"
#include "pw_assert/check.h"
#include "pw_chrono/system_clock.h"
#include "pw_log/log.h"
#include "pw_rpc_system_server/rpc_server.h"
#include "pw_rpc_system_server/socket.h"
#include "pw_stream/std_file_stream.h"
#include "pw_thread/thread.h"
#include "pw_thread_stl/options.h"
#include "pw_transfer/integration_test/config.pb.h"
#include "pw_transfer/transfer.h"
namespace pw::transfer {
namespace {
using stream::MemoryReader;
using stream::MemoryWriter;
// This is the maximum size of the socket send buffers. Ideally, this is set
// to the lowest allowed value to minimize buffering between the proxy and
// clients so rate limiting causes the client to block and wait for the
// integration test proxy to drain rather than allowing OS buffers to backlog
// large quantities of data.
//
// Note that the OS may chose to not strictly follow this requested buffer size.
// Still, setting this value to be as small as possible does reduce bufer sizes
// significantly enough to better reflect typical inter-device communication.
//
// For this to be effective, servers should also configure their sockets to a
// smaller receive buffer size.
constexpr int kMaxSocketSendBufferSize = 1;
class FileTransferHandler final : public ReadWriteHandler {
public:
FileTransferHandler(uint32_t resource_id,
std::deque<std::string>&& sources,
std::deque<std::string>&& destinations)
: ReadWriteHandler(resource_id),
sources_(sources),
destinations_(destinations) {}
~FileTransferHandler() = default;
Status PrepareRead() final {
if (sources_.empty()) {
PW_LOG_ERROR("Source paths exhausted");
return Status::ResourceExhausted();
}
auto path = sources_.front();
PW_LOG_DEBUG("Preparing read for file %s", path.c_str());
set_reader(stream_.emplace<stream::StdFileReader>(path.c_str()));
sources_.pop_front();
return OkStatus();
}
void FinalizeRead(Status) final {
std::get<stream::StdFileReader>(stream_).Close();
}
Status PrepareWrite() final {
if (destinations_.empty()) {
PW_LOG_ERROR("Destination paths exhausted");
return Status::ResourceExhausted();
}
auto path = destinations_.front();
PW_LOG_DEBUG("Preparing write for file %s", path.c_str());
set_writer(stream_.emplace<stream::StdFileWriter>(path.c_str()));
destinations_.pop_front();
return OkStatus();
}
Status FinalizeWrite(Status) final {
std::get<stream::StdFileWriter>(stream_).Close();
return OkStatus();
}
private:
std::deque<std::string> sources_;
std::deque<std::string> destinations_;
std::variant<std::monostate, stream::StdFileReader, stream::StdFileWriter>
stream_;
};
void RunServer(int socket_port, ServerConfig config) {
std::vector<std::byte> chunk_buffer(config.chunk_size_bytes());
std::vector<std::byte> encode_buffer(config.chunk_size_bytes());
transfer::Thread<4, 4> transfer_thread(chunk_buffer, encode_buffer);
TransferService transfer_service(
transfer_thread,
config.pending_bytes(),
std::chrono::seconds(config.chunk_timeout_seconds()),
config.transfer_service_retries(),
config.extend_window_divisor());
rpc::system_server::set_socket_port(socket_port);
rpc::system_server::Init();
rpc::system_server::Server().RegisterService(transfer_service);
// Start transfer thread.
thread::Thread transfer_thread_handle =
thread::Thread(thread::stl::Options(), transfer_thread);
int retval = setsockopt(rpc::system_server::GetServerSocketFd(),
SOL_SOCKET,
SO_SNDBUF,
&kMaxSocketSendBufferSize,
sizeof(kMaxSocketSendBufferSize));
PW_CHECK_INT_EQ(retval,
0,
"Failed to configure socket send buffer size with errno=%d",
errno);
std::vector<std::unique_ptr<FileTransferHandler>> handlers;
for (const auto& resource : config.resources()) {
uint32_t id = resource.first;
std::deque<std::string> source_paths(resource.second.source_paths().begin(),
resource.second.source_paths().end());
std::deque<std::string> destination_paths(
resource.second.destination_paths().begin(),
resource.second.destination_paths().end());
auto handler = std::make_unique<FileTransferHandler>(
id, std::move(source_paths), std::move(destination_paths));
transfer_service.RegisterHandler(*handler);
handlers.push_back(std::move(handler));
}
PW_LOG_INFO("Starting pw_rpc server");
PW_CHECK_OK(rpc::system_server::Start());
// Unregister transfer handler before cleaning up the thread since doing so
// requires the transfer thread to be running.
for (auto& handler : handlers) {
transfer_service.UnregisterHandler(*handler);
}
// End transfer thread.
transfer_thread.Terminate();
transfer_thread_handle.join();
}
} // namespace
} // namespace pw::transfer
int main(int argc, char* argv[]) {
if (argc != 2) {
PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
return 1;
}
int port = std::atoi(argv[1]);
PW_CHECK_UINT_GT(port, 0, "Invalid port!");
std::string config_string;
std::string line;
while (std::getline(std::cin, line)) {
config_string = config_string + line + '\n';
}
pw::transfer::ServerConfig config;
bool ok =
google::protobuf::TextFormat::ParseFromString(config_string, &config);
if (!ok) {
PW_LOG_INFO("Failed to parse config: %s", config_string.c_str());
PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
return 1;
} else {
PW_LOG_INFO("Server loaded config:\n%s", config.DebugString().c_str());
}
pw::transfer::RunServer(port, config);
return 0;
}