pw_data_link: The experiment is over.
This code is no longer needed here for experimentation.
Change-Id: I0b757d07c1f49677cab2c1d40b5306e7e7f6bcc5
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/experimental/+/195920
Reviewed-by: Taylor Cramer <cramertj@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed-service-accounts.iam.gserviceaccount.com>
Presubmit-Verified: CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>
Pigweed-Auto-Submit: Carlos Chinchilla <cachinchilla@google.com>
GitOrigin-RevId: 7c7737fee466282be1aaea657a6d853d2248dee1
diff --git a/BUILD.gn b/BUILD.gn
index 91c2572..cca71ab 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -44,7 +44,6 @@
deps = [
":host_tests(//targets/host:host_debug_tests)",
"$dir_pw_color:tests.run(//targets/host:host_debug_tests)",
- "$dir_pw_data_link:tests.run(//targets/host:host_debug_tests)",
"$dir_pw_display:tests.run(//targets/host:host_debug_tests)",
"$dir_pw_draw:tests.run(//targets/host:host_debug_tests)",
"$dir_pw_framebuffer:tests.run(//targets/host:host_debug_tests)",
@@ -287,7 +286,6 @@
# Host applications steps.
deps += [
":applications_tests(//targets/host:host_debug_tests)",
- "$dir_pw_data_link:sample_app(//targets/host:host_debug)",
"//applications/blinky:blinky(//targets/host:host_debug)",
"//applications/strings:all(//targets/host:host_debug)",
]
diff --git a/modules.gni b/modules.gni
index dcd3e73..26bfd48 100644
--- a/modules.gni
+++ b/modules.gni
@@ -30,7 +30,6 @@
get_path_info("pw_board_led_stm32cube", "abspath")
dir_pw_color = get_path_info("pw_graphics/pw_color", "abspath")
dir_pw_math = get_path_info("pw_graphics/pw_math", "abspath")
- dir_pw_data_link = get_path_info("pw_data_link", "abspath")
dir_pw_digital_io_arduino = get_path_info("pw_digital_io_arduino", "abspath")
dir_pw_digital_io_stm32cube =
get_path_info("pw_digital_io_stm32cube", "abspath")
diff --git a/pw_data_link/BUILD.gn b/pw_data_link/BUILD.gn
deleted file mode 100644
index ea49ba9..0000000
--- a/pw_data_link/BUILD.gn
+++ /dev/null
@@ -1,135 +0,0 @@
-# Copyright 2023 The Pigweed Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-
-import("//build_overrides/pigweed.gni")
-
-import("$dir_pw_unit_test/test.gni")
-
-config("default_config") {
- include_dirs = [ "public" ]
-}
-
-pw_source_set("data_link") {
- public_configs = [ ":default_config" ]
- public = [ "public/pw_data_link/data_link.h" ]
- public_deps = [
- "$dir_pw_allocator",
- "$dir_pw_bytes",
- "$dir_pw_function",
- "$dir_pw_multibuf",
- "$dir_pw_status",
- ]
-}
-
-pw_source_set("socket_data_link") {
- public_configs = [ ":default_config" ]
- public = [ "public/pw_data_link/socket_data_link.h" ]
- public_deps = [
- ":data_link",
- "$dir_pw_allocator",
- "$dir_pw_assert",
- "$dir_pw_bytes",
- "$dir_pw_function",
- "$dir_pw_multibuf",
- "$dir_pw_status",
- "$dir_pw_string:to_string",
- "$dir_pw_sync:lock_annotations",
- "$dir_pw_sync:mutex",
- ]
- deps = [
- "$dir_pw_log",
- "$dir_pw_multibuf:header_chunk_region_tracker",
- ]
- sources = [ "socket_data_link.cc" ]
-}
-
-pw_source_set("data_link_thread") {
- public_configs = [ ":default_config" ]
- public = [ "public/pw_data_link/socket_data_link_thread.h" ]
- public_deps = [
- ":socket_data_link",
- "$dir_pw_status",
- "$dir_pw_sync:lock_annotations",
- "$dir_pw_sync:mutex",
- "$dir_pw_sync:thread_notification",
- "$dir_pw_thread:thread_core",
- "$dir_pw_thread:yield",
- ]
-}
-
-pw_source_set("server_socket") {
- public_configs = [ ":default_config" ]
- public = [ "public/pw_data_link/server_socket.h" ]
- sources = [ "server_socket.cc" ]
- public_deps = [
- "$dir_pw_assert",
- "$dir_pw_result",
- "$dir_pw_status",
- ]
-}
-
-# pw_data_link requires Linux sys/epoll.h
-if (host_os == "linux") {
- pw_executable("sample_app") {
- sources = [ "sample_app.cc" ]
- deps = [
- ":data_link",
- ":data_link_thread",
- ":server_socket",
- ":socket_data_link",
- "$dir_pw_allocator:block_allocator",
- "$dir_pw_assert",
- "$dir_pw_log",
- "$dir_pw_sync:thread_notification",
- "$dir_pw_thread:thread",
- "$dir_pw_thread:thread_core",
- ]
- }
-} else {
- group("sample_app") {
- }
-}
-
-pw_test("data_link_test") {
- sources = [ "data_link_test.cc" ]
- deps = [
- ":data_link",
- "$dir_pw_allocator",
- "$dir_pw_bytes",
- "$dir_pw_multibuf",
- "$dir_pw_status",
- ]
-}
-
-pw_test("socket_data_link_test") {
- sources = [ "socket_data_link_test.cc" ]
- deps = [
- ":socket_data_link",
- "$dir_pw_allocator:block_allocator",
- "$dir_pw_bytes",
- "$dir_pw_status",
- ]
-}
-
-pw_test_group("tests") {
- tests = []
-
- # pw_data_link requires Linux sys/epoll.h
- if (host_os == "linux") {
- tests += [
- ":data_link_test",
- ":socket_data_link_test",
- ]
- }
-}
diff --git a/pw_data_link/data_link_test.cc b/pw_data_link/data_link_test.cc
deleted file mode 100644
index c7ebdee..0000000
--- a/pw_data_link/data_link_test.cc
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright 2023 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-
-#include "pw_data_link/data_link.h"
-
-#include <optional>
-#include <utility>
-
-#include "gtest/gtest.h"
-#include "pw_allocator/allocator.h"
-#include "pw_bytes/span.h"
-#include "pw_multibuf/multibuf.h"
-#include "pw_status/status.h"
-
-namespace pw::data_link {
-namespace {
-
-class MockedDataLink : public DataLink {
- public:
- ~MockedDataLink() {}
- constexpr size_t mtu() { return 255; }
- constexpr size_t max_payload_size() { return 255; }
- void Open(EventHandlerCallback&& /*event_handler*/,
- allocator::Allocator& /*write_buffer_allocator*/) override {}
- void Close() override {}
- std::optional<multibuf::MultiBuf> GetWriteBuffer(size_t /*size*/) override {
- return std::nullopt;
- }
- Status Write(multibuf::MultiBuf&& /*buffer*/) override {
- return Status::Unimplemented();
- }
- Status Read(ByteSpan /*buffer*/) override { return Status::Unimplemented(); }
-};
-
-TEST(DataLinkTest, CompileTest) {
- MockedDataLink data_link{};
- multibuf::MultiBuf buffer;
- EXPECT_EQ(data_link.Write(std::move(buffer)), Status::Unimplemented());
-}
-
-} // namespace
-} // namespace pw::data_link
\ No newline at end of file
diff --git a/pw_data_link/public/pw_data_link/data_link.h b/pw_data_link/public/pw_data_link/data_link.h
deleted file mode 100644
index ec405a4..0000000
--- a/pw_data_link/public/pw_data_link/data_link.h
+++ /dev/null
@@ -1,131 +0,0 @@
-// Copyright 2023 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#pragma once
-
-#include <optional>
-
-#include "pw_allocator/allocator.h"
-#include "pw_bytes/span.h"
-#include "pw_function/function.h"
-#include "pw_multibuf/multibuf.h"
-#include "pw_status/status.h"
-#include "pw_status/status_with_size.h"
-
-namespace pw::data_link {
-
-// Generic Data Link interface.
-class DataLink {
- public:
- // Events escalated to upper layers.
- enum class Event {
- kOpen, // The link is now open.
- kClosed, // The link is now closed.
- kDataReceived, // Data has been received. Use Read() to fetch it.
- kDataRead, // Reading into the provided buffer in Read() is complete.
- // The buffer is now free.
- // WriteState: kReadPending -> kReadDataReady.
- kDataSent, // The data in Write() is in the send process and the buffer
- // provided is now free. The link is ready to send more
- // data.
- };
-
- // Logical states:
- // WriteState - { kWriteIdle, kWritePending }
- // ReadState - { kReadIdle, kReadPending, kReadDataReady }
- // LinkState - { kConnected, kConnectionPending, kDisconnected }
-
- using EventHandlerCallback = pw::Function<void(Event, pw::StatusWithSize)>;
-
- // Closes the underlying links. There will not be any events after this.
- virtual ~DataLink() = 0;
-
- // The MTU byte size of the packet which is the payload size plus header(s).
- //
- // This should only be used to size receive buffers to enable zero copying.
- constexpr size_t mtu() { return 0; }
-
- // Initializes the link peripherals if necessary, and the working thread or
- // threads depending on the implementation.
- // The event handler callback will be called in the working thread.
- // Wait for a kConnected event and check its status.
- //
- // Precondition: link is closed.
- virtual void Open(EventHandlerCallback&& event_handler,
- allocator::Allocator& write_buffer_allocator) = 0;
-
- // Closes the underlying link, cancelling any pending operations.
- //
- // Precondition: link is open.
- virtual void Close() = 0;
-
- // Gets the location where the outgoing data can be written to if there is no
- // ongoing write process. Otherwise, wait for the next kDataSent event.
- //
- // Precondition:
- // Link is open.
- virtual std::optional<multibuf::MultiBuf> GetWriteBuffer(size_t size) = 0;
-
- // Writes the entire MultiBuf. The send operation finishes when the kDataSent
- // event is emitted. The event has the operation status and how many bytes
- // were sent, which must be the size of the provided buffer, since no partial
- // writes are supported.
- //
- // Note: If the caller has a MultiBuf partially filled with data to send, they
- // must remove any unused ``MultiBuf::Chunk``s and truncate any partially
- // filled ones to avoid writing the empty ``MultiBuf::Chunk``s.
- //
- // Precondition:
- // Link is open.
- // No write operation is in progress.
- //
- // Returns:
- // OK: The buffer is successfully in the send process.
- // FAILED_PRECONDITION: A write operation is in process. Wait for the next
- // kDataSent event.
- // INVALID_ARGUMENT: The write buffer is empty.
- //
- // To send data:
- // 1. Get a buffer to write to with GetWriteBuffer().
- // 2. Write into buffer.
- // 3. Call write with the buffer written to.
- // 4. Wait for kDataSent.
- // 5. Another buffer can be requested.
- virtual Status Write(multibuf::MultiBuf&& buffer) = 0;
-
- // Reads a packet into the provided buffer without blocking. The user cannot
- // modify the buffer when Read() is called until the read operation is done
- // and the kDataRead event is emitted. The event has the operation status and
- // how many bytes were read.
- //
- // Precondition:
- // Link is open.
- // No read operation is in progress.
- //
- // Returns:
- // OK: The buffer is successfully in the read process.
- // Wait for kDataRead event.
- // FAILED_PRECONDITION: A read operation is in progress. Wait for the next
- // kDataReceived event.
- //
- // To read data:
- // 1. (Optional) Wait for the kDataReceived event.
- // 2. Pass in the input buffer with the amount of bytes to read.
- // 3. Wait for the kDataRead event.
- // 4. The buffer can be reused now.
- virtual Status Read(pw::ByteSpan buffer) = 0;
-};
-
-inline DataLink::~DataLink() {}
-
-} // namespace pw::data_link
diff --git a/pw_data_link/public/pw_data_link/server_socket.h b/pw_data_link/public/pw_data_link/server_socket.h
deleted file mode 100644
index 5595b1f..0000000
--- a/pw_data_link/public/pw_data_link/server_socket.h
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2023 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#pragma once
-
-#include <cstdint>
-
-#include "pw_assert/assert.h"
-#include "pw_result/result.h"
-#include "pw_status/status.h"
-
-namespace pw::data_link {
-
-/// `ServerSocket` wraps a POSIX-style server socket, producing a `SocketStream`
-/// for each accepted client connection.
-///
-/// Call `Listen` to create the socket and start listening for connections.
-/// Then call `Accept` any number of times to accept client connections.
-class ServerSocket {
- public:
- explicit ServerSocket(int backlog) : backlog_(backlog) {
- PW_DASSERT(backlog > 0);
- }
- ~ServerSocket() { Close(); }
-
- ServerSocket(const ServerSocket& other) = delete;
- ServerSocket& operator=(const ServerSocket& other) = delete;
-
- // Listen for connections on the given port.
- // If port is 0, a random unused port is chosen and can be retrieved with
- // port().
- Status Listen(uint16_t port = 0);
-
- // Accepts a connection. Blocks until after a client is connected.
- Result<int> Accept();
-
- // Close the server socket, preventing further connections.
- void Close();
-
- // Returns the port this socket is listening on.
- uint16_t port() const { return port_; }
-
- private:
- static constexpr int kInvalidFd = -1;
-
- uint16_t port_ = -1;
- int socket_fd_ = kInvalidFd;
- int backlog_ = 0;
-};
-
-} // namespace pw::data_link
diff --git a/pw_data_link/public/pw_data_link/socket_data_link.h b/pw_data_link/public/pw_data_link/socket_data_link.h
deleted file mode 100644
index 4331782..0000000
--- a/pw_data_link/public/pw_data_link/socket_data_link.h
+++ /dev/null
@@ -1,113 +0,0 @@
-// Copyright 2023 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#pragma once
-
-#include <netdb.h>
-#include <sys/epoll.h>
-
-#include <array>
-#include <cstdint>
-#include <optional>
-
-#include "pw_allocator/allocator.h"
-#include "pw_bytes/span.h"
-#include "pw_data_link/data_link.h"
-#include "pw_multibuf/multibuf.h"
-#include "pw_status/status.h"
-#include "pw_string/to_string.h"
-#include "pw_sync/lock_annotations.h"
-#include "pw_sync/mutex.h"
-
-namespace pw::data_link {
-
-class SocketDataLink : public DataLink {
- public:
- SocketDataLink(const char* host, uint16_t port);
- SocketDataLink(int connection_fd,
- EventHandlerCallback&& event_handler,
- allocator::Allocator& write_buffer_allocator);
-
- ~SocketDataLink() override PW_LOCKS_EXCLUDED(lock_);
-
- // Waits for link state changes or events.
- void WaitAndConsumeEvents() PW_LOCKS_EXCLUDED(lock_);
-
- void Open(EventHandlerCallback&& event_handler,
- allocator::Allocator& write_buffer_allocator) override
- PW_LOCKS_EXCLUDED(lock_);
- void Close() override PW_LOCKS_EXCLUDED(lock_);
-
- std::optional<multibuf::MultiBuf> GetWriteBuffer(size_t size) override;
- Status Write(multibuf::MultiBuf&& buffer) override;
- Status Read(ByteSpan buffer) override;
-
- private:
- static constexpr size_t kMtu = 1024;
- static constexpr int kInvalidFd = -1;
-
- enum class LinkState {
- kOpen,
- kOpenRequest,
- kWaitingForOpen,
- kClosed,
- } link_state_ PW_GUARDED_BY(lock_) = LinkState::kClosed;
-
- enum class ReadState {
- kIdle,
- kReadRequested,
- kClosed,
- } read_state_ PW_GUARDED_BY(read_lock_) = ReadState::kClosed;
-
- enum class WriteState {
- kIdle,
- kPending, // Write operation will occur.
- kClosed,
- } write_state_ PW_GUARDED_BY(write_lock_) = WriteState::kClosed;
-
- void set_link_state(LinkState new_state) PW_EXCLUSIVE_LOCKS_REQUIRED(lock_);
-
- void HandleOpenFailure(addrinfo* info) PW_UNLOCK_FUNCTION(lock_);
- bool ConfigureEpoll() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_);
-
- void DoOpen() PW_UNLOCK_FUNCTION(lock_)
- PW_LOCKS_EXCLUDED(read_lock_, write_lock_);
- void DoClose(bool notify_closed) PW_UNLOCK_FUNCTION(lock_)
- PW_LOCKS_EXCLUDED(read_lock_, write_lock_);
- void DoRead() PW_LOCKS_EXCLUDED(lock_) PW_UNLOCK_FUNCTION(read_lock_);
- void DoWrite() PW_LOCKS_EXCLUDED(lock_) PW_UNLOCK_FUNCTION(write_lock_);
-
- const char* host_;
- char port_[6];
- int connection_fd_ PW_GUARDED_BY(lock_) = kInvalidFd;
- int epoll_fd_ PW_GUARDED_BY(lock_) = kInvalidFd;
- epoll_event epoll_event_;
-
- allocator::Allocator* write_buffer_allocator_;
- multibuf::MultiBuf tx_multibuf_ PW_GUARDED_BY(write_lock_);
- size_t num_bytes_to_send_ = 0;
- size_t num_bytes_sent_ = 0;
- ByteSpan rx_buffer_ PW_GUARDED_BY(read_lock_);
- EventHandlerCallback event_handler_;
-
- // These internal locks must not be acquired when the event handler is called.
- // The main lock cannot be held when acquiring either the read or write locks.
- // However, the main lock can be acquired when one of the read or write locks
- // is held.
- // The read and write locks are held independently and should not overlap.
- sync::Mutex lock_; // Main lock.
- sync::Mutex read_lock_;
- sync::Mutex write_lock_;
-};
-
-} // namespace pw::data_link
diff --git a/pw_data_link/public/pw_data_link/socket_data_link_thread.h b/pw_data_link/public/pw_data_link/socket_data_link_thread.h
deleted file mode 100644
index 7a31fd8..0000000
--- a/pw_data_link/public/pw_data_link/socket_data_link_thread.h
+++ /dev/null
@@ -1,73 +0,0 @@
-#pragma once
-
-#include <algorithm>
-#include <array>
-#include <mutex>
-
-#include "pw_data_link/socket_data_link.h"
-#include "pw_status/status.h"
-#include "pw_sync/lock_annotations.h"
-#include "pw_sync/mutex.h"
-#include "pw_sync/thread_notification.h"
-#include "pw_thread/thread_core.h"
-#include "pw_thread/yield.h"
-
-namespace pw::data_link {
-
-class SocketDataLinkThread : public pw::thread::ThreadCore {
- public:
- explicit SocketDataLinkThread(span<SocketDataLink*> active_links)
- : active_links_(active_links) {}
-
- pw::Status RegisterLink(SocketDataLink& link) PW_LOCKS_EXCLUDED(lock_) {
- std::lock_guard lock(lock_);
- auto ptr = std::find(active_links_.begin(), active_links_.end(), nullptr);
- if (ptr == active_links_.end()) {
- return pw::Status::ResourceExhausted();
- }
- *ptr = &link;
- return pw::OkStatus();
- }
-
- pw::Status UnregisterLink(SocketDataLink& link) PW_LOCKS_EXCLUDED(lock_) {
- std::lock_guard lock(lock_);
- auto ptr = std::find(active_links_.begin(), active_links_.end(), &link);
- if (ptr == active_links_.end()) {
- return pw::Status::NotFound();
- }
- *ptr = nullptr;
- return pw::OkStatus();
- }
-
- void Run() override PW_LOCKS_EXCLUDED(lock_) {
- run_ = true;
- while (run_) {
- std::lock_guard lock(lock_);
- for (SocketDataLink* link : active_links_) {
- if (link != nullptr) {
- link->WaitAndConsumeEvents();
- }
- }
- this_thread::yield();
- }
- }
-
- void Stop() { run_ = false; }
-
- private:
- pw::sync::Mutex lock_;
- span<SocketDataLink*> active_links_ PW_GUARDED_BY(lock_);
- bool run_ = false;
-};
-
-template <size_t max_links>
-class SocketDataLinkThreadWithContainer : public SocketDataLinkThread {
- public:
- SocketDataLinkThreadWithContainer()
- : SocketDataLinkThread(active_links_), active_links_{} {}
-
- private:
- std::array<SocketDataLink*, max_links> active_links_;
-};
-
-} // namespace pw::data_link
diff --git a/pw_data_link/sample_app.cc b/pw_data_link/sample_app.cc
deleted file mode 100644
index 6f74900..0000000
--- a/pw_data_link/sample_app.cc
+++ /dev/null
@@ -1,302 +0,0 @@
-#include <unistd.h>
-
-#include <array>
-#include <chrono>
-#include <cstddef>
-#include <cstdlib>
-#include <cstring>
-#include <iostream>
-#include <memory>
-#include <optional>
-#include <thread>
-#include <utility>
-
-#include "pw_log/levels.h"
-#define PW_LOG_LEVEL PW_LOG_LEVEL_INFO
-
-#include "pw_allocator/block_allocator.h"
-#include "pw_assert/check.h"
-#include "pw_data_link/data_link.h"
-#include "pw_data_link/server_socket.h"
-#include "pw_data_link/socket_data_link.h"
-#include "pw_data_link/socket_data_link_thread.h"
-#include "pw_log/log.h"
-#include "pw_result/result.h"
-#include "pw_status/status.h"
-#include "pw_sync/thread_notification.h"
-#include "pw_thread/detached_thread.h"
-#include "pw_thread/thread_core.h"
-#include "pw_thread_stl/options.h"
-
-namespace {
-
-const char* kLocalHost = "localhost";
-constexpr int kDefaultPort = 33001;
-
-constexpr size_t kReadBufferSize = 1024;
-constexpr size_t kWriteBufferSize = 1024;
-constexpr size_t kAllocatorSize = 2 * kWriteBufferSize;
-
-struct LinkSignals {
- std::atomic<bool> run = true;
- pw::sync::ThreadNotification ready_to_read;
- pw::sync::ThreadNotification data_read;
- pw::sync::ThreadNotification ready_to_write;
- // Note: the last_status variable is not thread-safe. It is set in the link
- // event callback, called in the link worker thread, and read in the user
- // reader or writer threads.
- std::atomic<pw::StatusWithSize> last_status = pw::StatusWithSize(0);
-};
-
-class LinkThread : public pw::thread::ThreadCore {
- public:
- LinkThread(std::shared_ptr<pw::data_link::SocketDataLink> link,
- std::shared_ptr<LinkSignals> link_signals)
- : link_(link), link_signals_(link_signals), bytes_transferred_(0) {}
-
- void Run() override {
- while (link_signals_->run) {
- Step();
- }
- end_time_ = std::chrono::high_resolution_clock::now();
- PW_LOG_INFO("LinkThread stopped");
- }
-
- // Thread must be stopped to avoid data races.
- size_t bytes_transferred() const { return bytes_transferred_; }
- std::chrono::duration<int> transfer_time() const {
- return std::chrono::duration_cast<std::chrono::seconds>(end_time_ -
- start_time_);
- }
-
- void Stop() {
- link_signals_->run = false;
- link_signals_->ready_to_write.release();
- link_signals_->ready_to_read.release();
- }
-
- protected:
- virtual void Step() = 0;
-
- std::shared_ptr<pw::data_link::SocketDataLink> link_;
- std::shared_ptr<LinkSignals> link_signals_;
- size_t bytes_transferred_ = 0;
- bool start_time_set_ = false;
- std::chrono::steady_clock::time_point start_time_;
- std::chrono::steady_clock::time_point end_time_;
-};
-
-class Reader : public LinkThread {
- public:
- Reader(std::shared_ptr<pw::data_link::SocketDataLink> link,
- std::shared_ptr<LinkSignals> link_signals)
- : LinkThread(link, link_signals), buffer_() {}
-
- void Step() override {
- PW_LOG_DEBUG("Waiting to read");
- link_signals_->ready_to_read.acquire();
- if (!link_signals_->run) {
- return;
- }
- PW_LOG_DEBUG("Reading");
- if (const pw::Status status = link_->Read(buffer_); !status.ok()) {
- PW_LOG_ERROR("Failed to read. Error: %s", status.str());
- return;
- }
- if (!start_time_set_) {
- start_time_set_ = true;
- start_time_ = std::chrono::high_resolution_clock::now();
- }
- PW_LOG_DEBUG("Waiting for read to be done");
- link_signals_->data_read.acquire();
- const pw::StatusWithSize status = link_signals_->last_status;
- PW_LOG_DEBUG("Read returned %s (%d bytes)",
- status.status().str(),
- static_cast<int>(status.size()));
- if (status.ok()) {
- bytes_transferred_ += status.size();
- // PW_LOG_DEBUG("%s", reinterpret_cast<char*>(buffer_.data()));
- }
- }
-
- private:
- std::array<std::byte, kReadBufferSize> buffer_{};
-};
-
-class Writer : public LinkThread {
- public:
- Writer(std::shared_ptr<pw::data_link::SocketDataLink> link,
- std::shared_ptr<LinkSignals> link_signals)
- : LinkThread(link, link_signals) {}
-
- void Step() override {
- PW_LOG_DEBUG("Waiting to write");
- link_signals_->ready_to_write.acquire();
- if (!link_signals_->run) {
- return;
- }
- PW_LOG_DEBUG("Waiting for write buffer");
- std::optional<pw::multibuf::MultiBuf> buffer =
- link_->GetWriteBuffer(kWriteBufferSize);
- if (!buffer.has_value()) {
- return;
- }
- if (!start_time_set_) {
- start_time_set_ = true;
- start_time_ = std::chrono::high_resolution_clock::now();
- }
- for (auto& chunk_byte : *buffer) {
- chunk_byte = std::byte('C');
- }
- PW_LOG_DEBUG("Writing");
- const size_t bytes_written = buffer->size();
- const pw::Status status = link_->Write(std::move(*buffer));
- if (status.ok()) {
- bytes_transferred_ += bytes_written;
- } else {
- PW_LOG_ERROR("Write failed. Error: %s", status.str());
- }
- }
-};
-
-} // namespace
-
-void print_help_menu() {
- std::cout << "Data Link sample app." << std::endl << std::endl;
- std::cout << "Use --server to serve a socket." << std::endl;
- std::cout << "Use --port <NUMBER> to:" << std::endl;
- std::cout << " - serve a socket on the given port when --server is set, or"
- << std::endl;
- std::cout << " - connect to a socket on the given port." << std::endl;
- std::cout << " Defaults to port " << kDefaultPort << "." << std::endl;
- std::cout << "Use --reader to make the link's role read only." << std::endl;
- std::cout << " Defaults to writer only role." << std::endl;
- std::cout << "Use -h to print this menu and exit." << std::endl;
-}
-
-int main(int argc, char** argv) {
- constexpr size_t kMaxLinks = 1;
- bool is_reader = false;
- bool is_server = false;
- int port = kDefaultPort;
- const std::chrono::duration<int> test_time = std::chrono::seconds(10);
-
- for (int i = 1; i < argc; i++) {
- if (strcmp(argv[i], "--port") == 0 && i < argc - 1) {
- port = std::atoi(argv[++i]);
- } else if (strcmp(argv[i], "--server") == 0) {
- is_server = true;
- } else if (strcmp(argv[i], "--reader") == 0) {
- is_reader = true;
- } else if (strcmp(argv[i], "-h") == 0) {
- print_help_menu();
- exit(0);
- } else {
- PW_LOG_ERROR("Invalid argument '%s'", argv[i]);
- print_help_menu();
- exit(-1);
- }
- }
-
- PW_LOG_INFO("Started");
-
- std::shared_ptr<LinkSignals> link_signals = std::make_shared<LinkSignals>();
- auto event_callback = [&link_signals](pw::data_link::DataLink::Event event,
- pw::StatusWithSize status) {
- link_signals->last_status = status;
- switch (event) {
- case pw::data_link::DataLink::Event::kOpen: {
- if (!status.ok()) {
- PW_LOG_ERROR("Link failed to open: %s", status.status().str());
- link_signals->run = false;
- } else {
- PW_LOG_DEBUG("Link open");
- }
- link_signals->ready_to_write.release();
- link_signals->ready_to_read.release();
- } break;
- case pw::data_link::DataLink::Event::kClosed:
- link_signals->run = false;
- link_signals->ready_to_read.release();
- link_signals->ready_to_write.release();
- break;
- case pw::data_link::DataLink::Event::kDataReceived:
- link_signals->ready_to_read.release();
- break;
- case pw::data_link::DataLink::Event::kDataRead:
- link_signals->data_read.release();
- break;
- case pw::data_link::DataLink::Event::kDataSent:
- link_signals->ready_to_write.release();
- break;
- }
- };
-
- std::shared_ptr<pw::data_link::SocketDataLink> link;
- pw::allocator::FirstFitBlockAllocator<uint32_t> link_buffer_allocator{};
- std::array<std::byte, kAllocatorSize> allocator_storage{};
- PW_CHECK_OK(link_buffer_allocator.Init(allocator_storage));
- if (is_server) {
- PW_LOG_INFO("Serving on port %d", static_cast<int>(port));
- pw::data_link::ServerSocket server{kMaxLinks};
- PW_CHECK_OK(server.Listen(port));
-
- PW_LOG_INFO("Waiting for connection");
- const pw::Result<int> connection_fd = server.Accept();
- PW_CHECK_OK(connection_fd.status());
-
- PW_LOG_INFO("New Connection! Creating Link");
- link = std::make_shared<pw::data_link::SocketDataLink>(
- *connection_fd, std::move(event_callback), link_buffer_allocator);
- } else {
- PW_LOG_INFO("Openning Link");
- link = std::make_shared<pw::data_link::SocketDataLink>(kLocalHost, port);
- link->Open(std::move(event_callback), link_buffer_allocator);
- }
-
- pw::data_link::SocketDataLinkThreadWithContainer<kMaxLinks> links_thread{};
- PW_CHECK_OK(links_thread.RegisterLink(*link));
-
- PW_LOG_INFO("Starting links thread");
- pw::thread::DetachedThread(pw::thread::stl::Options(), links_thread);
-
- std::shared_ptr<LinkThread> link_thread;
- if (is_reader) {
- PW_LOG_INFO("Starting reader thread");
- auto reader_thread = std::make_shared<Reader>(link, link_signals);
- link_thread = reader_thread;
- pw::thread::DetachedThread(pw::thread::stl::Options(), *reader_thread);
- } else {
- PW_LOG_INFO("Starting writer thread");
- auto writer_thread = std::make_shared<Writer>(link, link_signals);
- link_thread = writer_thread;
- pw::thread::DetachedThread(pw::thread::stl::Options(), *writer_thread);
- }
-
- if (link_signals->run) {
- PW_LOG_INFO("Running for %lld seconds",
- std::chrono::seconds(test_time).count());
- for (int i = 0;
- link_signals->run && i < std::chrono::seconds(test_time).count();
- ++i) {
- sleep(1);
- }
- PW_LOG_INFO("Stopping link's work");
- link_thread->Stop();
- }
-
- // Wait for link thread to stop.
- // TODO(cachinchilla): Figure out how to join these threads properly.
- sleep(3);
-
- PW_LOG_INFO("Link transferred %d bytes in %lld seconds",
- static_cast<int>(link_thread->bytes_transferred()),
- std::chrono::seconds(link_thread->transfer_time()).count());
-
- PW_LOG_INFO("Cleaning up");
- links_thread.UnregisterLink(*link);
- PW_LOG_INFO("Stopping links thread");
- links_thread.Stop();
- PW_LOG_INFO("Terminating");
- return 0;
-}
diff --git a/pw_data_link/server_socket.cc b/pw_data_link/server_socket.cc
deleted file mode 100644
index a5ccb90..0000000
--- a/pw_data_link/server_socket.cc
+++ /dev/null
@@ -1,84 +0,0 @@
-#include "pw_data_link/server_socket.h"
-
-#include <cstdint>
-
-#if defined(_WIN32) && _WIN32
-// TODO(cachinchilla): add support for windows.
-#error Windows not supported yet!
-#else
-#include <arpa/inet.h>
-#include <sys/socket.h>
-#include <unistd.h>
-#endif // defined(_WIN32) && _WIN32
-
-#include "pw_status/status.h"
-
-namespace pw::data_link {
-
-// Listen for connections on the given port.
-// If port is 0, a random unused port is chosen and can be retrieved with
-// port().
-Status ServerSocket::Listen(uint16_t port) {
- socket_fd_ = socket(AF_INET6, SOCK_STREAM, 0);
- if (socket_fd_ == kInvalidFd) {
- return Status::Unknown();
- }
-
- // Allow binding to an address that may still be in use by a closed socket.
- constexpr int value = 1;
- setsockopt(socket_fd_,
- SOL_SOCKET,
- SO_REUSEADDR,
- reinterpret_cast<const char*>(&value),
- sizeof(int));
-
- if (port != 0) {
- struct sockaddr_in6 addr = {};
- socklen_t addr_len = sizeof(addr);
- addr.sin6_family = AF_INET6;
- addr.sin6_port = htons(port);
- addr.sin6_addr = in6addr_any;
- if (bind(socket_fd_, reinterpret_cast<sockaddr*>(&addr), addr_len) < 0) {
- return Status::Unknown();
- }
- }
-
- if (listen(socket_fd_, backlog_) < 0) {
- return Status::Unknown();
- }
-
- // Find out which port the socket is listening on, and fill in port_.
- struct sockaddr_in6 addr = {};
- socklen_t addr_len = sizeof(addr);
- if (getsockname(socket_fd_, reinterpret_cast<sockaddr*>(&addr), &addr_len) <
- 0 ||
- static_cast<size_t>(addr_len) > sizeof(addr)) {
- close(socket_fd_);
- return Status::Unknown();
- }
-
- port_ = ntohs(addr.sin6_port);
-
- return OkStatus();
-}
-
-Result<int> ServerSocket::Accept() {
- struct sockaddr_in6 sockaddr_client_ = {};
- socklen_t len = sizeof(sockaddr_client_);
- const int connection_fd =
- accept(socket_fd_, reinterpret_cast<sockaddr*>(&sockaddr_client_), &len);
- if (connection_fd == kInvalidFd) {
- return Status::Unknown();
- }
- return connection_fd;
-}
-
-// Close the server socket, preventing further connections.
-void ServerSocket::Close() {
- if (socket_fd_ != kInvalidFd) {
- close(socket_fd_);
- socket_fd_ = kInvalidFd;
- }
-}
-
-} // namespace pw::data_link
diff --git a/pw_data_link/socket_data_link.cc b/pw_data_link/socket_data_link.cc
deleted file mode 100644
index e4fe726..0000000
--- a/pw_data_link/socket_data_link.cc
+++ /dev/null
@@ -1,525 +0,0 @@
-// Copyright 2023 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-#include "pw_data_link/socket_data_link.h"
-
-#if defined(_WIN32) && _WIN32
-// TODO(cachinchilla): add support for windows.
-#error Windows not supported yet!
-#else
-#include <arpa/inet.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <sys/epoll.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-#endif // defined(_WIN32) && _WIN32
-
-#include <chrono>
-#include <optional>
-
-#include "pw_allocator/allocator.h"
-#include "pw_assert/check.h"
-#include "pw_bytes/span.h"
-#include "pw_log/log.h"
-#include "pw_multibuf/header_chunk_region_tracker.h"
-#include "pw_status/status.h"
-#include "pw_status/status_with_size.h"
-
-namespace pw::data_link {
-namespace {
-
-const char* kLinkStateNameOpen = "Open";
-const char* kLinkStateNameOpenRequest = "Open Request";
-const char* kLinkStateNameClosed = "Closed";
-const char* kLinkStateNameWaitingForOpen = "Waiting For Open";
-const char* kLinkStateNameUnknown = "Unknown";
-const auto kEpollTimeout = std::chrono::seconds(1);
-
-// Configures the file descriptor as non blocking. Returns true when successful.
-bool MakeSocketNonBlocking(int fd) {
- int flags = fcntl(fd, F_GETFL, 0);
- if (flags == -1) {
- return false;
- }
- flags += O_NONBLOCK;
- if (fcntl(fd, F_SETFL, flags) == -1) {
- PW_LOG_ERROR("Failed to create a socket: %s", std::strerror(errno));
- return false;
- }
- return true;
-}
-
-} // namespace
-
-SocketDataLink::SocketDataLink(const char* host, uint16_t port) : host_(host) {
- PW_CHECK(host_ != nullptr);
- PW_CHECK(ToString(port, port_).ok());
-}
-
-SocketDataLink::SocketDataLink(int connection_fd,
- EventHandlerCallback&& event_handler,
- allocator::Allocator& write_buffer_allocator)
- : connection_fd_(connection_fd),
- write_buffer_allocator_(&write_buffer_allocator) {
- PW_DCHECK(connection_fd > 0);
- PW_CHECK(MakeSocketNonBlocking(connection_fd_));
- PW_CHECK(ConfigureEpoll());
- event_handler_ = std::move(event_handler);
- set_link_state(LinkState::kOpen);
- write_state_ = WriteState::kIdle;
- read_state_ = ReadState::kIdle;
- event_handler_(DataLink::Event::kOpen, StatusWithSize());
-}
-
-SocketDataLink::~SocketDataLink() {
- lock_.lock();
- if (link_state_ != LinkState::kClosed) {
- DoClose(/*notify_closed=*/true);
- return;
- }
- lock_.unlock();
-}
-
-void SocketDataLink::set_link_state(LinkState new_state) {
- const char* link_name;
- switch (link_state_) {
- case LinkState::kOpen:
- link_name = kLinkStateNameOpen;
- break;
- case LinkState::kOpenRequest:
- link_name = kLinkStateNameOpenRequest;
- break;
- case LinkState::kClosed:
- link_name = kLinkStateNameClosed;
- break;
- case LinkState::kWaitingForOpen:
- link_name = kLinkStateNameWaitingForOpen;
- break;
- default:
- link_name = kLinkStateNameUnknown;
- };
-
- const char* new_link_name;
- switch (new_state) {
- case LinkState::kOpen:
- new_link_name = kLinkStateNameOpen;
- break;
- case LinkState::kOpenRequest:
- new_link_name = kLinkStateNameOpenRequest;
- break;
- case LinkState::kClosed:
- new_link_name = kLinkStateNameClosed;
- break;
- case LinkState::kWaitingForOpen:
- new_link_name = kLinkStateNameWaitingForOpen;
- break;
- default:
- new_link_name = kLinkStateNameUnknown;
- };
- PW_LOG_DEBUG("Transitioning from %s to %s", link_name, new_link_name);
- link_state_ = new_state;
-}
-
-void SocketDataLink::Open(EventHandlerCallback&& event_handler,
- allocator::Allocator& write_buffer_allocator) {
- std::lock_guard lock(lock_);
- PW_CHECK(link_state_ == LinkState::kClosed);
-
- write_buffer_allocator_ = &write_buffer_allocator;
- event_handler_ = std::move(event_handler);
- set_link_state(LinkState::kOpenRequest);
-}
-
-void SocketDataLink::WaitAndConsumeEvents() {
- // Manually lock and unlock, since some functions may perform unlocking before
- // calling the user's event callback, when locks cannot be held.
-
- lock_.lock();
- switch (link_state_) {
- case LinkState::kOpen:
- break;
- case LinkState::kWaitingForOpen: {
- // Copy the epoll file descriptor to reduce the critical section.
- const int fd = epoll_fd_;
- lock_.unlock();
- epoll_event event;
- const int count = epoll_wait(
- fd, &event, 1, std::chrono::milliseconds(kEpollTimeout).count());
- if (count == 0) {
- return;
- }
- if (event.events & EPOLLERR || event.events & EPOLLHUP) {
- lock_.lock();
- // Check if link was closed on another thread.
- if (link_state_ != LinkState::kClosed) {
- DoClose(/*notify_closed=*/false);
- } else {
- lock_.unlock();
- }
- event_handler_(DataLink::Event::kOpen, StatusWithSize::Unknown());
- return;
- }
- if (event.events == EPOLLOUT) {
- {
- std::lock_guard lock(lock_);
- set_link_state(LinkState::kOpen);
- }
- {
- std::lock_guard lock(write_lock_);
- write_state_ = WriteState::kIdle;
- }
- {
- std::lock_guard lock(read_lock_);
- read_state_ = ReadState::kIdle;
- }
- event_handler_(DataLink::Event::kOpen, StatusWithSize());
- return;
- }
- PW_LOG_ERROR("Unhandled event %d while waiting to open link",
- static_cast<int>(event.events));
- }
- return;
- case LinkState::kClosed:
- lock_.unlock();
- return;
- case LinkState::kOpenRequest:
- DoOpen();
- return;
- }
- // Copy the epoll file descriptor to reduce the critical section.
- const int fd = epoll_fd_;
- lock_.unlock();
-
- epoll_event event;
- const int count = epoll_wait(
- fd, &event, 1, std::chrono::milliseconds(kEpollTimeout).count());
- if (count == 0) {
- return;
- }
-
- if (event.events & EPOLLERR || event.events & EPOLLHUP) {
- lock_.lock();
- // Check if link was closed on another thread.
- if (link_state_ != LinkState::kClosed) {
- DoClose(/*notify_closed=*/true);
- } else {
- lock_.unlock();
- }
- return;
- }
-
- if (event.events & EPOLLIN) {
- // Can read!
- read_lock_.lock();
- if (read_state_ == ReadState::kReadRequested) {
- DoRead();
- } else if (read_state_ == ReadState::kIdle) {
- read_lock_.unlock();
- event_handler_(DataLink::Event::kDataReceived, StatusWithSize());
- } else {
- read_lock_.unlock();
- }
- }
- if (event.events & EPOLLOUT) {
- // Can Write!
- write_lock_.lock();
- if (write_state_ == WriteState::kPending) {
- DoWrite();
- } else {
- write_lock_.unlock();
- }
- }
- if ((event.events & EPOLLIN) == 0 && (event.events & EPOLLOUT) == 0) {
- PW_LOG_WARN("Unhandled event %d", static_cast<int>(event.events));
- }
-}
-
-void SocketDataLink::DoOpen() {
- addrinfo hints = {};
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_NUMERICSERV;
- addrinfo* res;
- if (getaddrinfo(host_, port_, &hints, &res) != 0) {
- PW_LOG_ERROR("Failed to configure connection address for socket");
- HandleOpenFailure(/*info=*/nullptr);
- return;
- }
-
- addrinfo* rp;
- for (rp = res; rp != nullptr; rp = rp->ai_next) {
- PW_LOG_DEBUG("Opening socket");
- connection_fd_ = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
- if (connection_fd_ != kInvalidFd) {
- break;
- }
- }
- if (connection_fd_ == kInvalidFd) {
- HandleOpenFailure(res);
- return;
- }
- // Set necessary options on a socket file descriptor.
- PW_LOG_DEBUG("Configuring socket");
- if (!MakeSocketNonBlocking(connection_fd_)) {
- HandleOpenFailure(res);
- return;
- }
- PW_LOG_DEBUG("Connecting socket");
- if (connect(connection_fd_, rp->ai_addr, rp->ai_addrlen) == -1) {
- if (errno != EINPROGRESS) {
- HandleOpenFailure(res);
- return;
- }
- set_link_state(LinkState::kWaitingForOpen);
- } else {
- set_link_state(LinkState::kOpen);
- }
-
- PW_LOG_DEBUG("Configuring Epoll");
- if (!ConfigureEpoll()) {
- HandleOpenFailure(res);
- return;
- }
- const bool open_completed = link_state_ == LinkState::kOpen;
- lock_.unlock();
- freeaddrinfo(res);
- if (open_completed) {
- {
- std::lock_guard lock(write_lock_);
- write_state_ = WriteState::kIdle;
- }
- {
- std::lock_guard lock(read_lock_);
- read_state_ = ReadState::kIdle;
- }
- event_handler_(DataLink::Event::kOpen, StatusWithSize());
- }
-}
-
-void SocketDataLink::HandleOpenFailure(addrinfo* info) {
- if (connection_fd_ != kInvalidFd) {
- close(connection_fd_);
- connection_fd_ = kInvalidFd;
- }
- if (epoll_fd_ != kInvalidFd) {
- close(epoll_fd_);
- epoll_fd_ = kInvalidFd;
- }
- set_link_state(LinkState::kClosed);
- lock_.unlock();
- PW_LOG_ERROR(
- "Failed to connect to %s:%s: %s", host_, port_, std::strerror(errno));
- if (info != nullptr) {
- freeaddrinfo(info);
- }
- event_handler_(DataLink::Event::kOpen, StatusWithSize::Unknown());
-}
-
-bool SocketDataLink::ConfigureEpoll() {
- epoll_fd_ = epoll_create1(0);
- if (epoll_fd_ == kInvalidFd) {
- return false;
- }
- epoll_event_.events = EPOLLOUT | EPOLLIN;
- epoll_event_.data.fd = connection_fd_;
- if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, connection_fd_, &epoll_event_) ==
- -1) {
- return false;
- }
- return true;
-}
-
-void SocketDataLink::Close() {
- lock_.lock();
- PW_DCHECK(link_state_ != LinkState::kClosed);
- DoClose(/*notify_closed=*/true);
-}
-
-void SocketDataLink::DoClose(bool notify_closed) {
- set_link_state(LinkState::kClosed);
- // Copy file descriptors and unlock to minimize the critical section.
- const int connection_fd = connection_fd_;
- connection_fd_ = kInvalidFd;
- const int epoll_fd = epoll_fd_;
- epoll_fd_ = kInvalidFd;
- lock_.unlock();
- {
- std::lock_guard lock(write_lock_);
- write_state_ = WriteState::kClosed;
- }
- {
- std::lock_guard lock(read_lock_);
- read_state_ = ReadState::kClosed;
- }
-
- // Close file descriptors if valid.
- if (connection_fd != kInvalidFd) {
- close(connection_fd);
- }
- if (epoll_fd != kInvalidFd) {
- close(epoll_fd);
- }
- if (notify_closed) {
- event_handler_(DataLink::Event::kClosed, StatusWithSize());
- }
-}
-
-std::optional<multibuf::MultiBuf> SocketDataLink::GetWriteBuffer(size_t size) {
- if (size == 0) {
- return multibuf::MultiBuf();
- }
- {
- std::lock_guard lock(write_lock_);
- if (write_state_ != WriteState::kIdle) {
- return std::nullopt;
- }
- }
- multibuf::MultiBuf buffer;
- std::optional<multibuf::OwnedChunk> chunk =
- multibuf::HeaderChunkRegionTracker::AllocateRegionAsChunk(
- write_buffer_allocator_, size);
- if (!chunk) {
- return std::nullopt;
- }
- buffer.PushFrontChunk(std::move(*chunk));
- return buffer;
-}
-
-Status SocketDataLink::Write(multibuf::MultiBuf&& buffer) {
- if (buffer.size() == 0) {
- return Status::InvalidArgument();
- }
- std::lock_guard lock(write_lock_);
- if (write_state_ != WriteState::kIdle) {
- return Status::FailedPrecondition();
- }
-
- tx_multibuf_ = std::move(buffer);
- num_bytes_to_send_ = tx_multibuf_.size();
- num_bytes_sent_ = 0;
- write_state_ = WriteState::kPending;
- return OkStatus();
-}
-
-void SocketDataLink::DoWrite() {
- int send_flags = 0;
-#if defined(__linux__)
- // Use MSG_NOSIGNAL to avoid getting a SIGPIPE signal when the remote
- // peer drops the connection. This is supported on Linux only.
- send_flags |= MSG_NOSIGNAL;
-#endif // defined(__linux__)
-
- auto [chunk_iter, chunk] =
- tx_multibuf_.TakeChunk(tx_multibuf_.Chunks().begin());
- ssize_t bytes_sent;
- {
- std::lock_guard lock(lock_);
- // TODO: Send one chunk at a time.
- bytes_sent = send(connection_fd_,
- reinterpret_cast<const char*>(chunk.data()),
- chunk.size(),
- send_flags);
- }
-
- // Check for errors.
- if (bytes_sent < 0) {
- if (errno == EPIPE) {
- // An EPIPE indicates that the connection is closed.
- tx_multibuf_.Release();
- write_lock_.unlock();
- event_handler_(DataLink::Event::kDataSent, StatusWithSize::OutOfRange());
- Close();
- return;
- }
- tx_multibuf_.Release();
- write_lock_.unlock();
- event_handler_(DataLink::Event::kDataSent, StatusWithSize::Unknown());
- return;
- }
- num_bytes_sent_ += bytes_sent;
-
- // Resize chunk if it was a partial write.
- if (static_cast<size_t>(bytes_sent) < chunk.size()) {
- chunk->DiscardPrefix(static_cast<size_t>(bytes_sent));
- tx_multibuf_.PushFrontChunk(std::move(chunk));
- write_lock_.unlock();
- return;
- }
-
- // Check if we are done with the MultiBuf.
- if (num_bytes_sent_ >= num_bytes_to_send_) {
- write_state_ = WriteState::kIdle;
- tx_multibuf_.Release();
- write_lock_.unlock();
- event_handler_(DataLink::Event::kDataSent,
- StatusWithSize(num_bytes_to_send_));
- return;
- }
-
- write_lock_.unlock();
-}
-
-Status SocketDataLink::Read(ByteSpan buffer) {
- PW_DCHECK(buffer.size() > 0);
- std::lock_guard lock(read_lock_);
- if (read_state_ != ReadState::kIdle) {
- return Status::FailedPrecondition();
- }
- rx_buffer_ = buffer;
- read_state_ = ReadState::kReadRequested;
- return OkStatus();
-}
-
-void SocketDataLink::DoRead() {
- ssize_t bytes_rcvd;
- {
- std::lock_guard lock(lock_);
- bytes_rcvd = recv(connection_fd_,
- reinterpret_cast<char*>(rx_buffer_.data()),
- rx_buffer_.size_bytes(),
- 0);
- }
-
- if (bytes_rcvd == 0) {
- // Remote peer has closed the connection.
- read_state_ = ReadState::kClosed;
- read_lock_.unlock();
- event_handler_(DataLink::Event::kDataRead, StatusWithSize::Internal());
- Close();
- return;
- } else if (bytes_rcvd < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- // Socket timed out when trying to read.
- // This should only occur if SO_RCVTIMEO was configured to be nonzero, or
- // if the socket was opened with the O_NONBLOCK flag to prevent any
- // blocking when performing reads or writes.
- read_state_ = ReadState::kIdle;
- read_lock_.unlock();
- event_handler_(DataLink::Event::kDataRead,
- StatusWithSize::ResourceExhausted());
- return;
- }
- read_state_ = ReadState::kIdle;
- read_lock_.unlock();
- event_handler_(DataLink::Event::kDataRead, StatusWithSize::Unknown());
- return;
- }
- read_state_ = ReadState::kIdle;
- read_lock_.unlock();
- event_handler_(DataLink::Event::kDataRead, StatusWithSize(bytes_rcvd));
-}
-
-} // namespace pw::data_link
diff --git a/pw_data_link/socket_data_link_test.cc b/pw_data_link/socket_data_link_test.cc
deleted file mode 100644
index 29bfa9c..0000000
--- a/pw_data_link/socket_data_link_test.cc
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright 2023 The Pigweed Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not
-// use this file except in compliance with the License. You may obtain a copy of
-// the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-// License for the specific language governing permissions and limitations under
-// the License.
-
-#include "pw_data_link/socket_data_link.h"
-
-#include <array>
-#include <optional>
-
-#include "gtest/gtest.h"
-#include "pw_allocator/block_allocator.h"
-#include "pw_bytes/span.h"
-#include "pw_status/status.h"
-#include "pw_status/status_with_size.h"
-
-namespace pw::data_link {
-namespace {
-
-struct EventCallbackTracker {
- size_t call_count = 0;
- DataLink::Event last_event = DataLink::Event::kClosed;
- StatusWithSize last_event_status = StatusWithSize();
-};
-
-TEST(DataLinkTest, CompileTest) {
- SocketDataLink data_link{"localhost", 123};
- allocator::FirstFitBlockAllocator<uint32_t> write_buffer_allocator{};
- std::array<std::byte, 100> allocator_storage{};
- ASSERT_EQ(write_buffer_allocator.Init(allocator_storage), OkStatus());
- EventCallbackTracker callback_tracker{};
- data_link.Open(
- [&callback_tracker](DataLink::Event event, StatusWithSize status) {
- callback_tracker.last_event = event;
- callback_tracker.last_event_status = status;
- ++callback_tracker.call_count;
- },
- write_buffer_allocator);
-
- data_link.WaitAndConsumeEvents();
-
- ASSERT_EQ(callback_tracker.call_count, 0u);
- data_link.Close();
-}
-
-} // namespace
-} // namespace pw::data_link