blob: 463de4fc11b0ae113b706bea4f6ab933f5bfa3ea [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""This module generates the code for nanopb-based pw_rpc services."""
import os
from typing import Iterable, NamedTuple, Optional
from pw_protobuf.output_file import OutputFile
from pw_protobuf.proto_tree import ProtoServiceMethod
from pw_protobuf.proto_tree import build_node_tree
from pw_rpc import codegen
from pw_rpc.codegen import (client_call_type, get_id, CodeGenerator,
RPC_NAMESPACE)
PROTO_H_EXTENSION = '.pb.h'
PROTO_CC_EXTENSION = '.pb.cc'
NANOPB_H_EXTENSION = '.pb.h'
def _serde(method: ProtoServiceMethod) -> str:
"""Returns the NanopbMethodSerde for this method."""
return (f'{RPC_NAMESPACE}::internal::kNanopbMethodSerde<'
f'{method.request_type().nanopb_fields()}, '
f'{method.response_type().nanopb_fields()}>')
def _proto_filename_to_nanopb_header(proto_file: str) -> str:
"""Returns the generated nanopb header name for a .proto file."""
return os.path.splitext(proto_file)[0] + NANOPB_H_EXTENSION
def _proto_filename_to_generated_header(proto_file: str) -> str:
"""Returns the generated C++ RPC header name for a .proto file."""
filename = os.path.splitext(proto_file)[0]
return f'{filename}.rpc{PROTO_H_EXTENSION}'
def _client_call(method: ProtoServiceMethod) -> str:
template_args = []
if method.client_streaming():
template_args.append(method.request_type().nanopb_struct())
template_args.append(method.response_type().nanopb_struct())
return f'{client_call_type(method, "Nanopb")}<{", ".join(template_args)}>'
def _function(method: ProtoServiceMethod) -> str:
return f'{_client_call(method)} {method.name()}'
def _user_args(method: ProtoServiceMethod) -> Iterable[str]:
if not method.client_streaming():
yield f'const {method.request_type().nanopb_struct()}& request'
response = method.response_type().nanopb_struct()
if method.server_streaming():
yield f'::pw::Function<void(const {response}&)>&& on_next = nullptr'
yield '::pw::Function<void(::pw::Status)>&& on_completed = nullptr'
else:
yield (f'::pw::Function<void(const {response}&, ::pw::Status)>&& '
'on_completed = nullptr')
yield '::pw::Function<void(::pw::Status)>&& on_error = nullptr'
class NanopbCodeGenerator(CodeGenerator):
"""Generates an RPC service and client using the Nanopb API."""
def name(self) -> str:
return 'nanopb'
def method_union_name(self) -> str:
return 'NanopbMethodUnion'
def includes(self, proto_file_name: str) -> Iterable[str]:
yield '#include "pw_rpc/nanopb/client_reader_writer.h"'
yield '#include "pw_rpc/nanopb/internal/method_union.h"'
yield '#include "pw_rpc/nanopb/server_reader_writer.h"'
# Include the corresponding nanopb header file for this proto file, in
# which the file's messages and enums are generated. All other files
# imported from the .proto file are #included in there.
nanopb_header = _proto_filename_to_nanopb_header(proto_file_name)
yield f'#include "{nanopb_header}"'
def service_aliases(self) -> None:
self.line('template <typename Response>')
self.line('using ServerWriter = '
f'{RPC_NAMESPACE}::NanopbServerWriter<Response>;')
self.line('template <typename Request, typename Response>')
self.line('using ServerReader = '
f'{RPC_NAMESPACE}::NanopbServerReader<Request, Response>;')
self.line('template <typename Request, typename Response>')
self.line(
'using ServerReaderWriter = '
f'{RPC_NAMESPACE}::NanopbServerReaderWriter<Request, Response>;')
def method_descriptor(self, method: ProtoServiceMethod) -> None:
self.line(f'{RPC_NAMESPACE}::internal::'
f'GetNanopbOrRawMethodFor<&Implementation::{method.name()}, '
f'{method.type().cc_enum()}, '
f'{method.request_type().nanopb_struct()}, '
f'{method.response_type().nanopb_struct()}>(')
with self.indent(4):
self.line(f'{get_id(method)}, // Hash of "{method.name()}"')
self.line(f'{_serde(method)}),')
def client_member_function(self, method: ProtoServiceMethod) -> None:
"""Outputs client code for a single RPC method."""
self.line(f'{_function(method)}(')
self.indented_list(*_user_args(method), end=') const {')
with self.indent():
client_call = _client_call(method)
base = 'Stream' if method.server_streaming() else 'Unary'
self.line(f'return {RPC_NAMESPACE}::internal::'
f'Nanopb{base}ResponseClientCall<'
f'{method.response_type().nanopb_struct()}>::'
f'Start<{client_call}>(')
service_client = RPC_NAMESPACE + '::internal::ServiceClient'
args = [
f'{service_client}::client()',
f'{service_client}::channel_id()',
'kServiceId',
get_id(method),
_serde(method),
]
if method.server_streaming():
args.append('std::move(on_next)')
args.append('std::move(on_completed)')
args.append('std::move(on_error)')
if not method.client_streaming():
args.append('request')
self.indented_list(*args, end=');')
self.line('}')
def client_static_function(self, method: ProtoServiceMethod) -> None:
self.line(f'static {_function(method)}(')
self.indented_list(f'{RPC_NAMESPACE}::Client& client',
'uint32_t channel_id',
*_user_args(method),
end=') {')
with self.indent():
self.line(f'return Client(client, channel_id).{method.name()}(')
args = []
if not method.client_streaming():
args.append('request')
if method.server_streaming():
args.append('std::move(on_next)')
self.indented_list(*args,
'std::move(on_completed)',
'std::move(on_error)',
end=');')
self.line('}')
def method_info_specialization(self, method: ProtoServiceMethod) -> None:
self.line()
self.line(f'using Request = {method.request_type().nanopb_struct()};')
self.line(
f'using Response = {method.response_type().nanopb_struct()};')
self.line()
self.line(f'static constexpr const {RPC_NAMESPACE}::internal::'
'NanopbMethodSerde& serde() {')
with self.indent():
self.line(f'return {_serde(method)};')
self.line('}')
class _CallbackFunction(NamedTuple):
"""Represents a callback function parameter in a client RPC call."""
function_type: str
name: str
default_value: Optional[str] = None
def __str__(self):
param = f'::pw::Function<{self.function_type}>&& {self.name}'
if self.default_value:
param += f' = {self.default_value}'
return param
class StubGenerator(codegen.StubGenerator):
"""Generates Nanopb RPC stubs."""
def unary_signature(self, method: ProtoServiceMethod, prefix: str) -> str:
return (f'::pw::Status {prefix}{method.name()}( '
f'const {method.request_type().nanopb_struct()}& request, '
f'{method.response_type().nanopb_struct()}& response)')
def unary_stub(self, method: ProtoServiceMethod,
output: OutputFile) -> None:
output.write_line(codegen.STUB_REQUEST_TODO)
output.write_line('static_cast<void>(request);')
output.write_line(codegen.STUB_RESPONSE_TODO)
output.write_line('static_cast<void>(response);')
output.write_line('return ::pw::Status::Unimplemented();')
def server_streaming_signature(self, method: ProtoServiceMethod,
prefix: str) -> str:
return (
f'void {prefix}{method.name()}( '
f'const {method.request_type().nanopb_struct()}& request, '
f'ServerWriter<{method.response_type().nanopb_struct()}>& writer)')
def client_streaming_signature(self, method: ProtoServiceMethod,
prefix: str) -> str:
return (f'void {prefix}{method.name()}( '
f'ServerReader<{method.request_type().nanopb_struct()}, '
f'{method.response_type().nanopb_struct()}>& reader)')
def bidirectional_streaming_signature(self, method: ProtoServiceMethod,
prefix: str) -> str:
return (f'void {prefix}{method.name()}( '
f'ServerReaderWriter<{method.request_type().nanopb_struct()}, '
f'{method.response_type().nanopb_struct()}>& reader_writer)')
def process_proto_file(proto_file) -> Iterable[OutputFile]:
"""Generates code for a single .proto file."""
_, package_root = build_node_tree(proto_file)
output_filename = _proto_filename_to_generated_header(proto_file.name)
generator = NanopbCodeGenerator(output_filename)
codegen.generate_package(proto_file, package_root, generator)
codegen.package_stubs(package_root, generator, StubGenerator())
return [generator.output]