pw_data_link: Make sample app take roles

The Sample app can take the role of a link writer or reader.
It can also take the role of a socket server or client.

Test:
  Terminal 1:
    out/host_debug/obj/pw_data_link/bin/sample_app --serve
  Terminal 2:
    out/host_debug/obj/pw_data_link/bin/sample_app --reader

Change-Id: I9bc86aab6e417251b2bf3beb4f57597112e362e0
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/experimental/+/176988
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Anthony DiGirolamo <tonymd@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed-service-accounts.iam.gserviceaccount.com>
diff --git a/pw_data_link/sample_app.cc b/pw_data_link/sample_app.cc
index 48ea3c7..dbc9a95 100644
--- a/pw_data_link/sample_app.cc
+++ b/pw_data_link/sample_app.cc
@@ -1,6 +1,12 @@
+#include <unistd.h>
+
 #include <array>
 #include <chrono>
 #include <cstddef>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <memory>
 #include <optional>
 #include <thread>
 
@@ -20,7 +26,7 @@
 namespace {
 
 const char* kLocalHost = "localhost";
-constexpr uint16_t kPort = 123;
+constexpr int kDefaultPort = 33001;
 
 struct LinkSignals {
   bool run = true;
@@ -39,6 +45,9 @@
     while (link_signals_.run) {
       PW_LOG_DEBUG("Waiting to read");
       link_signals_.ready_to_read.acquire();
+      if (!link_signals_.run) {
+        break;
+      }
       PW_LOG_DEBUG("Reading");
       const pw::Status status = link_.Read(buffer_);
       if (!status.ok()) {
@@ -54,6 +63,7 @@
         PW_LOG_INFO("%s", reinterpret_cast<char*>(buffer_.data()));
       }
     }
+    PW_LOG_INFO("Reader thread stopped");
   }
 
  private:
@@ -70,6 +80,9 @@
   void Run() override {
     while (link_signals_.run) {
       link_signals_.ready_to_write.acquire();
+      if (!link_signals_.run) {
+        break;
+      }
       std::optional<pw::ByteSpan> buffer = link_.GetWriteBuffer();
       if (!buffer.has_value()) {
         continue;
@@ -83,6 +96,7 @@
         PW_LOG_ERROR("Write failed. Error: %s", status.str());
       }
     }
+    PW_LOG_INFO("Writer thread stopped");
   }
 
  private:
@@ -92,97 +106,118 @@
 
 }  // namespace
 
-int main() {
+void print_help_menu() {
+  std::cout << "Data Link sample app." << std::endl << std::endl;
+  std::cout << "Use --server to serve a socket." << std::endl;
+  std::cout << "Use --port <NUMBER> to:" << std::endl;
+  std::cout << "  - serve a socket on the given port when --server is set, or"
+            << std::endl;
+  std::cout << "  - connect to a socket on the given port." << std::endl;
+  std::cout << "  Defaults to port " << kDefaultPort << "." << std::endl;
+  std::cout << "Use --reader to make the link's role read only." << std::endl;
+  std::cout << "  Defaults to writer only role." << std::endl;
+  std::cout << "Use -h to print this menu and exit." << std::endl;
+}
+
+int main(int argc, char** argv) {
+  constexpr size_t kMaxLinks = 1;
+  bool is_reader = false;
+  bool is_server = false;
+  int port = kDefaultPort;
+
+  for (int i = 1; i < argc; i++) {
+    if (strcmp(argv[i], "--port") == 0 && i < argc - 1) {
+      port = std::atoi(argv[++i]);
+    } else if (strcmp(argv[i], "--server") == 0) {
+      is_server = true;
+    } else if (strcmp(argv[i], "--reader") == 0) {
+      is_reader = true;
+    } else if (strcmp(argv[i], "-h") == 0) {
+      print_help_menu();
+      exit(0);
+    } else {
+      PW_LOG_ERROR("Invalid argument '%s'", argv[i]);
+      print_help_menu();
+      exit(-1);
+    }
+  }
+
   PW_LOG_INFO("Started");
-  constexpr size_t kMaxLinks = 2;
-  pw::data_link::SocketDataLinkThread<kMaxLinks> links_thread{};
 
-  pw::data_link::ServerSocket server{kMaxLinks};
-  PW_CHECK_OK(server.Listen());
-  const pw::Result<int> connection_fd = server.Accept();
-  PW_CHECK_OK(connection_fd.status());
-
-  LinkSignals reader_link_signals{};
-  pw::data_link::SocketDataLink reader_link{
-      *connection_fd,
-      [&reader_link_signals](pw::data_link::DataLink::Event event,
-                             pw::StatusWithSize status) {
-        reader_link_signals.last_status = status;
-        switch (event) {
-          case pw::data_link::DataLink::Event::kOpen: {
-            if (!reader_link_signals.last_status.ok()) {
-              PW_LOG_ERROR("Read link failed to open: %s",
-                           reader_link_signals.last_status.status().str());
-              break;
-            }
-            reader_link_signals.ready_to_write.release();
-            reader_link_signals.ready_to_read.release();
-          } break;
-          case pw::data_link::DataLink::Event::kClosed:
-            reader_link_signals.run = false;
-            break;
-          case pw::data_link::DataLink::Event::kDataReceived:
-            reader_link_signals.ready_to_read.release();
-            break;
-          case pw::data_link::DataLink::Event::kDataRead:
-            reader_link_signals.data_read.release();
-            break;
-          case pw::data_link::DataLink::Event::kDataSent:
-            reader_link_signals.ready_to_write.release();
-            break;
-        }
-      }};
-  PW_CHECK_OK(links_thread.RegisterLink(reader_link));
-
-  pw::data_link::SocketDataLink writer_link{kLocalHost, kPort};
-  PW_CHECK_OK(links_thread.RegisterLink(writer_link));
-  LinkSignals writer_link_signals{};
-  writer_link.Open([&writer_link_signals](pw::data_link::DataLink::Event event,
-                                          pw::StatusWithSize status) {
-    writer_link_signals.last_status = status;
+  LinkSignals link_signals{};
+  auto event_callback = [&link_signals](pw::data_link::DataLink::Event event,
+                                        pw::StatusWithSize status) {
+    link_signals.last_status = status;
     switch (event) {
       case pw::data_link::DataLink::Event::kOpen: {
-        if (!writer_link_signals.last_status.ok()) {
-          PW_LOG_ERROR("Write link failed to open: %s",
-                       writer_link_signals.last_status.status().str());
-          break;
+        if (!link_signals.last_status.ok()) {
+          PW_LOG_ERROR("Link failed to open: %s",
+                       link_signals.last_status.status().str());
+          link_signals.run = false;
         }
-        writer_link_signals.ready_to_write.release();
+        link_signals.ready_to_write.release();
+        link_signals.ready_to_read.release();
       } break;
       case pw::data_link::DataLink::Event::kClosed:
-        writer_link_signals.run = false;
+        link_signals.run = false;
+        link_signals.ready_to_read.release();
+        link_signals.ready_to_write.release();
         break;
       case pw::data_link::DataLink::Event::kDataReceived:
-        writer_link_signals.ready_to_read.release();
+        link_signals.ready_to_read.release();
         break;
       case pw::data_link::DataLink::Event::kDataRead:
-        writer_link_signals.ready_to_read.release();
+        link_signals.data_read.release();
         break;
       case pw::data_link::DataLink::Event::kDataSent:
-        writer_link_signals.ready_to_write.release();
+        link_signals.ready_to_write.release();
         break;
     }
-  });
+  };
+
+  std::unique_ptr<pw::data_link::SocketDataLink> link;
+  if (is_server) {
+    PW_LOG_INFO("Serving on port %d", static_cast<int>(port));
+    pw::data_link::ServerSocket server{kMaxLinks};
+    PW_CHECK_OK(server.Listen(port));
+
+    PW_LOG_INFO("Waiting for connection");
+    const pw::Result<int> connection_fd = server.Accept();
+    PW_CHECK_OK(connection_fd.status());
+
+    PW_LOG_INFO("Creating Link");
+    link = std::make_unique<pw::data_link::SocketDataLink>(*connection_fd,
+                                                           event_callback);
+  } else {
+    PW_LOG_INFO("Openning Link");
+    link = std::make_unique<pw::data_link::SocketDataLink>(kLocalHost, port);
+    link->Open(event_callback);
+  }
+
+  pw::data_link::SocketDataLinkThread<kMaxLinks> links_thread{};
+  PW_CHECK_OK(links_thread.RegisterLink(*link));
 
   PW_LOG_INFO("Starting links thread");
   pw::thread::DetachedThread(pw::thread::stl::Options(), links_thread);
 
-  PW_LOG_INFO("Starting reader thread");
-  Reader reader_thread{reader_link, reader_link_signals};
-  pw::thread::DetachedThread(pw::thread::stl::Options(), reader_thread);
-
-  PW_LOG_INFO("Starting writer thread");
-  Writer writer_thread{writer_link, writer_link_signals};
-  pw::thread::DetachedThread(pw::thread::stl::Options(), writer_thread);
-
-  while (true) {
-    std::this_thread::sleep_for(std::chrono::seconds(10));
+  if (is_reader) {
+    PW_LOG_INFO("Starting reader thread");
+    Reader reader_thread{*link, link_signals};
+    pw::thread::DetachedThread(pw::thread::stl::Options(), reader_thread);
+  } else {
+    PW_LOG_INFO("Starting writer thread");
+    Writer writer_thread{*link, link_signals};
+    pw::thread::DetachedThread(pw::thread::stl::Options(), writer_thread);
   }
 
-  links_thread.UnregisterLink(reader_link);
-  links_thread.UnregisterLink(writer_link);
-  reader_link.Close();
-  writer_link.Close();
+  PW_LOG_INFO("Running for some time");
+  for (int i = 0; i < 30 && link_signals.run; ++i) {
+    sleep(1);
+  }
+
+  PW_LOG_INFO("Closing link");
+  links_thread.UnregisterLink(*link);
+  PW_LOG_INFO("Stopping links thread");
   links_thread.Stop();
   PW_LOG_INFO("Terminating");
   return 0;
diff --git a/pw_data_link/socket_data_link.cc b/pw_data_link/socket_data_link.cc
index 8d8009b..0189ff7 100644
--- a/pw_data_link/socket_data_link.cc
+++ b/pw_data_link/socket_data_link.cc
@@ -51,6 +51,7 @@
 SocketDataLink::SocketDataLink(int connection_fd,
                                EventHandlerCallback&& event_handler)
     : connection_fd_(connection_fd) {
+  PW_DCHECK(connection_fd > 0);
   std::lock_guard lock(lock_);
   event_handler_ = std::move(event_handler);
   set_link_state(LinkState::kOpen);