blob: dbc9a95400392c6b39be9588af78c94cb127accf [file] [log] [blame]
#include <unistd.h>
#include <array>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <optional>
#include <thread>
#include "pw_assert/check.h"
#include "pw_data_link/data_link.h"
#include "pw_data_link/server_socket.h"
#include "pw_data_link/socket_data_link.h"
#include "pw_data_link/socket_data_link_thread.h"
#include "pw_log/log.h"
#include "pw_result/result.h"
#include "pw_status/status.h"
#include "pw_sync/thread_notification.h"
#include "pw_thread/detached_thread.h"
#include "pw_thread/thread_core.h"
#include "pw_thread_stl/options.h"
namespace {
const char* kLocalHost = "localhost";
constexpr int kDefaultPort = 33001;
struct LinkSignals {
bool run = true;
pw::sync::ThreadNotification ready_to_read;
pw::sync::ThreadNotification data_read;
pw::sync::ThreadNotification ready_to_write;
pw::StatusWithSize last_status = pw::StatusWithSize(0);
};
class Reader : public pw::thread::ThreadCore {
public:
Reader(pw::data_link::SocketDataLink& link, LinkSignals& link_signals)
: link_(link), link_signals_(link_signals) {}
void Run() override {
while (link_signals_.run) {
PW_LOG_DEBUG("Waiting to read");
link_signals_.ready_to_read.acquire();
if (!link_signals_.run) {
break;
}
PW_LOG_DEBUG("Reading");
const pw::Status status = link_.Read(buffer_);
if (!status.ok()) {
PW_LOG_ERROR("Failed to read. Error: %s", status.str());
continue;
}
PW_LOG_DEBUG("Waiting for read to be done");
link_signals_.data_read.acquire();
PW_LOG_DEBUG("Read returned %s (%d bytes)",
link_signals_.last_status.status().str(),
static_cast<int>(link_signals_.last_status.size()));
if (link_signals_.last_status.ok()) {
PW_LOG_INFO("%s", reinterpret_cast<char*>(buffer_.data()));
}
}
PW_LOG_INFO("Reader thread stopped");
}
private:
pw::data_link::SocketDataLink& link_;
LinkSignals& link_signals_;
std::array<std::byte, 1024> buffer_;
};
class Writer : public pw::thread::ThreadCore {
public:
Writer(pw::data_link::SocketDataLink& link, LinkSignals& link_signals)
: link_(link), link_signals_(link_signals) {}
void Run() override {
while (link_signals_.run) {
link_signals_.ready_to_write.acquire();
if (!link_signals_.run) {
break;
}
std::optional<pw::ByteSpan> buffer = link_.GetWriteBuffer();
if (!buffer.has_value()) {
continue;
}
for (size_t i = 0; i < buffer->size(); ++i) {
buffer.value()[i] = std::byte('C');
}
buffer.value()[buffer->size() - 1] = std::byte(0);
const pw::Status status = link_.Write(*buffer);
if (!status.ok()) {
PW_LOG_ERROR("Write failed. Error: %s", status.str());
}
}
PW_LOG_INFO("Writer thread stopped");
}
private:
pw::data_link::SocketDataLink& link_;
LinkSignals& link_signals_;
};
} // namespace
void print_help_menu() {
std::cout << "Data Link sample app." << std::endl << std::endl;
std::cout << "Use --server to serve a socket." << std::endl;
std::cout << "Use --port <NUMBER> to:" << std::endl;
std::cout << " - serve a socket on the given port when --server is set, or"
<< std::endl;
std::cout << " - connect to a socket on the given port." << std::endl;
std::cout << " Defaults to port " << kDefaultPort << "." << std::endl;
std::cout << "Use --reader to make the link's role read only." << std::endl;
std::cout << " Defaults to writer only role." << std::endl;
std::cout << "Use -h to print this menu and exit." << std::endl;
}
int main(int argc, char** argv) {
constexpr size_t kMaxLinks = 1;
bool is_reader = false;
bool is_server = false;
int port = kDefaultPort;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--port") == 0 && i < argc - 1) {
port = std::atoi(argv[++i]);
} else if (strcmp(argv[i], "--server") == 0) {
is_server = true;
} else if (strcmp(argv[i], "--reader") == 0) {
is_reader = true;
} else if (strcmp(argv[i], "-h") == 0) {
print_help_menu();
exit(0);
} else {
PW_LOG_ERROR("Invalid argument '%s'", argv[i]);
print_help_menu();
exit(-1);
}
}
PW_LOG_INFO("Started");
LinkSignals link_signals{};
auto event_callback = [&link_signals](pw::data_link::DataLink::Event event,
pw::StatusWithSize status) {
link_signals.last_status = status;
switch (event) {
case pw::data_link::DataLink::Event::kOpen: {
if (!link_signals.last_status.ok()) {
PW_LOG_ERROR("Link failed to open: %s",
link_signals.last_status.status().str());
link_signals.run = false;
}
link_signals.ready_to_write.release();
link_signals.ready_to_read.release();
} break;
case pw::data_link::DataLink::Event::kClosed:
link_signals.run = false;
link_signals.ready_to_read.release();
link_signals.ready_to_write.release();
break;
case pw::data_link::DataLink::Event::kDataReceived:
link_signals.ready_to_read.release();
break;
case pw::data_link::DataLink::Event::kDataRead:
link_signals.data_read.release();
break;
case pw::data_link::DataLink::Event::kDataSent:
link_signals.ready_to_write.release();
break;
}
};
std::unique_ptr<pw::data_link::SocketDataLink> link;
if (is_server) {
PW_LOG_INFO("Serving on port %d", static_cast<int>(port));
pw::data_link::ServerSocket server{kMaxLinks};
PW_CHECK_OK(server.Listen(port));
PW_LOG_INFO("Waiting for connection");
const pw::Result<int> connection_fd = server.Accept();
PW_CHECK_OK(connection_fd.status());
PW_LOG_INFO("Creating Link");
link = std::make_unique<pw::data_link::SocketDataLink>(*connection_fd,
event_callback);
} else {
PW_LOG_INFO("Openning Link");
link = std::make_unique<pw::data_link::SocketDataLink>(kLocalHost, port);
link->Open(event_callback);
}
pw::data_link::SocketDataLinkThread<kMaxLinks> links_thread{};
PW_CHECK_OK(links_thread.RegisterLink(*link));
PW_LOG_INFO("Starting links thread");
pw::thread::DetachedThread(pw::thread::stl::Options(), links_thread);
if (is_reader) {
PW_LOG_INFO("Starting reader thread");
Reader reader_thread{*link, link_signals};
pw::thread::DetachedThread(pw::thread::stl::Options(), reader_thread);
} else {
PW_LOG_INFO("Starting writer thread");
Writer writer_thread{*link, link_signals};
pw::thread::DetachedThread(pw::thread::stl::Options(), writer_thread);
}
PW_LOG_INFO("Running for some time");
for (int i = 0; i < 30 && link_signals.run; ++i) {
sleep(1);
}
PW_LOG_INFO("Closing link");
links_thread.UnregisterLink(*link);
PW_LOG_INFO("Stopping links thread");
links_thread.Stop();
PW_LOG_INFO("Terminating");
return 0;
}