blob: 48ea3c7d9ef7e557ac04b3202741db6ccad4eb9b [file] [log] [blame]
#include <array>
#include <chrono>
#include <cstddef>
#include <optional>
#include <thread>
#include "pw_assert/check.h"
#include "pw_data_link/data_link.h"
#include "pw_data_link/server_socket.h"
#include "pw_data_link/socket_data_link.h"
#include "pw_data_link/socket_data_link_thread.h"
#include "pw_log/log.h"
#include "pw_result/result.h"
#include "pw_status/status.h"
#include "pw_sync/thread_notification.h"
#include "pw_thread/detached_thread.h"
#include "pw_thread/thread_core.h"
#include "pw_thread_stl/options.h"
namespace {
const char* kLocalHost = "localhost";
constexpr uint16_t kPort = 123;
struct LinkSignals {
bool run = true;
pw::sync::ThreadNotification ready_to_read;
pw::sync::ThreadNotification data_read;
pw::sync::ThreadNotification ready_to_write;
pw::StatusWithSize last_status = pw::StatusWithSize(0);
};
class Reader : public pw::thread::ThreadCore {
public:
Reader(pw::data_link::SocketDataLink& link, LinkSignals& link_signals)
: link_(link), link_signals_(link_signals) {}
void Run() override {
while (link_signals_.run) {
PW_LOG_DEBUG("Waiting to read");
link_signals_.ready_to_read.acquire();
PW_LOG_DEBUG("Reading");
const pw::Status status = link_.Read(buffer_);
if (!status.ok()) {
PW_LOG_ERROR("Failed to read. Error: %s", status.str());
continue;
}
PW_LOG_DEBUG("Waiting for read to be done");
link_signals_.data_read.acquire();
PW_LOG_DEBUG("Read returned %s (%d bytes)",
link_signals_.last_status.status().str(),
static_cast<int>(link_signals_.last_status.size()));
if (link_signals_.last_status.ok()) {
PW_LOG_INFO("%s", reinterpret_cast<char*>(buffer_.data()));
}
}
}
private:
pw::data_link::SocketDataLink& link_;
LinkSignals& link_signals_;
std::array<std::byte, 1024> buffer_;
};
class Writer : public pw::thread::ThreadCore {
public:
Writer(pw::data_link::SocketDataLink& link, LinkSignals& link_signals)
: link_(link), link_signals_(link_signals) {}
void Run() override {
while (link_signals_.run) {
link_signals_.ready_to_write.acquire();
std::optional<pw::ByteSpan> buffer = link_.GetWriteBuffer();
if (!buffer.has_value()) {
continue;
}
for (size_t i = 0; i < buffer->size(); ++i) {
buffer.value()[i] = std::byte('C');
}
buffer.value()[buffer->size() - 1] = std::byte(0);
const pw::Status status = link_.Write(*buffer);
if (!status.ok()) {
PW_LOG_ERROR("Write failed. Error: %s", status.str());
}
}
}
private:
pw::data_link::SocketDataLink& link_;
LinkSignals& link_signals_;
};
} // namespace
int main() {
PW_LOG_INFO("Started");
constexpr size_t kMaxLinks = 2;
pw::data_link::SocketDataLinkThread<kMaxLinks> links_thread{};
pw::data_link::ServerSocket server{kMaxLinks};
PW_CHECK_OK(server.Listen());
const pw::Result<int> connection_fd = server.Accept();
PW_CHECK_OK(connection_fd.status());
LinkSignals reader_link_signals{};
pw::data_link::SocketDataLink reader_link{
*connection_fd,
[&reader_link_signals](pw::data_link::DataLink::Event event,
pw::StatusWithSize status) {
reader_link_signals.last_status = status;
switch (event) {
case pw::data_link::DataLink::Event::kOpen: {
if (!reader_link_signals.last_status.ok()) {
PW_LOG_ERROR("Read link failed to open: %s",
reader_link_signals.last_status.status().str());
break;
}
reader_link_signals.ready_to_write.release();
reader_link_signals.ready_to_read.release();
} break;
case pw::data_link::DataLink::Event::kClosed:
reader_link_signals.run = false;
break;
case pw::data_link::DataLink::Event::kDataReceived:
reader_link_signals.ready_to_read.release();
break;
case pw::data_link::DataLink::Event::kDataRead:
reader_link_signals.data_read.release();
break;
case pw::data_link::DataLink::Event::kDataSent:
reader_link_signals.ready_to_write.release();
break;
}
}};
PW_CHECK_OK(links_thread.RegisterLink(reader_link));
pw::data_link::SocketDataLink writer_link{kLocalHost, kPort};
PW_CHECK_OK(links_thread.RegisterLink(writer_link));
LinkSignals writer_link_signals{};
writer_link.Open([&writer_link_signals](pw::data_link::DataLink::Event event,
pw::StatusWithSize status) {
writer_link_signals.last_status = status;
switch (event) {
case pw::data_link::DataLink::Event::kOpen: {
if (!writer_link_signals.last_status.ok()) {
PW_LOG_ERROR("Write link failed to open: %s",
writer_link_signals.last_status.status().str());
break;
}
writer_link_signals.ready_to_write.release();
} break;
case pw::data_link::DataLink::Event::kClosed:
writer_link_signals.run = false;
break;
case pw::data_link::DataLink::Event::kDataReceived:
writer_link_signals.ready_to_read.release();
break;
case pw::data_link::DataLink::Event::kDataRead:
writer_link_signals.ready_to_read.release();
break;
case pw::data_link::DataLink::Event::kDataSent:
writer_link_signals.ready_to_write.release();
break;
}
});
PW_LOG_INFO("Starting links thread");
pw::thread::DetachedThread(pw::thread::stl::Options(), links_thread);
PW_LOG_INFO("Starting reader thread");
Reader reader_thread{reader_link, reader_link_signals};
pw::thread::DetachedThread(pw::thread::stl::Options(), reader_thread);
PW_LOG_INFO("Starting writer thread");
Writer writer_thread{writer_link, writer_link_signals};
pw::thread::DetachedThread(pw::thread::stl::Options(), writer_thread);
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
links_thread.UnregisterLink(reader_link);
links_thread.UnregisterLink(writer_link);
reader_link.Close();
writer_link.Close();
links_thread.Stop();
PW_LOG_INFO("Terminating");
return 0;
}