blob: 756ae415daacc0646411e773f6b7c8fe34f6fc29 [file] [log] [blame]
// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#include <array>
#include <charconv>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <limits>
#include <map>
#include <string>
#include <string_view>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>

#include "pw_allocator/libc_allocator.h"
#include "pw_assert/check.h"
#include "pw_bytes/byte_builder.h"
#include "pw_bytes/span.h"
#include "pw_checksum/crc32.h"
#include "pw_grpc/connection.h"
#include "pw_grpc/examples/echo/echo.rpc.pwpb.h"
#include "pw_grpc/grpc_channel_output.h"
#include "pw_grpc/pw_rpc_handler.h"
#include "pw_log/log.h"
#include "pw_result/result.h"
#include "pw_rpc/internal/hash.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc_transport/service_registry.h"
#include "pw_span/span.h"
#include "pw_status/status.h"
#include "pw_status/try.h"
#include "pw_stream/socket_stream.h"
#include "pw_stream/stream.h"
#include "pw_string/string.h"
#include "pw_thread/test_thread_context.h"
#include "pw_thread/thread.h"
using pw::grpc::StreamId;
namespace {
// Size, in bytes, of each stack buffer (encoded response and encoder scratch
// space) that EchoService::UnaryEcho uses to build its reply.
constexpr size_t kBufferSize{512};
// Echo service implementing the four RPC shapes of the gRPC echo example
// (unary, server streaming, client streaming, bidirectional streaming).
//
// Every handler moves the incoming call object into a member of this class so
// that the call object outlives the handler invocation. A request whose
// message is exactly "quiet" sets quiet_, which makes the service withhold its
// reply / completion for that call.
class EchoService
    : public ::grpc::examples::echo::pw_rpc::pwpb::Echo::Service<EchoService> {
 public:
  // Raw unary echo.
  //
  // Extracts the `message` field from the serialized EchoRequest and then:
  //   * finishes the call with the lookup status if the field cannot be
  //     found/decoded;
  //   * sends nothing if the message is "quiet" (the responder is retained in
  //     last_unary_responder_);
  //   * replies with the decimal CRC-32 of the whole message (prefix included)
  //     if the message starts with "crc32:";
  //   * otherwise replies with the message unchanged.
  void UnaryEcho(pw::ConstByteSpan request,
                 pw::rpc::RawUnaryResponder& responder) {
    auto message =
        ::grpc::examples::echo::pwpb::EchoRequest::FindMessage(request);
    if (!message.ok()) {
      responder.Finish({}, message.status()).IgnoreError();
      // The result holds no value on failure; it must not be dereferenced.
      return;
    }

    if (message->size() < 100) {
      // The message is a view into the request buffer and is not
      // NUL-terminated, so bound the print by its length.
      PW_LOG_INFO("UnaryEcho %.*s",
                  static_cast<int>(message->size()),
                  message->data());
    } else {
      PW_LOG_INFO("UnaryEcho (len=%zu)", message->size());
    }

    quiet_ = message->compare("quiet") == 0;
    last_unary_responder_ = std::move(responder);
    if (quiet_) {
      return;
    }

    std::array<std::byte, kBufferSize> response_buffer;
    std::array<std::byte, kBufferSize> encoder_scratch;
    pw::stream::MemoryWriter writer(response_buffer);
    ::grpc::examples::echo::pwpb::EchoResponse::StreamEncoder encoder(
        writer, encoder_scratch);

    // rfind(prefix, 0) == 0 is the C++17 spelling of "starts with prefix".
    const bool wants_checksum = message->rfind("crc32:", 0) == 0;
    if (wants_checksum) {
      const uint32_t crc32 = pw::checksum::Crc32::Calculate(
          pw::span(reinterpret_cast<const std::byte*>(message->data()),
                   message->size()));
      encoder.Write({.message = std::string_view(std::to_string(crc32))})
          .IgnoreError();
    } else {
      encoder.Write({.message = *message}).IgnoreError();
    }

    // Encoding errors are deliberately ignored above; whatever was written to
    // the buffer is sent as the response payload.
    last_unary_responder_.Finish(writer.WrittenData(), pw::OkStatus())
        .IgnoreError();
  }

  // Server-streaming echo: writes the request message back three times and
  // then finishes with OK. If the message is "quiet", nothing is written and
  // the call is not finished (the writer is retained in last_writer_).
  void ServerStreamingEcho(
      const ::grpc::examples::echo::pwpb::EchoRequest::Message& request,
      ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>&
          writer) {
    PW_LOG_INFO("ServerStreamingEcho %s", request.message.c_str());
    quiet_ = request.message.compare("quiet") == 0;
    last_writer_ = std::move(writer);
    if (quiet_) {
      PW_LOG_INFO("not writing server streaming echo");
      return;
    }

    for (size_t i = 0; i < 3; ++i) {
      last_writer_.Write({.message = request.message}).IgnoreError();
    }
    last_writer_.Finish(pw::OkStatus()).IgnoreError();
  }

  // Client-streaming echo: logs every incoming message and, when the client
  // requests completion, finishes with the response message "done" unless the
  // most recently observed value of quiet_ is true.
  void ClientStreamingEcho(
      ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
                   ::grpc::examples::echo::pwpb::EchoResponse::Message>&
          reader) {
    PW_LOG_INFO("ClientStreamingEcho");
    last_reader_ = std::move(reader);
    last_reader_.set_on_next(
        [this](
            const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
          quiet_ = request.message.compare("quiet") == 0;
          PW_LOG_INFO("ClientStreaming message %s", request.message.c_str());
        });
    last_reader_.set_on_completion_requested([this]() {
      if (quiet_) {
        return;
      }
      last_reader_.Finish({.message = "done"}).IgnoreError();
    });
  }

  // Bidirectional echo: writes each incoming message straight back (except
  // "quiet" messages) and finishes with OK when the client requests
  // completion, unless quiet_ is set at that time.
  void BidirectionalStreamingEcho(
      ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
                         ::grpc::examples::echo::pwpb::EchoResponse::Message>&
          reader_writer) {
    PW_LOG_INFO("BidirectionalStreamingEcho");
    last_reader_writer_ = std::move(reader_writer);
    last_reader_writer_.set_on_completion_requested([this]() {
      if (quiet_) {
        return;
      }
      last_reader_writer_.Finish(pw::OkStatus()).IgnoreError();
    });
    last_reader_writer_.set_on_next(
        [this](
            const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
          PW_LOG_INFO("BidiStreaming message %s", request.message.c_str());
          quiet_ = request.message.compare("quiet") == 0;
          if (quiet_) {
            return;
          }
          last_reader_writer_.Write({.message = request.message})
              .IgnoreError();
        });
  }

 private:
  // Call objects of the most recent RPC of each kind. Each new call of the
  // same kind replaces the previously stored object.
  pw::rpc::RawUnaryResponder last_unary_responder_{};
  ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>
      last_writer_{};
  ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
               ::grpc::examples::echo::pwpb::EchoResponse::Message>
      last_reader_{};
  ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
                     ::grpc::examples::echo::pwpb::EchoResponse::Message>
      last_reader_writer_{};

  // True when the most recently processed request message was "quiet".
  // Shared by all four handlers.
  bool quiet_ = false;
};
// ID of the single pw_rpc channel; main() uses it both to create the egress
// channel and to construct the PwRpcHandler.
constexpr uint32_t kTestChannelId{1};
} // namespace
namespace {

// Parses `text` as a base-10 integer within [min_value, max_value].
//
// Returns true and stores the value in `out` only when the entire string is a
// valid in-range number; otherwise returns false and leaves `out` untouched.
bool ParseIntArg(const std::string& text,
                 int min_value,
                 int max_value,
                 int& out) {
  int parsed = 0;
  const char* const begin = text.data();
  const char* const end = begin + text.size();
  const auto [ptr, ec] = std::from_chars(begin, end, parsed);
  if (ec != std::errc{} || ptr != end) {
    return false;  // Empty, non-numeric, overflowing, or trailing garbage.
  }
  if (parsed < min_value || parsed > max_value) {
    return false;
  }
  out = parsed;
  return true;
}

}  // namespace

// Test gRPC echo server.
//
// Usage: [port=3400] [num_connections=1]
//
// Listens on `port`, then accepts and serves `num_connections` socket
// connections one after another (each connection is served to completion
// before the next Accept) and exits. Returns 0 on success and 1 on invalid
// arguments or on a Listen/Accept failure.
int main(int argc, char* argv[]) {
  const std::vector<std::string> args(argv, argv + argc);
  int port = 3400;
  int num_connections = 1;

  if (args.size() > 1) {
    if (args[1] == "--help") {
      PW_LOG_INFO("Usage: [port=3400] [num_connections=1]");
      PW_LOG_INFO(
          "  num_connections positional arg sets how many socket connections "
          "should be processed before exit");
      return 0;
    }
    // Reject anything that does not fit a 16-bit port instead of throwing
    // (as stoi would) or silently truncating when passed to Listen().
    if (!ParseIntArg(args[1], 0, 65535, port)) {
      PW_LOG_ERROR("Invalid port '%s': expected an integer in [0, 65535]",
                   args[1].c_str());
      return 1;
    }
  }
  if (args.size() > 2) {
    if (!ParseIntArg(
            args[2], 0, std::numeric_limits<int>::max(), num_connections)) {
      PW_LOG_ERROR(
          "Invalid num_connections '%s': expected a non-negative integer",
          args[2].c_str());
      return 1;
    }
  }

  std::setbuf(stdout, nullptr);  // unbuffered stdout

  pw::stream::ServerSocket server;
  pw::grpc::GrpcChannelOutput rpc_egress;
  std::array<pw::rpc::Channel, 1> tx_channels(
      {pw::rpc::Channel::Create<kTestChannelId>(&rpc_egress)});
  pw::rpc::ServiceRegistry service_registry(tx_channels);
  EchoService echo_service;
  service_registry.RegisterService(echo_service);
  pw::grpc::PwRpcHandler handler(kTestChannelId,
                                 service_registry.client_server().server());
  rpc_egress.set_callbacks(handler);

  PW_LOG_INFO("Main.Listen on port=%d", port);
  if (auto status = server.Listen(static_cast<uint16_t>(port)); !status.ok()) {
    PW_LOG_ERROR("Main.Listen failed code=%d", static_cast<int>(status.code()));
    return 1;
  }

  for (int i = 0; i < num_connections; ++i) {
    PW_LOG_INFO("Main.Accept");
    auto socket = server.Accept();
    if (!socket.ok()) {
      PW_LOG_ERROR("Main.Accept failed code=%d",
                   static_cast<int>(socket.status().code()));
      return 1;
    }

    PW_LOG_INFO("Main.Run");
    pw::allocator::LibCAllocator message_assembly_allocator;
    pw::thread::test::TestThreadContext connection_thread_context;
    pw::thread::test::TestThreadContext send_thread_context;
    pw::grpc::ConnectionThread conn(
        *socket,
        send_thread_context.options(),
        handler,
        // Callback handed to the connection; closes this connection's socket.
        [&socket]() { socket->Close(); },
        &message_assembly_allocator);
    rpc_egress.set_connection(conn);

    // Serve this connection to completion before accepting the next one.
    pw::thread::Thread conn_thread(connection_thread_context.options(), conn);
    conn_thread.join();
  }

  PW_LOG_INFO("Main.Run completed");
  return 0;
}