// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once

#include <cassert>
#include <cstddef>
#include <utility>

#include "pw_containers/intrusive_list.h"
#include "pw_function/function.h"
#include "pw_rpc/internal/call_context.h"
#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/lock.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/method_type.h"
#include "pw_rpc/service.h"
#include "pw_span/span.h"
#include "pw_status/status.h"
#include "pw_sync/lock_annotations.h"

namespace pw::rpc {

class Writer;

namespace internal {

class Endpoint;
class LockedEndpoint;
class Packet;

// Internal RPC Call class. The Call is used to respond to any type of RPC.
// Public classes like ServerWriters inherit from it with private inheritance
// and provide a public API for their use case. The Call's public API is used by
// the Server and Client classes.
//
// Private inheritance is used in place of composition or more complex
// inheritance hierarchy so that these objects all inherit from a common
// IntrusiveList::Item object. Private inheritance also gives the derived classs
// full control over their interfaces.
class Call : public IntrusiveList<Call>::Item {
 public:
  Call(const Call&) = delete;

  // Move support is provided to derived classes through the MoveFrom function.
  Call(Call&&) = delete;

  Call& operator=(const Call&) = delete;
  Call& operator=(Call&&) = delete;

  // True if the Call is active and ready to send responses.
  [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(rpc_lock()) {
    LockGuard lock(rpc_lock());
    return active_locked();
  }

  [[nodiscard]] bool active_locked() const
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return rpc_state_ == kActive;
  }

  uint32_t id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return id_; }

  uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
    LockGuard lock(rpc_lock());
    return channel_id_locked();
  }
  uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return channel_id_;
  }
  uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return service_id_;
  }
  uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return method_id_;
  }

  // Closes the Call and sends a RESPONSE packet, if it is active. Returns the
  // status from sending the packet, or FAILED_PRECONDITION if the Call is not
  // active.
  Status CloseAndSendResponse(ConstByteSpan response, Status status)
      PW_LOCKS_EXCLUDED(rpc_lock()) {
    LockGuard lock(rpc_lock());
    return CloseAndSendResponseLocked(response, status);
  }

  Status CloseAndSendResponseLocked(ConstByteSpan response, Status status)
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return CloseAndSendFinalPacketLocked(
        PacketType::RESPONSE, response, status);
  }

  Status CloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
    return CloseAndSendResponse({}, status);
  }

  Status CloseAndSendServerErrorLocked(Status error)
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return CloseAndSendFinalPacketLocked(PacketType::SERVER_ERROR, {}, error);
  }

  // Public call that ends the client stream for a client call.
  Status CloseClientStream() PW_LOCKS_EXCLUDED(rpc_lock()) {
    LockGuard lock(rpc_lock());
    return CloseClientStreamLocked();
  }

  // Internal call that closes the client stream.
  Status CloseClientStreamLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    client_stream_state_ = kClientStreamInactive;
    return SendPacket(PacketType::CLIENT_STREAM_END, {}, {});
  }

  // Sends a payload in either a server or client stream packet.
  Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
    LockGuard lock(rpc_lock());
    return WriteLocked(payload);
  }

  Status WriteLocked(ConstByteSpan payload)
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

  // Sends the initial request for a client call. If the request fails, the call
  // is closed.
  void SendInitialClientRequest(ConstByteSpan payload)
      PW_UNLOCK_FUNCTION(rpc_lock()) {
    // TODO(b/234876851): Ensure the call object is locked before releasing the
    //     RPC mutex.
    if (const Status status = SendPacket(PacketType::REQUEST, payload);
        !status.ok()) {
      HandleError(status);
    } else {
      rpc_lock().unlock();
    }
  }

  // Whenever a payload arrives (in a server/client stream or in a response),
  // call the on_next_ callback.
  // Precondition: rpc_lock() must be held.
  void HandlePayload(ConstByteSpan message) const
      PW_UNLOCK_FUNCTION(rpc_lock()) {
    const bool invoke = on_next_ != nullptr;
    // TODO(b/234876851): Ensure on_next_ is properly guarded.
    rpc_lock().unlock();

    if (invoke) {
      on_next_(message);
    }
  }

  // Handles an error condition for the call. This closes the call and calls the
  // on_error callback, if set.
  void HandleError(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
    UnregisterAndMarkClosed();
    CallOnError(status);
  }

  // Aborts the RPC because of a change in the endpoint (e.g. channel closed,
  // service unregistered). Does NOT unregister the call! The calls must be
  // removed when iterating over the list in the endpoint.
  void Abort() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    // Locking here is problematic because CallOnError releases rpc_lock().
    //
    // b/234876851 must be addressed before the locking here can be cleaned up.
    MarkClosed();

    CallOnError(Status::Aborted());

    // Re-lock rpc_lock().
    rpc_lock().lock();
  }

  bool has_client_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return HasClientStream(type_);
  }

  bool has_server_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return HasServerStream(type_);
  }

  bool client_stream_open() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return client_stream_state_ == kClientStreamActive;
  }

  // Keep this public so the Nanopb implementation can set it from a helper
  // function.
  void set_on_next(Function<void(ConstByteSpan)>&& on_next)
      PW_LOCKS_EXCLUDED(rpc_lock()) {
    LockGuard lock(rpc_lock());
    set_on_next_locked(std::move(on_next));
  }

 protected:
  // Creates an inactive Call.
  constexpr Call()
      : endpoint_{},
        channel_id_{},
        id_{},
        service_id_{},
        method_id_{},
        rpc_state_{},
        type_{},
        call_type_{},
        client_stream_state_ {}
  {}

  // Creates an active server-side Call.
  Call(const LockedCallContext& context, MethodType type)
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

  // Creates an active client-side Call.
  Call(LockedEndpoint& client,
       uint32_t channel_id,
       uint32_t service_id,
       uint32_t method_id,
       MethodType type) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

  // This call must be in a closed state when this is called.
  void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

  Endpoint& endpoint() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return *endpoint_;
  }

  void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next)
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    on_next_ = std::move(on_next);
  }

  void set_on_error(Function<void(Status)>&& on_error)
      PW_LOCKS_EXCLUDED(rpc_lock()) {
    LockGuard lock(rpc_lock());
    set_on_error_locked(std::move(on_error));
  }

  void set_on_error_locked(Function<void(Status)>&& on_error)
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    on_error_ = std::move(on_error);
  }

  // Calls the on_error callback without closing the RPC. This is used when the
  // call has already completed.
  void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock()) {
    const bool invoke = on_error_ != nullptr;

    // TODO(b/234876851): Ensure on_error_ is properly guarded.
    rpc_lock().unlock();
    if (invoke) {
      on_error_(error);
    }
  }

  void MarkClientStreamCompleted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    client_stream_state_ = kClientStreamInactive;
  }

  Status CloseAndSendResponseLocked(Status status)
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return CloseAndSendFinalPacketLocked(PacketType::RESPONSE, {}, status);
  }

  // Cancels an RPC. For client calls only.
  Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) {
    LockGuard lock(rpc_lock());
    return CloseAndSendFinalPacketLocked(
        PacketType::CLIENT_ERROR, {}, Status::Cancelled());
  }

  // Unregisters the RPC from the endpoint & marks as closed. The call may be
  // active or inactive when this is called.
  void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

  // Define conversions to the generic server/client RPC writer class. These
  // functions are defined in pw_rpc/writer.h after the Writer class is defined.
  constexpr operator Writer&();
  constexpr operator const Writer&() const;

 private:
  enum CallType : bool { kServerCall, kClientCall };

  // Common constructor for server & client calls.
  Call(LockedEndpoint& endpoint,
       uint32_t id,
       uint32_t channel_id,
       uint32_t service_id,
       uint32_t method_id,
       MethodType type,
       CallType call_type);

  Packet MakePacket(PacketType type,
                    ConstByteSpan payload,
                    Status status = OkStatus()) const
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    return Packet(type,
                  channel_id_locked(),
                  service_id(),
                  method_id(),
                  id_,
                  payload,
                  status);
  }

  void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
    channel_id_ = Channel::kUnassignedChannelId;
    rpc_state_ = kInactive;
    client_stream_state_ = kClientStreamInactive;
  }

  // Sends a payload with the specified type. The payload may either be in a
  // previously acquired buffer or in a standalone buffer.
  //
  // Returns FAILED_PRECONDITION if the call is not active().
  Status SendPacket(PacketType type,
                    ConstByteSpan payload,
                    Status status = OkStatus())
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

  Status CloseAndSendFinalPacketLocked(PacketType type,
                                       ConstByteSpan response,
                                       Status status)
      PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());

  internal::Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock());
  uint32_t channel_id_ PW_GUARDED_BY(rpc_lock());
  uint32_t id_ PW_GUARDED_BY(rpc_lock());
  uint32_t service_id_ PW_GUARDED_BY(rpc_lock());
  uint32_t method_id_ PW_GUARDED_BY(rpc_lock());

  enum : bool { kInactive, kActive } rpc_state_ PW_GUARDED_BY(rpc_lock());
  MethodType type_ PW_GUARDED_BY(rpc_lock());
  CallType call_type_ PW_GUARDED_BY(rpc_lock());
  enum : bool {
    kClientStreamInactive,
    kClientStreamActive,
  } client_stream_state_ PW_GUARDED_BY(rpc_lock());

  // Called when the RPC is terminated due to an error.
  Function<void(Status error)> on_error_;

  // Called when a request is received. Only used for RPCs with client streams.
  // The raw payload buffer is passed to the callback.
  Function<void(ConstByteSpan payload)> on_next_;
};

}  // namespace internal
}  // namespace pw::rpc
