pw_protobuf_compiler: Improved proto repr

- Support nested messages, enums, oneof, mappings, optional fields, and
  repeated fields in the proto_repr function.
- Display only hexadecimal for bytes when fewer than half the characters
  are ASCII.
- Add the flag for optional fields in proto3 to dynamic proto
  compilation so that optional fields can be used in tests.

Change-Id: I87dd646de1e339e80ceec729476af89548af21e5
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/32880
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_protobuf_compiler/py/pw_protobuf_compiler/python_protos.py b/pw_protobuf_compiler/py/pw_protobuf_compiler/python_protos.py
index 6c20d75..0c4a8cc 100644
--- a/pw_protobuf_compiler/py/pw_protobuf_compiler/python_protos.py
+++ b/pw_protobuf_compiler/py/pw_protobuf_compiler/python_protos.py
@@ -13,6 +13,7 @@
 # the License.
 """Tools for compiling and importing Python protos on the fly."""
 
+from collections.abc import Mapping
 import importlib.util
 import logging
 import os
@@ -47,6 +48,7 @@
 
     cmd: Tuple[PathOrStr, ...] = (
         'protoc',
+        '--experimental_allow_proto3_optional',
         '--python_out',
         os.path.abspath(output_dir),
         *(f'-I{d}' for d in include_paths),
@@ -311,20 +313,73 @@
             yield from module_list
 
 
-def proto_repr(message) -> str:
-    """Creates a repr-like string for a protobuf."""
-    fields = []
+def _repr_char(char: int) -> str:
+    r"""Returns an ASCII char or the \x code for non-printable values."""
+    if ord(' ') <= char <= ord('~'):
+        return r"\'" if chr(char) == "'" else chr(char)
 
+    return f'\\x{char:02X}'
+
+
+def bytes_repr(value: bytes) -> str:
+    """Prints bytes as mixed ASCII only if at least half are printable."""
+    ascii_char_count = sum(ord(' ') <= c <= ord('~') for c in value)
+    if ascii_char_count >= len(value) / 2:
+        contents = ''.join(_repr_char(c) for c in value)
+    else:
+        contents = ''.join(f'\\x{c:02X}' for c in value)
+
+    return f"b'{contents}'"
+
+
+def _field_repr(field, value) -> str:
+    if field.type == field.TYPE_ENUM:
+        try:
+            enum = field.enum_type.values_by_number[value]
+            return f'{field.enum_type.full_name}.{enum.name}'
+        except KeyError:
+            return repr(value)
+
+    if field.type == field.TYPE_MESSAGE:
+        return proto_repr(value)
+
+    if field.type == field.TYPE_BYTES:
+        return bytes_repr(value)
+
+    return repr(value)
+
+
+def _proto_repr(message) -> Iterator[str]:
     for field in message.DESCRIPTOR.fields:
         value = getattr(message, field.name)
 
-        # Include fields if has_<field>() is true or the value is non-default.
-        if hasattr(message, 'has_' + field.name):
-            if not getattr(message, 'has_' + field.name)():
+        # Skip fields that are not present.
+        try:
+            if not message.HasField(field.name):
                 continue
-        elif value == field.default_value:
-            continue
+        except ValueError:
+            # Skip default-valued fields that don't support HasField.
+            if (field.label != field.LABEL_REPEATED
+                    and value == field.default_value):
+                continue
 
-        fields.append(f'{field.name}={value!r}')
+        if field.label == field.LABEL_REPEATED:
+            if not value:
+                continue
 
-    return f'{message.DESCRIPTOR.full_name}({", ".join(fields)})'
+            if isinstance(value, Mapping):
+                key_desc, value_desc = field.message_type.fields
+                values = ', '.join(
+                    f'{_field_repr(key_desc, k)}: {_field_repr(value_desc, v)}'
+                    for k, v in value.items())
+                yield f'{field.name}={{{values}}}'
+            else:
+                values = ', '.join(_field_repr(field, v) for v in value)
+                yield f'{field.name}=[{values}]'
+        else:
+            yield f'{field.name}={_field_repr(field, value)}'
+
+
+def proto_repr(message) -> str:
+    """Creates a repr-like string for a protobuf."""
+    return f'{message.DESCRIPTOR.full_name}({", ".join(_proto_repr(message))})'
diff --git a/pw_protobuf_compiler/py/python_protos_test.py b/pw_protobuf_compiler/py/python_protos_test.py
index 53b1372..82ed547 100755
--- a/pw_protobuf_compiler/py/python_protos_test.py
+++ b/pw_protobuf_compiler/py/python_protos_test.py
@@ -19,6 +19,7 @@
 import unittest
 
 from pw_protobuf_compiler import python_protos
+from pw_protobuf_compiler.python_protos import bytes_repr, proto_repr
 
 PROTO_1 = """\
 syntax = "proto3";
@@ -201,5 +202,170 @@
             _ = self._library.packages.pw.protobuf_compiler['not here']
 
 
+PROTO_FOR_REPR = """\
+syntax = "proto3";
+
+package pw.test3;
+
+enum Enum {
+  ZERO = 0;
+  ONE = 1;
+}
+
+message Nested {
+  repeated int64 value = 1;
+  Enum an_enum = 2;
+}
+
+message Message {
+  Nested message = 1;
+  repeated Nested repeated_message = 2;
+
+  fixed32 regular_int = 3;
+  optional int64 optional_int = 4;
+  repeated int32 repeated_int = 5;
+
+  bytes regular_bytes = 6;
+  optional bytes optional_bytes = 7;
+  repeated bytes repeated_bytes = 8;
+
+  string regular_string = 9;
+  optional string optional_string = 10;
+  repeated string repeated_string = 11;
+
+  Enum regular_enum = 12;
+  optional Enum optional_enum = 13;
+  repeated Enum repeated_enum = 14;
+
+  oneof oneof_test {
+    string oneof_1 = 15;
+    int32 oneof_2 = 16;
+    float oneof_3 = 17;
+  }
+
+  map<string, Nested> mapping = 18;
+}
+"""
+
+
+class TestProtoRepr(unittest.TestCase):
+    """Tests printing protobufs."""
+    def setUp(self):
+        protos = python_protos.Library.from_strings(PROTO_FOR_REPR)
+        self.enum = protos.packages.pw.test3.Enum
+        self.nested = protos.packages.pw.test3.Nested
+        self.message = protos.packages.pw.test3.Message
+
+    def test_empty(self):
+        self.assertEqual('pw.test3.Nested()', proto_repr(self.nested()))
+        self.assertEqual('pw.test3.Message()', proto_repr(self.message()))
+
+    def test_int_fields(self):
+        self.assertEqual(
+            'pw.test3.Message('
+            'regular_int=999, '
+            'optional_int=-1, '
+            'repeated_int=[0, 1, 2])',
+            proto_repr(
+                self.message(repeated_int=[0, 1, 2],
+                             regular_int=999,
+                             optional_int=-1)))
+
+    def test_bytes_fields(self):
+        self.assertEqual(
+            'pw.test3.Message('
+            r"regular_bytes=b'\xFE\xED\xBE\xEF', "
+            r"optional_bytes=b'', "
+            r"repeated_bytes=[b'Hello\'\'\''])",
+            proto_repr(
+                self.message(
+                    regular_bytes=b'\xfe\xed\xbe\xef',
+                    optional_bytes=b'',
+                    repeated_bytes=[b"Hello'''"],
+                )))
+
+    def test_string_fields(self):
+        self.assertEqual(
+            'pw.test3.Message('
+            "regular_string='hi', "
+            "optional_string='', "
+            'repeated_string=["\'"])',
+            proto_repr(
+                self.message(
+                    regular_string='hi',
+                    optional_string='',
+                    repeated_string=[b"'"],
+                )))
+
+    def test_enum_fields(self):
+        self.assertEqual('pw.test3.Nested(an_enum=pw.test3.Enum.ONE)',
+                         proto_repr(self.nested(an_enum=1)))
+        self.assertEqual('pw.test3.Message(optional_enum=pw.test3.Enum.ONE)',
+                         proto_repr(self.message(optional_enum=self.enum.ONE)))
+        self.assertEqual(
+            'pw.test3.Message(repeated_enum='
+            '[pw.test3.Enum.ONE, pw.test3.Enum.ONE, pw.test3.Enum.ZERO])',
+            proto_repr(self.message(repeated_enum=[1, 1, 0])))
+
+    def test_message_fields(self):
+        self.assertEqual(
+            'pw.test3.Message(message=pw.test3.Nested(value=[123]))',
+            proto_repr(self.message(message=self.nested(value=[123]))))
+        self.assertEqual(
+            'pw.test3.Message('
+            'repeated_message=[pw.test3.Nested(value=[123]), '
+            'pw.test3.Nested()])',
+            proto_repr(
+                self.message(
+                    repeated_message=[self.nested(
+                        value=[123]), self.nested()])))
+
+    def test_optional_shown_if_set_to_default(self):
+        self.assertEqual(
+            "pw.test3.Message("
+            "optional_int=0, optional_bytes=b'', optional_string='', "
+            "optional_enum=pw.test3.Enum.ZERO)",
+            proto_repr(
+                self.message(optional_int=0,
+                             optional_bytes=b'',
+                             optional_string='',
+                             optional_enum=0)))
+
+    def test_oneof(self):
+        self.assertEqual(proto_repr(self.message(oneof_1='test')),
+                         "pw.test3.Message(oneof_1='test')")
+        self.assertEqual(proto_repr(self.message(oneof_2=123)),
+                         "pw.test3.Message(oneof_2=123)")
+        self.assertEqual(proto_repr(self.message(oneof_3=123)),
+                         "pw.test3.Message(oneof_3=123.0)")
+
+        msg = self.message(oneof_1='test')
+        msg.oneof_2 = 99
+        self.assertEqual(proto_repr(msg), "pw.test3.Message(oneof_2=99)")
+
+    def test_map(self):
+        msg = self.message()
+        msg.mapping['zero'].MergeFrom(self.nested())
+        msg.mapping['one'].MergeFrom(
+            self.nested(an_enum=self.enum.ONE, value=[1]))
+
+        result = proto_repr(msg)
+        self.assertRegex(result, r'^pw.test3.Message\(mapping={.*}\)$')
+        self.assertIn("'zero': pw.test3.Nested()", result)
+        self.assertIn(
+            "'one': pw.test3.Nested(value=[1], an_enum=pw.test3.Enum.ONE)",
+            result)
+
+    def test_bytes_repr(self):
+        self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef'),
+                         r"b'\xFE\xED\xBE\xEF'")
+        self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef123'),
+                         r"b'\xFE\xED\xBE\xEF\x31\x32\x33'")
+        self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef1234'),
+                         r"b'\xFE\xED\xBE\xEF1234'")
+        self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef12345'),
+                         r"b'\xFE\xED\xBE\xEF12345'")
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/pw_rpc/py/pw_rpc/callback_client.py b/pw_rpc/py/pw_rpc/callback_client.py
index 057bae2..7e71420 100644
--- a/pw_rpc/py/pw_rpc/callback_client.py
+++ b/pw_rpc/py/pw_rpc/callback_client.py
@@ -66,7 +66,7 @@
 OptionalTimeout = Union[UseDefault, float, None]
 
 ResponseCallback = Callable[[PendingRpc, Any], Any]
-CompletionCallback = Callable[[PendingRpc, Any], Any]
+CompletionCallback = Callable[[PendingRpc, Status], Any]
 ErrorCallback = Callable[[PendingRpc, Status], Any]