blob: dbf67b0322c381734826f736a59cfaaa667dc9a4 [file] [log] [blame]
#include <unistd.h>
#include <array>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <optional>
#include <thread>
#include <utility>
#include "pw_log/levels.h"
#define PW_LOG_LEVEL PW_LOG_LEVEL_INFO
#include "pw_assert/check.h"
#include "pw_data_link/data_link.h"
#include "pw_data_link/server_socket.h"
#include "pw_data_link/simple_allocator.h"
#include "pw_data_link/socket_data_link.h"
#include "pw_data_link/socket_data_link_thread.h"
#include "pw_log/log.h"
#include "pw_result/result.h"
#include "pw_status/status.h"
#include "pw_sync/thread_notification.h"
#include "pw_thread/detached_thread.h"
#include "pw_thread/thread_core.h"
#include "pw_thread_stl/options.h"
namespace {
const char* kLocalHost = "localhost";
constexpr int kDefaultPort = 33001;
constexpr size_t kReadBufferSize = 1024;
constexpr size_t kWriteBufferSize = 1024;
constexpr size_t kAllocatorSize = 2 * kWriteBufferSize;
struct LinkSignals {
std::atomic<bool> run = true;
pw::sync::ThreadNotification ready_to_read;
pw::sync::ThreadNotification data_read;
pw::sync::ThreadNotification ready_to_write;
// Note: the last_status variable is not thread-safe. It is set in the link
// event callback, called in the link worker thread, and read in the user
// reader or writer threads.
std::atomic<pw::StatusWithSize> last_status = pw::StatusWithSize(0);
};
class LinkThread : public pw::thread::ThreadCore {
public:
LinkThread(std::shared_ptr<pw::data_link::SocketDataLink> link,
std::shared_ptr<LinkSignals> link_signals)
: link_(link), link_signals_(link_signals), bytes_transferred_(0) {}
void Run() override {
while (link_signals_->run) {
Step();
}
end_time_ = std::chrono::high_resolution_clock::now();
PW_LOG_INFO("LinkThread stopped");
}
// Thread must be stopped to avoid data races.
size_t bytes_transferred() const { return bytes_transferred_; }
std::chrono::duration<int> transfer_time() const {
return std::chrono::duration_cast<std::chrono::seconds>(end_time_ -
start_time_);
}
void Stop() {
link_signals_->run = false;
link_signals_->ready_to_write.release();
link_signals_->ready_to_read.release();
}
protected:
virtual void Step() = 0;
std::shared_ptr<pw::data_link::SocketDataLink> link_;
std::shared_ptr<LinkSignals> link_signals_;
size_t bytes_transferred_ = 0;
bool start_time_set_ = false;
std::chrono::steady_clock::time_point start_time_;
std::chrono::steady_clock::time_point end_time_;
};
class Reader : public LinkThread {
public:
Reader(std::shared_ptr<pw::data_link::SocketDataLink> link,
std::shared_ptr<LinkSignals> link_signals)
: LinkThread(link, link_signals), buffer_() {}
void Step() override {
PW_LOG_DEBUG("Waiting to read");
link_signals_->ready_to_read.acquire();
if (!link_signals_->run) {
return;
}
PW_LOG_DEBUG("Reading");
if (const pw::Status status = link_->Read(buffer_); !status.ok()) {
PW_LOG_ERROR("Failed to read. Error: %s", status.str());
return;
}
if (!start_time_set_) {
start_time_set_ = true;
start_time_ = std::chrono::high_resolution_clock::now();
}
PW_LOG_DEBUG("Waiting for read to be done");
link_signals_->data_read.acquire();
const pw::StatusWithSize status = link_signals_->last_status;
PW_LOG_DEBUG("Read returned %s (%d bytes)",
status.status().str(),
static_cast<int>(status.size()));
if (status.ok()) {
bytes_transferred_ += status.size();
// PW_LOG_DEBUG("%s", reinterpret_cast<char*>(buffer_.data()));
}
}
private:
std::array<std::byte, kReadBufferSize> buffer_{};
};
class Writer : public LinkThread {
public:
Writer(std::shared_ptr<pw::data_link::SocketDataLink> link,
std::shared_ptr<LinkSignals> link_signals)
: LinkThread(link, link_signals) {}
void Step() override {
PW_LOG_DEBUG("Waiting to write");
link_signals_->ready_to_write.acquire();
if (!link_signals_->run) {
return;
}
PW_LOG_DEBUG("Waiting for write buffer");
std::optional<pw::multibuf::MultiBuf> buffer =
link_->GetWriteBuffer(kWriteBufferSize);
if (!buffer.has_value()) {
return;
}
if (!start_time_set_) {
start_time_set_ = true;
start_time_ = std::chrono::high_resolution_clock::now();
}
for (auto& chunk_byte : *buffer) {
chunk_byte = std::byte('C');
}
PW_LOG_DEBUG("Writing");
const size_t bytes_written = buffer->size();
const pw::Status status = link_->Write(std::move(*buffer));
if (status.ok()) {
bytes_transferred_ += bytes_written;
} else {
PW_LOG_ERROR("Write failed. Error: %s", status.str());
}
}
};
} // namespace
void print_help_menu() {
std::cout << "Data Link sample app." << std::endl << std::endl;
std::cout << "Use --server to serve a socket." << std::endl;
std::cout << "Use --port <NUMBER> to:" << std::endl;
std::cout << " - serve a socket on the given port when --server is set, or"
<< std::endl;
std::cout << " - connect to a socket on the given port." << std::endl;
std::cout << " Defaults to port " << kDefaultPort << "." << std::endl;
std::cout << "Use --reader to make the link's role read only." << std::endl;
std::cout << " Defaults to writer only role." << std::endl;
std::cout << "Use -h to print this menu and exit." << std::endl;
}
int main(int argc, char** argv) {
constexpr size_t kMaxLinks = 1;
bool is_reader = false;
bool is_server = false;
int port = kDefaultPort;
const std::chrono::duration<int> test_time = std::chrono::seconds(10);
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--port") == 0 && i < argc - 1) {
port = std::atoi(argv[++i]);
} else if (strcmp(argv[i], "--server") == 0) {
is_server = true;
} else if (strcmp(argv[i], "--reader") == 0) {
is_reader = true;
} else if (strcmp(argv[i], "-h") == 0) {
print_help_menu();
exit(0);
} else {
PW_LOG_ERROR("Invalid argument '%s'", argv[i]);
print_help_menu();
exit(-1);
}
}
PW_LOG_INFO("Started");
std::shared_ptr<LinkSignals> link_signals = std::make_shared<LinkSignals>();
auto event_callback = [&link_signals](pw::data_link::DataLink::Event event,
pw::StatusWithSize status) {
link_signals->last_status = status;
switch (event) {
case pw::data_link::DataLink::Event::kOpen: {
if (!status.ok()) {
PW_LOG_ERROR("Link failed to open: %s", status.status().str());
link_signals->run = false;
} else {
PW_LOG_DEBUG("Link open");
}
link_signals->ready_to_write.release();
link_signals->ready_to_read.release();
} break;
case pw::data_link::DataLink::Event::kClosed:
link_signals->run = false;
link_signals->ready_to_read.release();
link_signals->ready_to_write.release();
break;
case pw::data_link::DataLink::Event::kDataReceived:
link_signals->ready_to_read.release();
break;
case pw::data_link::DataLink::Event::kDataRead:
link_signals->data_read.release();
break;
case pw::data_link::DataLink::Event::kDataSent:
link_signals->ready_to_write.release();
break;
}
};
std::shared_ptr<pw::data_link::SocketDataLink> link;
pw::multibuf::TrackingAllocatorWithMemory<kAllocatorSize>
link_buffer_allocator{};
if (is_server) {
PW_LOG_INFO("Serving on port %d", static_cast<int>(port));
pw::data_link::ServerSocket server{kMaxLinks};
PW_CHECK_OK(server.Listen(port));
PW_LOG_INFO("Waiting for connection");
const pw::Result<int> connection_fd = server.Accept();
PW_CHECK_OK(connection_fd.status());
PW_LOG_INFO("New Connection! Creating Link");
link = std::make_shared<pw::data_link::SocketDataLink>(
*connection_fd, std::move(event_callback), link_buffer_allocator);
} else {
PW_LOG_INFO("Openning Link");
link = std::make_shared<pw::data_link::SocketDataLink>(kLocalHost, port);
link->Open(std::move(event_callback), link_buffer_allocator);
}
pw::data_link::SocketDataLinkThreadWithContainer<kMaxLinks> links_thread{};
PW_CHECK_OK(links_thread.RegisterLink(*link));
PW_LOG_INFO("Starting links thread");
pw::thread::DetachedThread(pw::thread::stl::Options(), links_thread);
std::shared_ptr<LinkThread> link_thread;
if (is_reader) {
PW_LOG_INFO("Starting reader thread");
auto reader_thread = std::make_shared<Reader>(link, link_signals);
link_thread = reader_thread;
pw::thread::DetachedThread(pw::thread::stl::Options(), *reader_thread);
} else {
PW_LOG_INFO("Starting writer thread");
auto writer_thread = std::make_shared<Writer>(link, link_signals);
link_thread = writer_thread;
pw::thread::DetachedThread(pw::thread::stl::Options(), *writer_thread);
}
if (link_signals->run) {
PW_LOG_INFO("Running for %lld seconds",
std::chrono::seconds(test_time).count());
for (int i = 0;
link_signals->run && i < std::chrono::seconds(test_time).count();
++i) {
sleep(1);
}
PW_LOG_INFO("Stopping link's work");
link_thread->Stop();
}
// Wait for link thread to stop.
// TODO(cachinchilla): Figure out how to join these threads properly.
sleep(3);
PW_LOG_INFO("Link transferred %d bytes in %lld seconds",
static_cast<int>(link_thread->bytes_transferred()),
std::chrono::seconds(link_thread->transfer_time()).count());
PW_LOG_INFO("Cleaning up");
links_thread.UnregisterLink(*link);
PW_LOG_INFO("Stopping links thread");
links_thread.Stop();
PW_LOG_INFO("Terminating");
return 0;
}