pw_data_link: Make sample app take roles
The Sample app can take the role of a link writer or reader.
It can also take the role of a socket server or client.
Test:
Terminal 1:
out/host_debug/obj/pw_data_link/bin/sample_app --serve
Terminal 2:
out/host_debug/obj/pw_data_link/bin/sample_app --reader
Change-Id: I9bc86aab6e417251b2bf3beb4f57597112e362e0
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/experimental/+/176988
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Anthony DiGirolamo <tonymd@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed-service-accounts.iam.gserviceaccount.com>
diff --git a/pw_data_link/sample_app.cc b/pw_data_link/sample_app.cc
index 48ea3c7..dbc9a95 100644
--- a/pw_data_link/sample_app.cc
+++ b/pw_data_link/sample_app.cc
@@ -1,6 +1,12 @@
+#include <unistd.h>
+
#include <array>
#include <chrono>
#include <cstddef>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <memory>
#include <optional>
#include <thread>
@@ -20,7 +26,7 @@
namespace {
const char* kLocalHost = "localhost";
-constexpr uint16_t kPort = 123;
+constexpr int kDefaultPort = 33001;
struct LinkSignals {
bool run = true;
@@ -39,6 +45,9 @@
while (link_signals_.run) {
PW_LOG_DEBUG("Waiting to read");
link_signals_.ready_to_read.acquire();
+ if (!link_signals_.run) {
+ break;
+ }
PW_LOG_DEBUG("Reading");
const pw::Status status = link_.Read(buffer_);
if (!status.ok()) {
@@ -54,6 +63,7 @@
PW_LOG_INFO("%s", reinterpret_cast<char*>(buffer_.data()));
}
}
+ PW_LOG_INFO("Reader thread stopped");
}
private:
@@ -70,6 +80,9 @@
void Run() override {
while (link_signals_.run) {
link_signals_.ready_to_write.acquire();
+ if (!link_signals_.run) {
+ break;
+ }
std::optional<pw::ByteSpan> buffer = link_.GetWriteBuffer();
if (!buffer.has_value()) {
continue;
@@ -83,6 +96,7 @@
PW_LOG_ERROR("Write failed. Error: %s", status.str());
}
}
+ PW_LOG_INFO("Writer thread stopped");
}
private:
@@ -92,97 +106,118 @@
} // namespace
-int main() {
+void print_help_menu() {
+ std::cout << "Data Link sample app." << std::endl << std::endl;
+ std::cout << "Use --server to serve a socket." << std::endl;
+ std::cout << "Use --port <NUMBER> to:" << std::endl;
+ std::cout << " - serve a socket on the given port when --server is set, or"
+ << std::endl;
+ std::cout << " - connect to a socket on the given port." << std::endl;
+ std::cout << " Defaults to port " << kDefaultPort << "." << std::endl;
+ std::cout << "Use --reader to make the link's role read only." << std::endl;
+ std::cout << " Defaults to writer only role." << std::endl;
+ std::cout << "Use -h to print this menu and exit." << std::endl;
+}
+
+int main(int argc, char** argv) {
+ constexpr size_t kMaxLinks = 1;
+ bool is_reader = false;
+ bool is_server = false;
+ int port = kDefaultPort;
+
+ for (int i = 1; i < argc; i++) {
+ if (strcmp(argv[i], "--port") == 0 && i < argc - 1) {
+ port = std::atoi(argv[++i]);
+ } else if (strcmp(argv[i], "--server") == 0) {
+ is_server = true;
+ } else if (strcmp(argv[i], "--reader") == 0) {
+ is_reader = true;
+ } else if (strcmp(argv[i], "-h") == 0) {
+ print_help_menu();
+ exit(0);
+ } else {
+ PW_LOG_ERROR("Invalid argument '%s'", argv[i]);
+ print_help_menu();
+ exit(-1);
+ }
+ }
+
PW_LOG_INFO("Started");
- constexpr size_t kMaxLinks = 2;
- pw::data_link::SocketDataLinkThread<kMaxLinks> links_thread{};
- pw::data_link::ServerSocket server{kMaxLinks};
- PW_CHECK_OK(server.Listen());
- const pw::Result<int> connection_fd = server.Accept();
- PW_CHECK_OK(connection_fd.status());
-
- LinkSignals reader_link_signals{};
- pw::data_link::SocketDataLink reader_link{
- *connection_fd,
- [&reader_link_signals](pw::data_link::DataLink::Event event,
- pw::StatusWithSize status) {
- reader_link_signals.last_status = status;
- switch (event) {
- case pw::data_link::DataLink::Event::kOpen: {
- if (!reader_link_signals.last_status.ok()) {
- PW_LOG_ERROR("Read link failed to open: %s",
- reader_link_signals.last_status.status().str());
- break;
- }
- reader_link_signals.ready_to_write.release();
- reader_link_signals.ready_to_read.release();
- } break;
- case pw::data_link::DataLink::Event::kClosed:
- reader_link_signals.run = false;
- break;
- case pw::data_link::DataLink::Event::kDataReceived:
- reader_link_signals.ready_to_read.release();
- break;
- case pw::data_link::DataLink::Event::kDataRead:
- reader_link_signals.data_read.release();
- break;
- case pw::data_link::DataLink::Event::kDataSent:
- reader_link_signals.ready_to_write.release();
- break;
- }
- }};
- PW_CHECK_OK(links_thread.RegisterLink(reader_link));
-
- pw::data_link::SocketDataLink writer_link{kLocalHost, kPort};
- PW_CHECK_OK(links_thread.RegisterLink(writer_link));
- LinkSignals writer_link_signals{};
- writer_link.Open([&writer_link_signals](pw::data_link::DataLink::Event event,
- pw::StatusWithSize status) {
- writer_link_signals.last_status = status;
+ LinkSignals link_signals{};
+ auto event_callback = [&link_signals](pw::data_link::DataLink::Event event,
+ pw::StatusWithSize status) {
+ link_signals.last_status = status;
switch (event) {
case pw::data_link::DataLink::Event::kOpen: {
- if (!writer_link_signals.last_status.ok()) {
- PW_LOG_ERROR("Write link failed to open: %s",
- writer_link_signals.last_status.status().str());
- break;
+ if (!link_signals.last_status.ok()) {
+ PW_LOG_ERROR("Link failed to open: %s",
+ link_signals.last_status.status().str());
+ link_signals.run = false;
}
- writer_link_signals.ready_to_write.release();
+ link_signals.ready_to_write.release();
+ link_signals.ready_to_read.release();
} break;
case pw::data_link::DataLink::Event::kClosed:
- writer_link_signals.run = false;
+ link_signals.run = false;
+ link_signals.ready_to_read.release();
+ link_signals.ready_to_write.release();
break;
case pw::data_link::DataLink::Event::kDataReceived:
- writer_link_signals.ready_to_read.release();
+ link_signals.ready_to_read.release();
break;
case pw::data_link::DataLink::Event::kDataRead:
- writer_link_signals.ready_to_read.release();
+ link_signals.data_read.release();
break;
case pw::data_link::DataLink::Event::kDataSent:
- writer_link_signals.ready_to_write.release();
+ link_signals.ready_to_write.release();
break;
}
- });
+ };
+
+ std::unique_ptr<pw::data_link::SocketDataLink> link;
+ if (is_server) {
+ PW_LOG_INFO("Serving on port %d", static_cast<int>(port));
+ pw::data_link::ServerSocket server{kMaxLinks};
+ PW_CHECK_OK(server.Listen(port));
+
+ PW_LOG_INFO("Waiting for connection");
+ const pw::Result<int> connection_fd = server.Accept();
+ PW_CHECK_OK(connection_fd.status());
+
+ PW_LOG_INFO("Creating Link");
+ link = std::make_unique<pw::data_link::SocketDataLink>(*connection_fd,
+ event_callback);
+ } else {
+ PW_LOG_INFO("Openning Link");
+ link = std::make_unique<pw::data_link::SocketDataLink>(kLocalHost, port);
+ link->Open(event_callback);
+ }
+
+ pw::data_link::SocketDataLinkThread<kMaxLinks> links_thread{};
+ PW_CHECK_OK(links_thread.RegisterLink(*link));
PW_LOG_INFO("Starting links thread");
pw::thread::DetachedThread(pw::thread::stl::Options(), links_thread);
- PW_LOG_INFO("Starting reader thread");
- Reader reader_thread{reader_link, reader_link_signals};
- pw::thread::DetachedThread(pw::thread::stl::Options(), reader_thread);
-
- PW_LOG_INFO("Starting writer thread");
- Writer writer_thread{writer_link, writer_link_signals};
- pw::thread::DetachedThread(pw::thread::stl::Options(), writer_thread);
-
- while (true) {
- std::this_thread::sleep_for(std::chrono::seconds(10));
+ if (is_reader) {
+ PW_LOG_INFO("Starting reader thread");
+ Reader reader_thread{*link, link_signals};
+ pw::thread::DetachedThread(pw::thread::stl::Options(), reader_thread);
+ } else {
+ PW_LOG_INFO("Starting writer thread");
+ Writer writer_thread{*link, link_signals};
+ pw::thread::DetachedThread(pw::thread::stl::Options(), writer_thread);
}
- links_thread.UnregisterLink(reader_link);
- links_thread.UnregisterLink(writer_link);
- reader_link.Close();
- writer_link.Close();
+ PW_LOG_INFO("Running for some time");
+ for (int i = 0; i < 30 && link_signals.run; ++i) {
+ sleep(1);
+ }
+
+ PW_LOG_INFO("Closing link");
+ links_thread.UnregisterLink(*link);
+ PW_LOG_INFO("Stopping links thread");
links_thread.Stop();
PW_LOG_INFO("Terminating");
return 0;
diff --git a/pw_data_link/socket_data_link.cc b/pw_data_link/socket_data_link.cc
index 8d8009b..0189ff7 100644
--- a/pw_data_link/socket_data_link.cc
+++ b/pw_data_link/socket_data_link.cc
@@ -51,6 +51,7 @@
SocketDataLink::SocketDataLink(int connection_fd,
EventHandlerCallback&& event_handler)
: connection_fd_(connection_fd) {
+ PW_DCHECK(connection_fd > 0);
std::lock_guard lock(lock_);
event_handler_ = std::move(event_handler);
set_link_state(LinkState::kOpen);