#include <unistd.h>

#include <array>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <optional>
#include <thread>

#include "pw_log/levels.h"
#define PW_LOG_LEVEL PW_LOG_LEVEL_INFO

#include "pw_assert/check.h"
#include "pw_data_link/data_link.h"
#include "pw_data_link/server_socket.h"
#include "pw_data_link/socket_data_link.h"
#include "pw_data_link/socket_data_link_thread.h"
#include "pw_log/log.h"
#include "pw_result/result.h"
#include "pw_status/status.h"
#include "pw_sync/thread_notification.h"
#include "pw_thread/detached_thread.h"
#include "pw_thread/thread_core.h"
#include "pw_thread_stl/options.h"

namespace {

const char* kLocalHost = "localhost";
constexpr int kDefaultPort = 33001;

struct LinkSignals {
  std::atomic<bool> run = true;
  pw::sync::ThreadNotification ready_to_read;
  pw::sync::ThreadNotification data_read;
  pw::sync::ThreadNotification ready_to_write;
  // Note: the last_status variable is not thread-safe. It is set in the link
  // event callback, called in the link worker thread, and read in the user
  // reader or writer threads.
  std::atomic<pw::StatusWithSize> last_status = pw::StatusWithSize(0);
};

class LinkThread : public pw::thread::ThreadCore {
 public:
  LinkThread(pw::data_link::SocketDataLink& link, LinkSignals& link_signals)
      : link_(link), link_signals_(link_signals), bytes_transferred_(0) {}

  void Run() override {
    while (link_signals_.run) {
      Step();
    }
    end_time_ = std::chrono::high_resolution_clock::now();
    PW_LOG_INFO("LinkThread stopped");
  }

  // Thread must be stopped to avoid data races.
  size_t bytes_transferred() const { return bytes_transferred_; }
  std::chrono::duration<int> transfer_time() const {
    return std::chrono::duration_cast<std::chrono::seconds>(end_time_ -
                                                            start_time_);
  }

  void Stop() {
    link_signals_.run = false;
    link_signals_.ready_to_write.release();
    link_signals_.ready_to_read.release();
  }

 protected:
  virtual void Step() = 0;

  pw::data_link::SocketDataLink& link_;
  LinkSignals& link_signals_;
  size_t bytes_transferred_ = 0;
  bool start_time_set_ = false;
  std::chrono::steady_clock::time_point start_time_;
  std::chrono::steady_clock::time_point end_time_;
};

class Reader : public LinkThread {
 public:
  Reader(pw::data_link::SocketDataLink& link, LinkSignals& link_signals)
      : LinkThread(link, link_signals) {}

  void Step() override {
    PW_LOG_DEBUG("Waiting to read");
    link_signals_.ready_to_read.acquire();
    if (!link_signals_.run) {
      return;
    }
    PW_LOG_DEBUG("Reading");
    if (const pw::Status status = link_.Read(buffer_); !status.ok()) {
      PW_LOG_ERROR("Failed to read. Error: %s", status.str());
      return;
    }
    if (!start_time_set_) {
      start_time_set_ = true;
      start_time_ = std::chrono::high_resolution_clock::now();
    }
    PW_LOG_DEBUG("Waiting for read to be done");
    link_signals_.data_read.acquire();
    const pw::StatusWithSize status = link_signals_.last_status;
    PW_LOG_DEBUG("Read returned %s (%d bytes)",
                 status.status().str(),
                 static_cast<int>(status.size()));
    if (status.ok()) {
      bytes_transferred_ += status.size();
      PW_LOG_DEBUG("%s", reinterpret_cast<char*>(buffer_.data()));
    }
  }

 private:
  std::array<std::byte, 1024> buffer_;
};

class Writer : public LinkThread {
 public:
  Writer(pw::data_link::SocketDataLink& link, LinkSignals& link_signals)
      : LinkThread(link, link_signals) {}

  void Step() override {
    PW_LOG_DEBUG("Waiting to write");
    link_signals_.ready_to_write.acquire();
    if (!link_signals_.run) {
      return;
    }
    PW_LOG_DEBUG("Waiting for write buffer");
    std::optional<pw::ByteSpan> buffer = link_.GetWriteBuffer();
    if (!buffer.has_value()) {
      return;
    }
    if (!start_time_set_) {
      start_time_set_ = true;
      start_time_ = std::chrono::high_resolution_clock::now();
    }
    for (size_t i = 0; i < buffer->size(); ++i) {
      buffer.value()[i] = std::byte('C');
    }
    buffer.value()[buffer->size() - 1] = std::byte(0);
    PW_LOG_DEBUG("Writing");
    const pw::Status status = link_.Write(*buffer);
    if (status.ok()) {
      bytes_transferred_ += buffer->size();
    } else {
      PW_LOG_ERROR("Write failed. Error: %s", status.str());
    }
  }
};

}  // namespace

void print_help_menu() {
  std::cout << "Data Link sample app." << std::endl << std::endl;
  std::cout << "Use --server to serve a socket." << std::endl;
  std::cout << "Use --port <NUMBER> to:" << std::endl;
  std::cout << "  - serve a socket on the given port when --server is set, or"
            << std::endl;
  std::cout << "  - connect to a socket on the given port." << std::endl;
  std::cout << "  Defaults to port " << kDefaultPort << "." << std::endl;
  std::cout << "Use --reader to make the link's role read only." << std::endl;
  std::cout << "  Defaults to writer only role." << std::endl;
  std::cout << "Use -h to print this menu and exit." << std::endl;
}

int main(int argc, char** argv) {
  constexpr size_t kMaxLinks = 1;
  bool is_reader = false;
  bool is_server = false;
  int port = kDefaultPort;
  const std::chrono::duration<int> test_time = std::chrono::seconds(10);

  for (int i = 1; i < argc; i++) {
    if (strcmp(argv[i], "--port") == 0 && i < argc - 1) {
      port = std::atoi(argv[++i]);
    } else if (strcmp(argv[i], "--server") == 0) {
      is_server = true;
    } else if (strcmp(argv[i], "--reader") == 0) {
      is_reader = true;
    } else if (strcmp(argv[i], "-h") == 0) {
      print_help_menu();
      exit(0);
    } else {
      PW_LOG_ERROR("Invalid argument '%s'", argv[i]);
      print_help_menu();
      exit(-1);
    }
  }

  PW_LOG_INFO("Started");

  LinkSignals link_signals{};
  auto event_callback = [&link_signals](pw::data_link::DataLink::Event event,
                                        pw::StatusWithSize status) {
    link_signals.last_status = status;
    switch (event) {
      case pw::data_link::DataLink::Event::kOpen: {
        if (!status.ok()) {
          PW_LOG_ERROR("Link failed to open: %s", status.status().str());
          link_signals.run = false;
        } else {
          PW_LOG_DEBUG("Link open");
        }
        link_signals.ready_to_write.release();
        link_signals.ready_to_read.release();
      } break;
      case pw::data_link::DataLink::Event::kClosed:
        link_signals.run = false;
        link_signals.ready_to_read.release();
        link_signals.ready_to_write.release();
        break;
      case pw::data_link::DataLink::Event::kDataReceived:
        link_signals.ready_to_read.release();
        break;
      case pw::data_link::DataLink::Event::kDataRead:
        link_signals.data_read.release();
        break;
      case pw::data_link::DataLink::Event::kDataSent:
        link_signals.ready_to_write.release();
        break;
    }
  };

  std::unique_ptr<pw::data_link::SocketDataLink> link;
  if (is_server) {
    PW_LOG_INFO("Serving on port %d", static_cast<int>(port));
    pw::data_link::ServerSocket server{kMaxLinks};
    PW_CHECK_OK(server.Listen(port));

    PW_LOG_INFO("Waiting for connection");
    const pw::Result<int> connection_fd = server.Accept();
    PW_CHECK_OK(connection_fd.status());

    PW_LOG_INFO("New Connection! Creating Link");
    link = std::make_unique<pw::data_link::SocketDataLink>(*connection_fd,
                                                           event_callback);
  } else {
    PW_LOG_INFO("Openning Link");
    link = std::make_unique<pw::data_link::SocketDataLink>(kLocalHost, port);
    link->Open(event_callback);
  }

  pw::data_link::SocketDataLinkThreadWithContainer<kMaxLinks> links_thread{};
  PW_CHECK_OK(links_thread.RegisterLink(*link));

  PW_LOG_INFO("Starting links thread");
  pw::thread::DetachedThread(pw::thread::stl::Options(), links_thread);

  LinkThread* link_thread = nullptr;
  if (is_reader) {
    PW_LOG_INFO("Starting reader thread");
    Reader reader_thread{*link, link_signals};
    link_thread = &reader_thread;
    pw::thread::DetachedThread(pw::thread::stl::Options(), reader_thread);
  } else {
    PW_LOG_INFO("Starting writer thread");
    Writer writer_thread{*link, link_signals};
    link_thread = &writer_thread;
    pw::thread::DetachedThread(pw::thread::stl::Options(), writer_thread);
  }

  if (link_signals.run) {
    PW_LOG_INFO("Running for %lld seconds",
                std::chrono::seconds(test_time).count());
    for (int i = 0;
         link_signals.run && i < std::chrono::seconds(test_time).count();
         ++i) {
      sleep(1);
    }
    PW_LOG_INFO("Stopping link's work");
    link_thread->Stop();
  }

  // Wait for link thread to stop.
  // TODO(cachinchilla): Figure out how to join these threads properly.
  sleep(3);

  PW_LOG_INFO("Link transferred %d bytes in %lld seconds",
              static_cast<int>(link_thread->bytes_transferred()),
              std::chrono::seconds(link_thread->transfer_time()).count());

  PW_LOG_INFO("Cleaning up");
  links_thread.UnregisterLink(*link);
  PW_LOG_INFO("Stopping links thread");
  links_thread.Stop();
  PW_LOG_INFO("Terminating");
  return 0;
}
