// Copyright 2022 The Centipede Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "./centipede/command.h"

#include <fcntl.h>
#include <sys/poll.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <csignal>
#include <cstdlib>
#include <filesystem>  // NOLINT
#include <string>
#include <string_view>
#include <system_error>  // NOLINT

#include "absl/log/check.h"
#include "absl/status/status.h"
#include "absl/strings/match.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_replace.h"
#include "absl/strings/str_split.h"
#include "absl/time/clock.h"
#include "./centipede/logging.h"
#include "./centipede/util.h"

namespace centipede {

// See the definition of --fork_server flag.
inline constexpr std::string_view kCommandLineSeparator(" \\\n");
inline constexpr std::string_view kNoForkServerRequestPrefix("%f");

// TODO(ussuri): Encapsulate as much of the fork server functionality from
//  this source as possible in this struct, and make it a class.
struct Command::ForkServerProps {
  // The file paths of the comms pipes.
  std::string fifo_path_[2];
  // The file descriptors of the comms pipes.
  int pipe_[2] = {-1, -1};
  // The file path to write the PID of the fork server process to.
  std::string pid_file_path_;
  // The PID of the fork server process. Used to verify that the fork server is
  // running and the pipes are ready for comms.
  pid_t pid_ = -1;
  // A `stat` of the fork server's binary right after it's started. Used to
  // detect that the running process with `pid_` is still the original fork
  // server, not a PID recycled by the OS.
  struct stat exe_stat_ = {};

  ~ForkServerProps() {
    for (int i = 0; i < 2; ++i) {
      if (pipe_[i] >= 0 && close(pipe_[i]) != 0) {
        LOG(ERROR) << "Failed to close fork server pipe for " << fifo_path_[i];
      }
      std::error_code ec;
      if (!fifo_path_[i].empty() &&
          !std::filesystem::remove(fifo_path_[i], ec)) {
        LOG(ERROR) << "Failed to remove fork server pipe file " << fifo_path_[i]
                   << ": " << ec;
      }
    }
  }
};

// NOTE: Because std::unique_ptr<T> requires T to be a complete type wherever
// the deleter is instantiated, the special member functions must be defined
// out-of-line here, now that ForkServerProps is complete (that's by-the-book
// PIMPL).
Command::Command(Command &&other) noexcept = default;
Command::~Command() = default;

Command::Command(std::string_view path, std::vector<std::string> args,
                 std::vector<std::string> env, std::string_view out,
                 std::string_view err, absl::Duration timeout,
                 std::string_view temp_file_path)
    : path_(path),
      args_(std::move(args)),
      env_(std::move(env)),
      out_(out),
      err_(err),
      timeout_(timeout),
      temp_file_path_(temp_file_path) {}

std::string Command::ToString() const {
  std::vector<std::string> ss;
  // env.
  ss.reserve(env_.size());
  for (const auto &env : env_) {
    ss.emplace_back(env);
  }
  // path.
  std::string path = path_;
  // Strip the % prefixes, if any.
  if (absl::StartsWith(path, kNoForkServerRequestPrefix)) {
    path = path.substr(kNoForkServerRequestPrefix.size());
  }
  // Replace @@ with temp_file_path_.
  constexpr std::string_view kTempFileWildCard = "@@";
  if (absl::StrContains(path, kTempFileWildCard)) {
    CHECK(!temp_file_path_.empty());
    path = absl::StrReplaceAll(path, {{kTempFileWildCard, temp_file_path_}});
  }
  ss.emplace_back(path);
  // args.
  for (const auto &arg : args_) {
    ss.emplace_back(arg);
  }
  // out/err.
  if (!out_.empty()) {
    ss.emplace_back(absl::StrCat("> ", out_));
  }
  if (!err_.empty()) {
    if (out_ != err_) {
      ss.emplace_back(absl::StrCat("2> ", err_));
    } else {
      ss.emplace_back("2>&1");
    }
  }
  // Trim trailing space and return.
  return absl::StrJoin(ss, kCommandLineSeparator);
}

bool Command::StartForkServer(std::string_view temp_dir_path,
                              std::string_view prefix) {
  if (absl::StartsWith(path_, kNoForkServerRequestPrefix)) {
    VLOG(2) << "Fork server disabled for " << path();
    return false;
  }
  VLOG(2) << "Starting fork server for " << path();

  fork_server_.reset(new ForkServerProps);
  fork_server_->fifo_path_[0] = std::filesystem::path(temp_dir_path)
                                    .append(absl::StrCat(prefix, "_FIFO0"));
  fork_server_->fifo_path_[1] = std::filesystem::path(temp_dir_path)
                                    .append(absl::StrCat(prefix, "_FIFO1"));
  const std::string pid_file_path =
      std::filesystem::path(temp_dir_path).append("pid");
  (void)std::filesystem::create_directory(temp_dir_path);  // it may not exist.
  for (int i = 0; i < 2; ++i) {
    PCHECK(mkfifo(fork_server_->fifo_path_[i].c_str(), 0600) == 0)
        << VV(i) << VV(fork_server_->fifo_path_[i]);
  }

  // NOTE: A background process does not return its exit status to the subshell,
  // so failures will never propagate to the caller of `system()`. Instead, we
  // save out the background process's PID to a file and use it later to assert
  // that the process has started and is still running.
  static constexpr std::string_view kForkServerCommandStub = R"sh(
  {
    CENTIPEDE_FORK_SERVER_FIFO0="%s" \
    CENTIPEDE_FORK_SERVER_FIFO1="%s" \
    %s
  } &
  echo -n $! > "%s"
)sh";
  const std::string fork_server_command = absl::StrFormat(
      kForkServerCommandStub, fork_server_->fifo_path_[0],
      fork_server_->fifo_path_[1], command_line_, pid_file_path);
  VLOG(2) << "Fork server command:" << fork_server_command;

  const int exit_code = system(fork_server_command.c_str());

  // Check if `system()` was able to parse and run the command at all.
  if (exit_code != EXIT_SUCCESS) {
    LogProblemInfo(
        "Failed to parse or run command to launch fork server; will proceed "
        "without it");
    return false;
  }

  // The fork server is probably running now. However, one failure scenario is
  // that it starts and exits early. Try opening the read/write comms pipes with
  // it: if that fails, something is wrong.
  // We use non-blocking I/O to open the pipes. That is good and safe, because:
  // 1) This prevents the `open()` calls from hanging when the fork server fails
  // to open the pipes on its side (note the use of O_RDWR, not O_WRONLY, to
  // avoid ENXIO).
  // 2) In `Command::Execute`, we wait for the return channel pipe with a
  // `poll()`, so it should always have data when we attempt to `read()` from
  // it.
  // See more at
  // https://www.gnu.org/software/libc/manual/html_node/Operating-Modes.html.
  if ((fork_server_->pipe_[0] = open(fork_server_->fifo_path_[0].c_str(),
                                     O_RDWR | O_NONBLOCK)) < 0 ||
      (fork_server_->pipe_[1] = open(fork_server_->fifo_path_[1].c_str(),
                                     O_RDONLY | O_NONBLOCK)) < 0) {
    LogProblemInfo(
        "Failed to establish communication with fork server; will proceed "
        "without it");
    return false;
  }

  // The fork server has started and the comms pipes got opened successfully.
  // Read the fork server's PID and the initial /proc/<PID>/exe symlink pointing
  // at the fork server's binary, written to the provided files by `command`.
  // `Execute()` uses these to monitor the fork server health.
  std::string pid_str;
  ReadFromLocalFile(pid_file_path, pid_str);
  CHECK(absl::SimpleAtoi(pid_str, &fork_server_->pid_)) << VV(pid_str);
  std::string proc_exe = absl::StrFormat("/proc/%d/exe", fork_server_->pid_);
  if (stat(proc_exe.c_str(), &fork_server_->exe_stat_) != EXIT_SUCCESS) {
    LogProblemInfo(
        absl::StrCat("Fork server appears not running; will proceed without it "
                     "(failed to stat ",
                     proc_exe, ")"));
    return false;
  }

  return true;
}

absl::Status Command::VerifyForkServerIsHealthy() {
  // Preconditions: the callers (`Execute()`) should call us only when the fork
  // server is presumed to be running (`fork_server_pid_` >= 0). If it is, the
  // comms pipes are guaranteed to be opened by `StartForkServer()`.
  CHECK(fork_server_ != nullptr) << "Fork server wasn't started";
  CHECK(fork_server_->pid_ >= 0) << "Fork server process failed to start";
  CHECK(fork_server_->pipe_[0] >= 0 && fork_server_->pipe_[1] >= 0)
      << "Failed to connect to fork server";

  // A process with the fork server PID exists (_some_ process, possibly with a
  // recycled PID)...
  if (kill(fork_server_->pid_, 0) != EXIT_SUCCESS) {
    return absl::UnknownError(absl::StrCat(
        "Can't communicate with fork server, PID=", fork_server_->pid_));
  }
  // ...and it is a process with our expected binary, so it's practically
  // guaranteed to be our original fork server process.
  const std::string proc_exe =
      absl::StrFormat("/proc/%d/exe", fork_server_->pid_);
  struct stat proc_exe_stat = {};
  if (stat(proc_exe.c_str(), &proc_exe_stat) != EXIT_SUCCESS) {
    return absl::UnknownError(absl::StrCat(
        "Failed to stat fork server's /proc/<PID>/exe symlink, PID=",
        fork_server_->pid_));
  }
  // TODO(b/281882892): Disable for now. Find a proper solution later.
  if constexpr (false) {
    if (proc_exe_stat.st_dev != fork_server_->exe_stat_.st_dev ||
        proc_exe_stat.st_ino != fork_server_->exe_stat_.st_ino) {
      return absl::UnknownError(absl::StrCat(
          "Fork server's /proc/<PID>/exe symlink changed (new process?), PID=",
          fork_server_->pid_));
    }
  }
  return absl::OkStatus();
}

int Command::Execute() {
  VLOG(1) << "Executing command '" << command_line_ << "'...";

  int exit_code = EXIT_SUCCESS;

  if (fork_server_ != nullptr) {
    VLOG(1) << "Sending execution request to fork server: " << VV(timeout_);

    if (const auto status = VerifyForkServerIsHealthy(); !status.ok()) {
      LogProblemInfo(absl::StrCat("Fork server should be running, but isn't: ",
                                  status.message()));
      return EXIT_FAILURE;
    }

    // Wake up the fork server.
    char x = ' ';
    CHECK_EQ(1, write(fork_server_->pipe_[0], &x, 1));

    // The fork server forks, the child is running. Block until some readable
    // data appears in the pipe (that is, after the fork server writes the
    // execution result to it).
    struct pollfd poll_fd = {};
    int poll_ret = -1;
    auto poll_deadline = absl::Now() + timeout_;
    // The `poll()` syscall can get interrupted: it sets errno==EINTR in that
    // case. We should tolerate that.
    do {
      // NOTE: `poll_fd` has to be reset every time.
      poll_fd = {
          .fd = fork_server_->pipe_[1],  // The file descriptor to wait for.
          .events = POLLIN,              // Wait until `fd` gets readable data.
      };
      const int poll_timeout_ms = static_cast<int>(absl::ToInt64Milliseconds(
          std::max(poll_deadline - absl::Now(), absl::Milliseconds(1))));
      poll_ret = poll(&poll_fd, 1, poll_timeout_ms);
    } while (poll_ret < 0 && errno == EINTR);

    if (poll_ret != 1 || (poll_fd.revents & POLLIN) == 0) {
      // The fork server errored out or timed out, or some other error occurred,
      // e.g. the syscall was interrupted.
      if (poll_ret == 0) {
        LogProblemInfo(
            absl::StrCat("Timeout while waiting for fork server: timeout is ",
                         absl::FormatDuration(timeout_)));
      } else {
        LogProblemInfo(absl::StrCat(
            "Error while waiting for fork server: poll() returned ", poll_ret));
      }
      return EXIT_FAILURE;
    }

    // The fork server wrote the execution result to the pipe: read it.
    CHECK_EQ(sizeof(exit_code),
             read(fork_server_->pipe_[1], &exit_code, sizeof(exit_code)));
  } else {
    VLOG(1) << "Fork server disabled - executing command directly";
    // No fork server, use system().
    exit_code = system(command_line_.c_str());
  }

  if (WIFEXITED(exit_code) && WEXITSTATUS(exit_code) != EXIT_SUCCESS) {
    const auto exit_status = WEXITSTATUS(exit_code);
    VlogProblemInfo(
        absl::StrCat("Command errored out: exit status=", exit_status),
        /*vlog_level=*/1);
    exit_code = exit_status;
  } else if (WIFSIGNALED(exit_code)) {
    const auto signal = WTERMSIG(exit_code);
    if (signal == SIGINT) {
      RequestEarlyExit(EXIT_FAILURE);
      // When the user kills Centipede via ^C, they are unlikely to be
      // interested in any of the subprocesses' outputs. Also, ^C terminates all
      // the subprocesses, including all the runners, so all their outputs would
      // get printed simultaneously, flooding the log. Hence log at a high
      // `vlog_level`.
      VlogProblemInfo("Command killed: signal=SIGINT (likely Ctrl-C)",
                      /*vlog_level=*/10);
    } else {
      // The fork server subprocess was killed by something other than ^C: log
      // at a lower `vlog_level` to help diagnose problems.
      VlogProblemInfo(absl::StrCat("Command killed: signal=", signal),
                      /*vlog_level=*/1);
    }
    exit_code = signal;
  }

  return exit_code;
}

std::string Command::ReadRedirectedStdout() const {
  std::string ret;
  if (!out_.empty()) {
    ReadFromLocalFile(out_, ret);
    if (ret.empty()) ret = "<EMPTY>";
  }
  return ret;
}

std::string Command::ReadRedirectedStderr() const {
  std::string ret;
  if (!err_.empty()) {
    if (err_ == "2>&1" || err_ == out_) {
      ret = "<DUPED TO STDOUT>";
    } else {
      ReadFromLocalFile(err_, ret);
      if (ret.empty()) ret = "<EMPTY>";
    }
  }
  return ret;
}

void Command::LogProblemInfo(std::string_view message) const {
  PLOG(ERROR) << message;
  LOG(ERROR).NoPrefix() << VV(command_line_);
  LOG(ERROR).NoPrefix() << "=== STDOUT ===";
  for (const auto &line : absl::StrSplit(ReadRedirectedStdout(), '\n')) {
    LOG(ERROR).NoPrefix() << line;
  }
  LOG(ERROR).NoPrefix() << "=== STDERR ===";
  for (const auto &line : absl::StrSplit(ReadRedirectedStderr(), '\n')) {
    LOG(ERROR).NoPrefix() << line;
  }
}

void Command::VlogProblemInfo(std::string_view message, int vlog_level) const {
  if (VLOG_IS_ON(vlog_level)) LogProblemInfo(message);
}

}  // namespace centipede
