blob: 32a83ffb4f2db7eac5660d35b034ad673450d4c2 [file] [log] [blame]
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
// Simple RPC server with the transfer service registered. Reads HDLC frames
// with RPC packets through a socket. This server has a single resource ID that
// is available, and data must be written to the server before data can be read
// from the resource ID.
//
// Usage:
//
// integration_test_server 3300 <<< "resource_id: 12 file: '/tmp/gotbytes'"
#include <sys/socket.h>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <string>
#include <thread>
#include <variant>
#include <vector>
#include "google/protobuf/text_format.h"
#include "pw_assert/check.h"
#include "pw_chrono/system_clock.h"
#include "pw_log/log.h"
#include "pw_rpc_system_server/rpc_server.h"
#include "pw_rpc_system_server/socket.h"
#include "pw_stream/std_file_stream.h"
#include "pw_thread/thread.h"
#include "pw_thread_stl/options.h"
#include "pw_transfer/integration_test/config.pb.h"
#include "pw_transfer/transfer.h"
namespace pw::transfer {
namespace {
using stream::MemoryReader;
using stream::MemoryWriter;
// This is the maximum size of the socket send buffers. Ideally, this is set
// to the lowest allowed value to minimize buffering between the proxy and
// clients so rate limiting causes the client to block and wait for the
// integration test proxy to drain rather than allowing OS buffers to backlog
// large quantities of data.
//
// Note that the OS may chose to not strictly follow this requested buffer size.
// Still, setting this value to be as small as possible does reduce bufer sizes
// significantly enough to better reflect typical inter-device communication.
//
// For this to be effective, servers should also configure their sockets to a
// smaller receive buffer size.
constexpr int kMaxSocketSendBufferSize = 1;
// TODO(tpudlik): This is copy-pasted from test_rpc_server.cc, break it out into
// a shared library.
class FileTransferHandler final : public ReadWriteHandler {
public:
FileTransferHandler(uint32_t resource_id, const char* path)
: ReadWriteHandler(resource_id), path_(path) {}
~FileTransferHandler() = default;
Status PrepareRead() final {
PW_LOG_DEBUG("Preparing read for file %s", path_.c_str());
set_reader(stream_.emplace<stream::StdFileReader>(path_.c_str()));
return OkStatus();
}
void FinalizeRead(Status) final {
std::get<stream::StdFileReader>(stream_).Close();
}
Status PrepareWrite() final {
PW_LOG_DEBUG("Preparing write for file %s", path_.c_str());
set_writer(stream_.emplace<stream::StdFileWriter>(path_.c_str()));
return OkStatus();
}
Status FinalizeWrite(Status) final {
std::get<stream::StdFileWriter>(stream_).Close();
return OkStatus();
}
private:
std::string path_;
std::variant<std::monostate, stream::StdFileReader, stream::StdFileWriter>
stream_;
};
void RunServer(int socket_port, ServerConfig config) {
std::vector<std::byte> chunk_buffer(config.chunk_size_bytes());
std::vector<std::byte> encode_buffer(config.chunk_size_bytes());
transfer::Thread<4, 4> transfer_thread(chunk_buffer, encode_buffer);
TransferService transfer_service(
transfer_thread,
config.pending_bytes(),
std::chrono::seconds(config.chunk_timeout_seconds()),
config.transfer_service_retries(),
config.extend_window_divisor());
rpc::system_server::set_socket_port(socket_port);
rpc::system_server::Init();
rpc::system_server::Server().RegisterService(transfer_service);
// Start transfer thread.
thread::Thread transfer_thread_handle =
thread::Thread(thread::stl::Options(), transfer_thread);
int retval = setsockopt(rpc::system_server::GetServerSocketFd(),
SOL_SOCKET,
SO_SNDBUF,
&kMaxSocketSendBufferSize,
sizeof(kMaxSocketSendBufferSize));
PW_CHECK_INT_EQ(retval,
0,
"Failed to configure socket send buffer size with errno=%d",
errno);
// It's fine to allocate this on the stack since this thread doesn't return
// until this process is killed.
FileTransferHandler transfer_handler(config.resource_id(),
config.file().c_str());
transfer_service.RegisterHandler(transfer_handler);
PW_LOG_INFO("Starting pw_rpc server");
PW_CHECK_OK(rpc::system_server::Start());
// Unregister transfer handler before cleaning up the thread since doing so
// requires the transfer thread to be running.
transfer_service.UnregisterHandler(transfer_handler);
// End transfer thread.
transfer_thread.Terminate();
transfer_thread_handle.join();
}
} // namespace
} // namespace pw::transfer
int main(int argc, char* argv[]) {
if (argc != 2) {
PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
return 1;
}
int port = std::atoi(argv[1]);
PW_CHECK_UINT_GT(port, 0, "Invalid port!");
std::string config_string;
std::string line;
while (std::getline(std::cin, line)) {
config_string = config_string + line + '\n';
}
pw::transfer::ServerConfig config;
bool ok =
google::protobuf::TextFormat::ParseFromString(config_string, &config);
if (!ok) {
PW_LOG_INFO("Failed to parse config: %s", config_string.c_str());
PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
return 1;
} else {
PW_LOG_INFO("Server loaded config:\n%s", config.DebugString().c_str());
}
pw::transfer::RunServer(port, config);
return 0;
}