# blob: a6afaf9646cfbd9ddd7f218f9b2e37efd3727378 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests compiling and importing Python protos on the fly."""
from pathlib import Path
import tempfile
import unittest
from pw_protobuf_compiler import python_protos
from pw_protobuf_compiler.python_protos import bytes_repr, proto_repr
# Proto3 source in package pw.protobuf_compiler.test1. Declares SomeMessage,
# AnotherMessage (which nests the Result enum), and PublicService with four
# RPCs: unary, server-streaming, client-streaming, and bidirectional-streaming.
PROTO_1 = """\
syntax = "proto3";
package pw.protobuf_compiler.test1;
message SomeMessage {
uint32 magic_number = 1;
}
message AnotherMessage {
enum Result {
FAILED = 0;
FAILED_MISERABLY = 1;
I_DONT_WANT_TO_TALK_ABOUT_IT = 2;
}
Result result = 1;
string payload = 2;
}
service PublicService {
rpc Unary(SomeMessage) returns (AnotherMessage) {}
rpc ServerStreaming(SomeMessage) returns (stream AnotherMessage) {}
rpc ClientStreaming(stream SomeMessage) returns (AnotherMessage) {}
rpc BidiStreaming(stream SomeMessage) returns (stream AnotherMessage) {}
}
"""
# Proto2 source in package pw.protobuf_compiler.test2. Declares Request (one
# optional float field), an empty Response, and two services: Alpha and Bravo.
PROTO_2 = """\
syntax = "proto2";
package pw.protobuf_compiler.test2;
message Request {
optional float magic_number = 1;
}
message Response {
}
service Alpha {
rpc Unary(Request) returns (Response) {}
}
service Bravo {
rpc BidiStreaming(stream Request) returns (stream Response) {}
}
"""
# Proto3 source that shares package pw.protobuf_compiler.test2 with PROTO_2.
# Declares the top-level Greeting enum, the Hello message, and NestingMessage,
# which contains NestedMessage, which in turn contains NestedNestedMessage.
PROTO_3 = """\
syntax = "proto3";
package pw.protobuf_compiler.test2;
enum Greeting {
YO = 0;
HI = 1;
}
message Hello {
repeated int64 value = 1;
Greeting hi = 2;
}
message NestingMessage {
message NestedMessage {
message NestedNestedMessage {
int32 nested_nested_field = 1;
}
NestedNestedMessage nested_nested_message = 1;
}
NestedMessage nested_message = 1;
}
"""
class TestCompileAndImport(unittest.TestCase):
    """Test compiling and importing."""

    def setUp(self):
        """Writes PROTO_1..PROTO_3 to test_<n>.proto files in a temp dir.

        The resulting paths are stored in self._protos, in order.
        """
        self._proto_dir = tempfile.TemporaryDirectory(prefix='proto_test')
        # Register the cleanup right away instead of relying on tearDown():
        # unittest skips tearDown() when setUp() raises (here or in a
        # subclass that extends it), but registered cleanups still run.
        self.addCleanup(self._proto_dir.cleanup)

        self._protos = []
        for i, contents in enumerate([PROTO_1, PROTO_2, PROTO_3], 1):
            self._protos.append(Path(self._proto_dir.name, f'test_{i}.proto'))
            self._protos[-1].write_text(contents)

    def test_compile_to_temp_dir_and_import(self):
        """Compiles the proto files and spot-checks each imported module."""
        # Key the imported modules by the .proto file name in their descriptor.
        modules = {
            m.DESCRIPTOR.name: m
            for m in python_protos.compile_and_import(self._protos)
        }
        self.assertEqual(3, len(modules))

        # Make sure the protobuf modules contain what we expect them to.
        mod = modules['test_1.proto']
        self.assertEqual(
            4, len(mod.DESCRIPTOR.services_by_name['PublicService'].methods))

        mod = modules['test_2.proto']
        self.assertEqual(mod.Request(magic_number=1.5).magic_number, 1.5)
        self.assertEqual(2, len(mod.DESCRIPTOR.services_by_name))

        mod = modules['test_3.proto']
        self.assertEqual(mod.Hello(value=[123, 456]).value, [123, 456])
class TestProtoLibrary(TestCompileAndImport):
    """Exercises python_protos.Library on the compiled test protos."""

    def setUp(self):
        """Builds a Library from the proto files written by the base setUp."""
        super().setUp()
        compiled = python_protos.compile_and_import(self._protos)
        self._library = python_protos.Library(compiled)

    def test_packages_can_access_messages(self):
        """A message class is reachable through attribute-style packages."""
        some_message = (
            self._library.packages.pw.protobuf_compiler.test1.SomeMessage)
        instance = some_message(magic_number=123)
        self.assertEqual(instance.magic_number, 123)

    def test_packages_finds_across_modules(self):
        """One package exposes names declared in different .proto files."""
        # Request comes from PROTO_2 and YO from PROTO_3; both use the
        # pw.protobuf_compiler.test2 package.
        request_cls = self._library.packages.pw.protobuf_compiler.test2.Request
        request = request_cls(magic_number=50)
        self.assertEqual(request.magic_number, 50)

        greeting_yo = self._library.packages.pw.protobuf_compiler.test2.YO
        self.assertEqual(greeting_yo, 0)

    def test_packages_invalid_name(self):
        """Unknown names at any package depth raise AttributeError."""
        bad_lookups = (
            lambda: self._library.packages.nothing,
            lambda: self._library.packages.pw.NOT_HERE,
            lambda: (
                self._library.packages.pw.protobuf_compiler.test1.NotARealMsg),
        )
        for lookup in bad_lookups:
            with self.assertRaises(AttributeError):
                lookup()

    def test_access_modules_by_package(self):
        """modules_by_package maps a package name to its modules."""
        test1_modules = self._library.modules_by_package[
            'pw.protobuf_compiler.test1']
        self.assertEqual(len(test1_modules), 1)
        result_enum = test1_modules[0].AnotherMessage.Result
        self.assertEqual(result_enum.Value('FAILED'), 0)

        test2_modules = self._library.modules_by_package[
            'pw.protobuf_compiler.test2']
        self.assertEqual(len(test2_modules), 2)

    def test_access_modules_by_package_unknown(self):
        """An unknown package name raises KeyError."""
        with self.assertRaises(KeyError):
            _ = self._library.modules_by_package['pw.not_real']

    def test_library_from_strings(self):
        """Library.from_strings accepts the same proto sources twice."""
        # Replace the package to avoid conflicts with the other proto imports
        renamed_sources = [
            source.replace('pw.protobuf_compiler', 'proto.library.test')
            for source in (PROTO_1, PROTO_2, PROTO_3)
        ]

        lib = python_protos.Library.from_strings(renamed_sources)
        # Make sure we can safely import the same proto contents multiple times.
        lib = python_protos.Library.from_strings(renamed_sources)

        request_cls = lib.packages.proto.library.test.test2.Request
        self.assertEqual(request_cls(magic_number=50).magic_number, 50)

        greeting_yo = lib.packages.proto.library.test.test2.YO
        self.assertEqual(greeting_yo, 0)

    def test_access_nested_packages_by_name(self):
        """Dotted-string indexing yields the same object as attributes."""
        from_root = self._library.packages['pw.protobuf_compiler.test1']
        via_attrs = self._library.packages.pw.protobuf_compiler.test1
        self.assertIs(from_root, via_attrs)

        from_pw = self._library.packages.pw['protobuf_compiler.test1']
        via_attrs = self._library.packages.pw.protobuf_compiler.test1
        self.assertIs(from_pw, via_attrs)

        from_leaf = self._library.packages.pw.protobuf_compiler['test1']
        via_attrs = self._library.packages.pw.protobuf_compiler.test1
        self.assertIs(from_leaf, via_attrs)

    def test_access_nested_packages_by_name_unknown_package(self):
        """Indexing packages with a name that does not resolve raises."""
        bad_lookups = (
            lambda: self._library.packages[''],
            lambda: self._library.packages['.'],
            lambda: self._library.packages['protobuf_compiler.test1'],
            lambda: self._library.packages.pw['pw.protobuf_compiler.test1'],
            lambda: self._library.packages.pw.protobuf_compiler['not here'],
        )
        for lookup in bad_lookups:
            with self.assertRaises(KeyError):
                lookup()

    def test_messages(self):
        """messages() yields every message class, nested ones included."""
        protos = self._library.packages.pw.protobuf_compiler
        found = set(self._library.messages())
        expected = {
            protos.test1.SomeMessage,
            protos.test1.AnotherMessage,
            protos.test2.Request,
            protos.test2.Response,
            protos.test2.Hello,
            protos.test2.NestingMessage,
            protos.test2.NestingMessage.NestedMessage,
            protos.test2.NestingMessage.NestedMessage.NestedNestedMessage,
        }
        self.assertEqual(found, expected)
# Proto3 source in package pw.test3, used by the proto_repr tests. Message
# declares singular, optional, and repeated variants of int, bytes, string, and
# enum fields, plus nested-message fields, a oneof, and a string-keyed map.
PROTO_FOR_REPR = """\
syntax = "proto3";
package pw.test3;
enum Enum {
ZERO = 0;
ONE = 1;
}
message Nested {
repeated int64 value = 1;
Enum an_enum = 2;
}
message Message {
Nested message = 1;
repeated Nested repeated_message = 2;
fixed32 regular_int = 3;
optional int64 optional_int = 4;
repeated int32 repeated_int = 5;
bytes regular_bytes = 6;
optional bytes optional_bytes = 7;
repeated bytes repeated_bytes = 8;
string regular_string = 9;
optional string optional_string = 10;
repeated string repeated_string = 11;
Enum regular_enum = 12;
optional Enum optional_enum = 13;
repeated Enum repeated_enum = 14;
oneof oneof_test {
string oneof_1 = 15;
int32 oneof_2 = 16;
float oneof_3 = 17;
}
map<string, Nested> mapping = 18;
}
"""
class TestProtoRepr(unittest.TestCase):
    """Tests printing protobufs."""

    def setUp(self):
        """Compiles PROTO_FOR_REPR and stores the pw.test3 types it defines."""
        protos = python_protos.Library.from_strings(PROTO_FOR_REPR)
        self.enum = protos.packages.pw.test3.Enum
        self.nested = protos.packages.pw.test3.Nested
        self.message = protos.packages.pw.test3.Message

    def test_empty(self):
        """Messages with no fields set print as an empty constructor call."""
        self.assertEqual('pw.test3.Nested()', proto_repr(self.nested()))
        self.assertEqual('pw.test3.Message()', proto_repr(self.message()))

    def test_int_fields(self):
        """Integer fields print in the expected order, not keyword order."""
        self.assertEqual(
            'pw.test3.Message('
            'regular_int=999, '
            'optional_int=-1, '
            'repeated_int=[0, 1, 2])',
            proto_repr(
                self.message(repeated_int=[0, 1, 2],
                             regular_int=999,
                             optional_int=-1)))

    def test_bytes_fields(self):
        """Bytes fields print as b'' literals; quotes are backslash-escaped."""
        self.assertEqual(
            'pw.test3.Message('
            r"regular_bytes=b'\xFE\xED\xBE\xEF', "
            r"optional_bytes=b'', "
            r"repeated_bytes=[b'Hello\'\'\''])",
            proto_repr(
                self.message(
                    regular_bytes=b'\xfe\xed\xbe\xef',
                    optional_bytes=b'',
                    repeated_bytes=[b"Hello'''"],
                )))

    def test_string_fields(self):
        """String fields print as quoted Python string literals."""
        self.assertEqual(
            'pw.test3.Message('
            "regular_string='hi', "
            "optional_string='', "
            'repeated_string=["\'"])',
            proto_repr(
                self.message(
                    regular_string='hi',
                    optional_string='',
                    # A str, not bytes: repeated_string is a string field, so
                    # the test should not depend on bytes-to-str coercion.
                    repeated_string=["'"],
                )))

    def test_enum_fields(self):
        """Enum values print as their fully qualified names."""
        self.assertEqual('pw.test3.Nested(an_enum=pw.test3.Enum.ONE)',
                         proto_repr(self.nested(an_enum=1)))
        self.assertEqual('pw.test3.Message(optional_enum=pw.test3.Enum.ONE)',
                         proto_repr(self.message(optional_enum=self.enum.ONE)))
        self.assertEqual(
            'pw.test3.Message(repeated_enum='
            '[pw.test3.Enum.ONE, pw.test3.Enum.ONE, pw.test3.Enum.ZERO])',
            proto_repr(self.message(repeated_enum=[1, 1, 0])))

    def test_message_fields(self):
        """Nested messages, singular and repeated, print recursively."""
        self.assertEqual(
            'pw.test3.Message(message=pw.test3.Nested(value=[123]))',
            proto_repr(self.message(message=self.nested(value=[123]))))
        self.assertEqual(
            'pw.test3.Message('
            'repeated_message=[pw.test3.Nested(value=[123]), '
            'pw.test3.Nested()])',
            proto_repr(
                self.message(
                    repeated_message=[self.nested(value=[123]),
                                      self.nested()])))

    def test_optional_shown_if_set_to_default(self):
        """Optional fields explicitly set to a default value still print."""
        self.assertEqual(
            "pw.test3.Message("
            "optional_int=0, optional_bytes=b'', optional_string='', "
            "optional_enum=pw.test3.Enum.ZERO)",
            proto_repr(
                self.message(optional_int=0,
                             optional_bytes=b'',
                             optional_string='',
                             optional_enum=0)))

    def test_oneof(self):
        """Only the oneof member that is currently set is printed."""
        self.assertEqual(proto_repr(self.message(oneof_1='test')),
                         "pw.test3.Message(oneof_1='test')")
        self.assertEqual(proto_repr(self.message(oneof_2=123)),
                         "pw.test3.Message(oneof_2=123)")
        self.assertEqual(proto_repr(self.message(oneof_3=123)),
                         "pw.test3.Message(oneof_3=123.0)")

        # Assigning a second member of the oneof replaces the first one.
        msg = self.message(oneof_1='test')
        msg.oneof_2 = 99
        self.assertEqual(proto_repr(msg), "pw.test3.Message(oneof_2=99)")

    def test_map(self):
        """Map fields print as a dict of key to message repr."""
        msg = self.message()
        msg.mapping['zero'].MergeFrom(self.nested())
        msg.mapping['one'].MergeFrom(
            self.nested(an_enum=self.enum.ONE, value=[1]))

        result = proto_repr(msg)
        # Entries are checked individually below rather than as one exact
        # string, so the test does not depend on map entry order. The dots
        # and braces are escaped so they only match themselves.
        self.assertRegex(result, r'^pw\.test3\.Message\(mapping=\{.*\}\)$')
        self.assertIn("'zero': pw.test3.Nested()", result)
        self.assertIn(
            "'one': pw.test3.Nested(value=[1], an_enum=pw.test3.Enum.ONE)",
            result)

    def test_bytes_repr(self):
        """Checks how bytes_repr renders printable runs after binary data.

        In the expected values a 3-character printable run is hex-escaped,
        while runs of 4 and 5 characters are kept as text.
        """
        self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef'),
                         r"b'\xFE\xED\xBE\xEF'")
        self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef123'),
                         r"b'\xFE\xED\xBE\xEF\x31\x32\x33'")
        self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef1234'),
                         r"b'\xFE\xED\xBE\xEF1234'")
        self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef12345'),
                         r"b'\xFE\xED\xBE\xEF12345'")
if __name__ == '__main__':
unittest.main()