blob: 0c005fd1654bea2956c51fc19671d11c59169fc3 [file] [log] [blame]
// Copyright 2020 The Pigweed Authors
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once
#include <type_traits>
#include "pw_assert/assert.h"
#include "pw_bytes/span.h"
#include "pw_containers/vector.h"
#include "pw_rpc/channel.h"
#include "pw_rpc/internal/hash.h"
#include "pw_rpc/internal/method_lookup.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/internal/raw_method.h"
#include "pw_rpc/internal/server.h"
namespace pw::rpc {
// Declares a context object that may be used to invoke an RPC. The context is
// declared with the name of the implemented service and the method to invoke.
// The RPC can then be invoked with the call method.
// For a unary RPC, returns the status, and the response
// struct can be accessed via context.response().
// PW_RAW_TEST_METHOD_CONTEXT(my::CoolService, TheMethod) context;
// EXPECT_EQ(OkStatus(),;
// std::memcmp(encoded_response,
// context.response().data(),
// sizeof(encoded_response)));
// For a server streaming RPC, invokes the method. As in a
// normal RPC, the method completes when the ServerWriter's Finish method is
// called (or it goes out of scope).
// PW_RAW_TEST_METHOD_CONTEXT(my::CoolService, TheStreamingMethod) context;
// EXPECT_TRUE(context.done()); // Check that the RPC completed
// EXPECT_EQ(OkStatus(), context.status()); // Check the status
// EXPECT_EQ(3u, context.responses().size());
// ByteSpan& response = context.responses()[0]; // check individual responses
// for (ByteSpan& response : context.responses()) {
// // iterate over the responses
// }
// PW_RAW_TEST_METHOD_CONTEXT forwards its constructor arguments to the
// underlying service. For example:
// PW_RAW_TEST_METHOD_CONTEXT(MyService, Go) context(service, args);
// PW_RAW_TEST_METHOD_CONTEXT takes two optional arguments:
// size_t kMaxResponse: maximum responses to store; ignored unless streaming
// size_t kOutputSizeBytes: buffer size; must be large enough for a packet
// Example:
// PW_RAW_TEST_METHOD_CONTEXT(MyService, BestMethod, 3, 256) context;
// ASSERT_EQ(3u, context.responses().max_size());
#define PW_RAW_TEST_METHOD_CONTEXT(service, method, ...) \
::pw::rpc::RawTestMethodContext<service, \
&service::method, \
::pw::rpc::internal::Hash(#method), \
template <typename Service,
auto method,
uint32_t kMethodId,
size_t kMaxResponse = 4,
size_t kOutputSizeBytes = 128>
class RawTestMethodContext;
// Internal classes that implement RawTestMethodContext.
namespace internal::test::raw {
// A ChannelOutput implementation that stores the outgoing payloads and status.
template <size_t kOutputSize>
class MessageOutput final : public ChannelOutput {
using ResponseBuffer = std::array<std::byte, kOutputSize>;
MessageOutput(Vector<ByteSpan>& responses,
Vector<ResponseBuffer>& buffers,
ByteSpan packet_buffer)
: ChannelOutput("internal::test::raw::MessageOutput"),
packet_buffer_(packet_buffer) {
Status last_status() const { return last_status_; }
void set_last_status(Status status) { last_status_ = status; }
size_t total_responses() const { return total_responses_; }
bool stream_ended() const { return stream_ended_; }
void clear() {
total_responses_ = 0;
stream_ended_ = false;
last_status_ = Status::Unknown();
ByteSpan AcquireBuffer() override { return packet_buffer_; }
Status SendAndReleaseBuffer(std::span<const std::byte> buffer) override;
Vector<ByteSpan>& responses_;
Vector<ResponseBuffer>& buffers_;
ByteSpan packet_buffer_;
size_t total_responses_;
bool stream_ended_;
Status last_status_;
// Collects everything needed to invoke a particular RPC.
template <typename Service,
uint32_t kMethodId,
size_t kMaxResponse,
size_t kOutputSize>
struct InvocationContext {
template <typename... Args>
InvocationContext(Args&&... args)
: output(responses, buffers, packet_buffer),
server(std::span(&channel, 1)),
MethodLookup::GetRawMethod<Service, kMethodId>()) {}
using ResponseBuffer = std::array<std::byte, kOutputSize>;
MessageOutput<kOutputSize> output;
rpc::Channel channel;
rpc::Server server;
Service service;
Vector<ByteSpan, kMaxResponse> responses;
Vector<ResponseBuffer, kMaxResponse> buffers;
std::array<std::byte, kOutputSize> packet_buffer = {};
internal::ServerCall call;
// Method invocation context for a unary RPC. Returns the status in call() and
// provides the response through the response() method.
template <typename Service, auto method, uint32_t kMethodId, size_t kOutputSize>
class UnaryContext {
using Context = InvocationContext<Service, kMethodId, 1, kOutputSize>;
Context ctx_;
template <typename... Args>
UnaryContext(Args&&... args) : ctx_(std::forward<Args>(args)...) {}
Service& service() { return ctx_.service; }
// Invokes the RPC with the provided request. Returns RPC's StatusWithSize.
StatusWithSize call(ConstByteSpan request) {
ctx_.buffers.back() = {};
auto& response = ctx_.responses.back();
response = {ctx_.buffers.back().data(), ctx_.buffers.back().size()};
auto sws = CallMethodImplFunction<method>(, request, response);
response = response.first(sws.size());
return sws;
// Gives access to the RPC's response.
ConstByteSpan response() const {
PW_ASSERT(ctx_.responses.size() > 0u);
return ctx_.responses.back();
// Method invocation context for a server streaming RPC.
template <typename Service,
auto method,
uint32_t kMethodId,
size_t kMaxResponse,
size_t kOutputSize>
class ServerStreamingContext {
using Context =
InvocationContext<Service, kMethodId, kMaxResponse, kOutputSize>;
Context ctx_;
template <typename... Args>
ServerStreamingContext(Args&&... args) : ctx_(std::forward<Args>(args)...) {}
Service& service() { return ctx_.service; }
// Invokes the RPC with the provided request.
void call(ConstByteSpan request) {
BaseServerWriter server_writer(;
return CallMethodImplFunction<method>(, request, static_cast<RawServerWriter&>(server_writer));
// Returns a server writer which writes responses into the context's buffer.
// This should not be called alongside call(); use one or the other.
RawServerWriter writer() {
BaseServerWriter server_writer(;
return std::move(static_cast<RawServerWriter&>(server_writer));
// Returns the responses that have been recorded. The maximum number of
// responses is responses().max_size(). responses().back() is always the most
// recent response, even if total_responses() > responses().max_size().
const Vector<ByteSpan>& responses() const { return ctx_.responses; }
// The total number of responses sent, which may be larger than
// responses.max_size().
size_t total_responses() const { return ctx_.output.total_responses(); }
// True if the stream has terminated.
bool done() const { return ctx_.output.stream_ended(); }
// The status of the stream. Only valid if done() is true.
Status status() const {
return ctx_.output.last_status();
// Alias to select the type of the context object to use based on which type of
// RPC it is for.
template <typename Service,
auto method,
uint32_t kMethodId,
size_t kMaxResponse,
size_t kOutputSize>
using Context = std::tuple_element_t<
std::tuple<UnaryContext<Service, method, kMethodId, kOutputSize>,
// TODO(hepler): Support client and bidi streaming
template <size_t kOutputSize>
Status MessageOutput<kOutputSize>::SendAndReleaseBuffer(
std::span<const std::byte> buffer) {
if (buffer.empty()) {
return OkStatus();
Result<internal::Packet> result = internal::Packet::FromBuffer(buffer);
last_status_ = result.value().status();
switch (result.value().type()) {
case internal::PacketType::RESPONSE: {
// If we run out of space, the back message is always the most recent.
buffers_.back() = {};
auto response = result.value().payload();
std::memcpy(&buffers_.back(),, response.size());
responses_.back() = {buffers_.back().data(), response.size()};
total_responses_ += 1;
case internal::PacketType::SERVER_STREAM_END:
stream_ended_ = true;
pw_assert_HandleFailure(); // Unhandled PacketType
return OkStatus();
} // namespace internal::test::raw
template <typename Service,
auto method,
uint32_t kMethodId,
size_t kMaxResponse,
size_t kOutputSizeBytes>
class RawTestMethodContext
: public internal::test::raw::
Context<Service, method, kMethodId, kMaxResponse, kOutputSizeBytes> {
// Forwards constructor arguments to the service class.
template <typename... ServiceArgs>
RawTestMethodContext(ServiceArgs&&... service_args)
: internal::test::raw::
Context<Service, method, kMethodId, kMaxResponse, kOutputSizeBytes>(
std::forward<ServiceArgs>(service_args)...) {}
} // namespace pw::rpc