pw_data_link: Add SocketDataLink
For experimentaition only!
Change-Id: I932b867c6526b5399b94244c743d2bd115cbf4e2
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/experimental/+/175676
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
Reviewed-by: Erik Gilling <konkers@google.com>
Commit-Queue: Carlos Chinchilla <cachinchilla@google.com>
diff --git a/pw_data_link/BUILD.gn b/pw_data_link/BUILD.gn
index 44063b9..eea6fb2 100644
--- a/pw_data_link/BUILD.gn
+++ b/pw_data_link/BUILD.gn
@@ -30,6 +30,23 @@
]
}
+pw_source_set("socket_data_link") {
+ public_configs = [ ":default_config" ]
+ public = [ "public/pw_data_link/socket_data_link.h" ]
+ public_deps = [
+ ":data_link",
+ "$dir_pw_assert",
+ "$dir_pw_bytes",
+ "$dir_pw_function",
+ "$dir_pw_status",
+ "$dir_pw_string:to_string",
+ "$dir_pw_sync:lock_annotations",
+ "$dir_pw_sync:mutex",
+ ]
+ deps = [ "$dir_pw_log" ]
+ sources = [ "socket_data_link.cc" ]
+}
+
pw_test("data_link_test") {
sources = [ "data_link_test.cc" ]
deps = [
@@ -39,6 +56,18 @@
]
}
+pw_test("socket_data_link_test") {
+ sources = [ "socket_data_link_test.cc" ]
+ deps = [
+ ":socket_data_link",
+ "$dir_pw_bytes",
+ "$dir_pw_status",
+ ]
+}
+
pw_test_group("tests") {
- tests = [ ":data_link_test" ]
+ tests = [
+ ":data_link_test",
+ ":socket_data_link_test",
+ ]
}
diff --git a/pw_data_link/public/pw_data_link/socket_data_link.h b/pw_data_link/public/pw_data_link/socket_data_link.h
new file mode 100644
index 0000000..c3243cd
--- /dev/null
+++ b/pw_data_link/public/pw_data_link/socket_data_link.h
@@ -0,0 +1,90 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <array>
+#include <cstdint>
+#include <optional>
+
+#include "pw_bytes/span.h"
+#include "pw_data_link/data_link.h"
+#include "pw_status/status.h"
+#include "pw_string/to_string.h"
+#include "pw_sync/lock_annotations.h"
+#include "pw_sync/mutex.h"
+
+namespace pw::data_link {
+
+class SocketDataLink : public DataLink {
+ public:
+ SocketDataLink(const char* host, uint16_t port);
+ SocketDataLink(int connection_fd, EventHandlerCallback&& event_handler);
+
+ ~SocketDataLink() override PW_LOCKS_EXCLUDED(lock_);
+
+ constexpr size_t mtu() { return kMtu; }
+ constexpr size_t max_payload_size() { return kMtu; }
+
+ // Waits for link state changes or events.
+ void WaitAndConsumeEvents() PW_LOCKS_EXCLUDED(lock_);
+
+ void Open(EventHandlerCallback&& event_handler) override
+ PW_LOCKS_EXCLUDED(lock_);
+ void Close() override PW_LOCKS_EXCLUDED(lock_);
+
+ std::optional<ByteSpan> GetWriteBuffer() override;
+ Status Write(ByteSpan buffer) override;
+ Status Read(ByteSpan buffer) override;
+
+ private:
+ static constexpr size_t kMtu = 1024;
+ static constexpr int kInvalidFd = -1;
+
+ enum class LinkState {
+ kOpen,
+ kOpenRequest,
+ kClosed,
+ } link_state_ PW_GUARDED_BY(lock_) = LinkState::kClosed;
+
+ enum class ReadState {
+ kIdle,
+ kReadRequested,
+ } read_state_ PW_GUARDED_BY(lock_) = ReadState::kIdle;
+
+ enum class WriteState {
+ kIdle,
+ kWaitingForWrite, // Buffer was provided.
+ kPending, // Write operation will occur.
+ } write_state_ PW_GUARDED_BY(lock_) = WriteState::kIdle;
+
+ void set_link_state(LinkState new_state) PW_EXCLUSIVE_LOCKS_REQUIRED(lock_);
+
+ void DoOpen() PW_UNLOCK_FUNCTION(lock_);
+ void DoClose() PW_UNLOCK_FUNCTION(lock_);
+ void DoRead() PW_UNLOCK_FUNCTION(lock_);
+ void DoWrite() PW_UNLOCK_FUNCTION(lock_);
+
+ const char* host_;
+ char port_[6];
+ int connection_fd_ PW_GUARDED_BY(lock_) = kInvalidFd;
+
+ std::array<std::byte, kMtu> tx_buffer_storage_{};
+ pw::ByteSpan tx_buffer_{};
+ size_t num_bytes_to_send_ = 0;
+ pw::ByteSpan rx_buffer_{};
+ EventHandlerCallback event_handler_;
+ pw::sync::Mutex lock_;
+};
+
+} // namespace pw::data_link
diff --git a/pw_data_link/socket_data_link.cc b/pw_data_link/socket_data_link.cc
new file mode 100644
index 0000000..8d8009b
--- /dev/null
+++ b/pw_data_link/socket_data_link.cc
@@ -0,0 +1,325 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#include "pw_data_link/socket_data_link.h"
+
+#if defined(_WIN32) && _WIN32
+// TODO(cachinchilla): add support for windows.
+#error Windows not supported yet!
+#else
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+#endif // defined(_WIN32) && _WIN32
+
+#include <optional>
+
+#include "pw_assert/check.h"
+#include "pw_bytes/span.h"
+#include "pw_log/log.h"
+#include "pw_status/status.h"
+#include "pw_status/status_with_size.h"
+
+namespace pw::data_link {
+namespace {
+
+const char* kLinkStateNameOpen = "Open";
+const char* kLinkStateNameOpenRequest = "kOpenRequest";
+const char* kLinkStateNameClosed = "Closed";
+const char* kLinkStateNameUnknown = "Unknown";
+
+} // namespace
+
+SocketDataLink::SocketDataLink(const char* host, uint16_t port) : host_(host) {
+ PW_CHECK(host_ != nullptr);
+ PW_CHECK(ToString(port, port_).ok());
+}
+
+SocketDataLink::SocketDataLink(int connection_fd,
+ EventHandlerCallback&& event_handler)
+ : connection_fd_(connection_fd) {
+ std::lock_guard lock(lock_);
+ event_handler_ = std::move(event_handler);
+ set_link_state(LinkState::kOpen);
+}
+
+SocketDataLink::~SocketDataLink() {
+ lock_.lock();
+ if (link_state_ != LinkState::kClosed) {
+ DoClose();
+ return;
+ }
+ lock_.unlock();
+}
+
+void SocketDataLink::set_link_state(LinkState new_state) {
+ const char* link_name;
+ switch (link_state_) {
+ case LinkState::kOpen:
+ link_name = kLinkStateNameOpen;
+ break;
+ case LinkState::kOpenRequest:
+ link_name = kLinkStateNameOpenRequest;
+ break;
+ case LinkState::kClosed:
+ link_name = kLinkStateNameClosed;
+ break;
+ default:
+ link_name = kLinkStateNameUnknown;
+ };
+
+ const char* new_link_name;
+ switch (new_state) {
+ case LinkState::kOpen:
+ new_link_name = kLinkStateNameOpen;
+ break;
+ case LinkState::kOpenRequest:
+ new_link_name = kLinkStateNameOpenRequest;
+ break;
+ case LinkState::kClosed:
+ new_link_name = kLinkStateNameClosed;
+ break;
+ default:
+ new_link_name = kLinkStateNameUnknown;
+ };
+ PW_LOG_DEBUG("Transitioning from %s to %s", link_name, new_link_name);
+ link_state_ = new_state;
+}
+
+void SocketDataLink::Open(EventHandlerCallback&& event_handler) {
+ std::lock_guard lock(lock_);
+ PW_CHECK(link_state_ == LinkState::kClosed);
+
+ event_handler_ = std::move(event_handler);
+ set_link_state(LinkState::kOpenRequest);
+}
+
+void SocketDataLink::WaitAndConsumeEvents() {
+ // Manually lock and unlock, since some functions may perform unlocking before
+ // calling the user's event callback, when locks cannot be held.
+ lock_.lock();
+ switch (link_state_) {
+ case LinkState::kOpen:
+ break;
+ case LinkState::kClosed:
+ break;
+ case LinkState::kOpenRequest:
+ DoOpen();
+ return;
+ }
+
+ // Read and Write procedures should be on their own thread, so they don't
+ // block each other. For now keep them in a single thread.
+ switch (write_state_) {
+ case WriteState::kIdle:
+ break;
+ case WriteState::kWaitingForWrite:
+ break;
+ case WriteState::kPending:
+ DoWrite();
+ lock_.lock();
+ }
+
+ switch (read_state_) {
+ case ReadState::kIdle:
+ break;
+ case ReadState::kReadRequested:
+ DoRead();
+ return;
+ }
+ lock_.unlock();
+}
+
+void SocketDataLink::DoOpen() {
+ addrinfo hints = {};
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_NUMERICSERV;
+ addrinfo* res;
+ if (getaddrinfo(host_, port_, &hints, &res) != 0) {
+ PW_LOG_ERROR("Failed to configure connection address for socket");
+ set_link_state(LinkState::kClosed);
+ lock_.unlock();
+ event_handler_(DataLink::Event::kOpen, StatusWithSize::InvalidArgument());
+ return;
+ }
+
+ addrinfo* rp;
+ for (rp = res; rp != nullptr; rp = rp->ai_next) {
+ connection_fd_ = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (connection_fd_ != kInvalidFd) {
+ break;
+ }
+ }
+
+ if (connection_fd_ == kInvalidFd) {
+ PW_LOG_ERROR("Failed to create a socket: %s", std::strerror(errno));
+ set_link_state(LinkState::kClosed);
+ lock_.unlock();
+ freeaddrinfo(res);
+ event_handler_(DataLink::Event::kOpen, StatusWithSize::Unknown());
+ return;
+ }
+
+// Set necessary options on a socket file descriptor.
+#if defined(__APPLE__)
+ // Use SO_NOSIGPIPE to avoid getting a SIGPIPE signal when the remote peer
+ // drops the connection. This is supported on macOS only.
+ constexpr int value = 1;
+ if (setsockopt(socket, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(int)) < 0) {
+ PW_LOG_WARN("Failed to set SO_NOSIGPIPE: %s", std::strerror(errno));
+ }
+#endif // defined(__APPLE__)
+
+ if (connect(connection_fd_, rp->ai_addr, rp->ai_addrlen) == -1) {
+ close(connection_fd_);
+ connection_fd_ = kInvalidFd;
+ set_link_state(LinkState::kClosed);
+ lock_.unlock();
+ PW_LOG_ERROR(
+ "Failed to connect to %s:%s: %s", host_, port_, std::strerror(errno));
+ freeaddrinfo(res);
+ event_handler_(DataLink::Event::kOpen, StatusWithSize::Unknown());
+ return;
+ }
+ set_link_state(LinkState::kOpen);
+ lock_.unlock();
+ freeaddrinfo(res);
+ event_handler_(DataLink::Event::kOpen, StatusWithSize());
+}
+
+void SocketDataLink::Close() {
+ lock_.lock();
+ PW_DCHECK(link_state_ != LinkState::kClosed);
+ DoClose();
+}
+
+void SocketDataLink::DoClose() {
+ set_link_state(LinkState::kClosed);
+ if (connection_fd_ != kInvalidFd) {
+ close(connection_fd_);
+ connection_fd_ = kInvalidFd;
+ }
+ lock_.unlock();
+ event_handler_(DataLink::Event::kClosed, StatusWithSize());
+}
+
+std::optional<ByteSpan> SocketDataLink::GetWriteBuffer() {
+ std::lock_guard lock(lock_);
+ PW_CHECK(link_state_ == LinkState::kOpen);
+ if (write_state_ != WriteState::kIdle) {
+ return std::nullopt;
+ }
+ write_state_ = WriteState::kWaitingForWrite;
+ return tx_buffer_storage_;
+}
+
+Status SocketDataLink::Write(ByteSpan buffer) {
+ PW_DCHECK(buffer.size() > 0);
+ std::lock_guard lock(lock_);
+ PW_DCHECK(link_state_ == LinkState::kOpen);
+ if (write_state_ != WriteState::kWaitingForWrite) {
+ return pw::Status::FailedPrecondition();
+ }
+
+ tx_buffer_ = buffer;
+ num_bytes_to_send_ = tx_buffer_.size_bytes();
+ write_state_ = WriteState::kPending;
+ return OkStatus();
+}
+
+void SocketDataLink::DoWrite() {
+ int send_flags = 0;
+#if defined(__linux__)
+ // Use MSG_NOSIGNAL to avoid getting a SIGPIPE signal when the remote
+ // peer drops the connection. This is supported on Linux only.
+ send_flags |= MSG_NOSIGNAL;
+#endif // defined(__linux__)
+
+ ssize_t bytes_sent = send(connection_fd_,
+ reinterpret_cast<const char*>(tx_buffer_.data()),
+ tx_buffer_.size_bytes(),
+ send_flags);
+ if (static_cast<size_t>(bytes_sent) == tx_buffer_.size_bytes()) {
+ write_state_ = WriteState::kIdle;
+ lock_.unlock();
+ event_handler_(DataLink::Event::kDataSent,
+ StatusWithSize(num_bytes_to_send_));
+ return;
+ }
+
+ if (bytes_sent < 0) {
+ if (errno == EPIPE) {
+ // An EPIPE indicates that the connection is closed.
+ lock_.unlock();
+ event_handler_(DataLink::Event::kDataSent, StatusWithSize::OutOfRange());
+ Close();
+ return;
+ }
+ lock_.unlock();
+ event_handler_(DataLink::Event::kDataSent, StatusWithSize::Unknown());
+ return;
+ }
+
+ // Partial send.
+ tx_buffer_ = tx_buffer_.subspan(static_cast<size_t>(bytes_sent));
+ lock_.unlock();
+}
+
+Status SocketDataLink::Read(ByteSpan buffer) {
+ PW_DCHECK(buffer.size() > 0);
+ std::lock_guard lock(lock_);
+ PW_DCHECK(link_state_ == LinkState::kOpen);
+ if (read_state_ != ReadState::kIdle) {
+ return Status::FailedPrecondition();
+ }
+ rx_buffer_ = buffer;
+ read_state_ = ReadState::kReadRequested;
+ return OkStatus();
+}
+
+void SocketDataLink::DoRead() {
+ ssize_t bytes_rcvd = recv(connection_fd_,
+ reinterpret_cast<char*>(rx_buffer_.data()),
+ rx_buffer_.size_bytes(),
+ 0);
+
+ if (bytes_rcvd == 0) {
+ // Remote peer has closed the connection.
+ lock_.unlock();
+ event_handler_(DataLink::Event::kDataRead, StatusWithSize::Internal());
+ Close();
+ return;
+ } else if (bytes_rcvd < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ // Socket timed out when trying to read.
+ // This should only occur if SO_RCVTIMEO was configured to be nonzero, or
+ // if the socket was opened with the O_NONBLOCK flag to prevent any
+ // blocking when performing reads or writes.
+ lock_.unlock();
+ event_handler_(DataLink::Event::kDataRead,
+ StatusWithSize::ResourceExhausted());
+ return;
+ }
+ lock_.unlock();
+ event_handler_(DataLink::Event::kDataRead, StatusWithSize::Unknown());
+ return;
+ }
+ lock_.unlock();
+ event_handler_(DataLink::Event::kDataRead, StatusWithSize(bytes_rcvd));
+}
+
+} // namespace pw::data_link
diff --git a/pw_data_link/socket_data_link_test.cc b/pw_data_link/socket_data_link_test.cc
new file mode 100644
index 0000000..2457ac1
--- /dev/null
+++ b/pw_data_link/socket_data_link_test.cc
@@ -0,0 +1,57 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_data_link/socket_data_link.h"
+
+#include <array>
+#include <optional>
+
+#include "gtest/gtest.h"
+#include "pw_bytes/span.h"
+#include "pw_status/status.h"
+#include "pw_status/status_with_size.h"
+
+namespace pw::data_link {
+namespace {
+
+struct EventCallbackTracker {
+ size_t call_count = 0;
+ DataLink::Event last_event;
+ pw::StatusWithSize last_event_status;
+};
+
+TEST(DataLinkTest, CompileTest) {
+ SocketDataLink data_link{"localhost", 123};
+
+ EventCallbackTracker callback_tracker;
+ data_link.Open(
+ [&callback_tracker](DataLink::Event event, pw::StatusWithSize status) {
+ callback_tracker.last_event = event;
+ callback_tracker.last_event_status = status;
+ ++callback_tracker.call_count;
+ });
+
+ data_link.WaitAndConsumeEvents();
+
+ ASSERT_EQ(callback_tracker.call_count, 1u);
+ EXPECT_EQ(callback_tracker.last_event, DataLink::Event::kOpen);
+ ASSERT_NE(callback_tracker.last_event_status.status(), OkStatus());
+
+ // std::array<std::byte, 2> buffer{};
+ // EXPECT_EQ(data_link.Write(buffer), Status::Unknown());
+ // EXPECT_EQ(data_link.Read(buffer), Status::Unknown());
+}
+
+} // namespace
+} // namespace pw::data_link
\ No newline at end of file