pw_hdlc_lite: Make RPC console more generic
- Make the HDLC frame handling code more generic.
- Move code for collecting protos from paths or modules into
python_protos.py.
- Provide a protos variable in the RPC console for accessing proto
messages by package.
Change-Id: I0d4e514944103e0657b4228a1e1aee236aefd8cd
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/25080
Reviewed-by: Keir Mierle <keir@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
diff --git a/pw_cli/py/pw_cli/log.py b/pw_cli/py/pw_cli/log.py
index 8d9d1f7..d84b657 100644
--- a/pw_cli/py/pw_cli/log.py
+++ b/pw_cli/py/pw_cli/log.py
@@ -14,7 +14,8 @@
"""Configure the system logger for the default pw command log format."""
import logging
-from typing import NamedTuple, Optional
+from pathlib import Path
+from typing import NamedTuple, Union
import pw_cli.color
import pw_cli.env
@@ -62,8 +63,9 @@
def install(level: int = logging.INFO,
- use_color: Optional[bool] = None,
- hide_timestamp: bool = False) -> None:
+ use_color: bool = None,
+ hide_timestamp: bool = False,
+ log_file: Union[str, Path] = None) -> None:
"""Configure the system logger for the default pw command log format."""
colors = pw_cli.color.colors(use_color)
@@ -84,11 +86,12 @@
root = logging.getLogger()
root.setLevel(logging.DEBUG)
- _STDERR_HANDLER.setLevel(level)
- _STDERR_HANDLER.setFormatter(
+ handler = logging.FileHandler(log_file) if log_file else _STDERR_HANDLER
+ handler.setLevel(level)
+ handler.setFormatter(
logging.Formatter(timestamp_fmt + '%(levelname)s %(message)s',
'%Y%m%d %H:%M:%S'))
- root.addHandler(_STDERR_HANDLER)
+ root.addHandler(handler)
if env.PW_EMOJI:
name_attr = 'emoji'
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
index 57b74b2..2e79b5a 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
@@ -14,14 +14,12 @@
"""Utilities for using HDLC with pw_rpc."""
import logging
-from pathlib import Path
import sys
import threading
import time
-from types import ModuleType
-from typing import Any, BinaryIO, Callable, Iterable, List, NoReturn, Union
+from typing import Any, BinaryIO, Callable, Dict, Iterable, NoReturn
-from pw_hdlc_lite.decode import FrameDecoder
+from pw_hdlc_lite.decode import Frame, FrameDecoder
from pw_hdlc_lite import encode
import pw_rpc
from pw_rpc import callback_client
@@ -51,10 +49,18 @@
return lambda data: writer(encode.information_frame(address, data))
-def read_and_process_data(rpc_client: pw_rpc.Client,
- device: BinaryIO,
- output: Callable[[bytes], Any],
- rpc_address: int = DEFAULT_ADDRESS) -> NoReturn:
+def _handle_error(frame: Frame) -> None:
+ _LOG.error('Failed to parse frame: %s', frame.status.value)
+ _LOG.debug('%s', frame.data)
+
+
+_FrameHandlers = Dict[int, Callable[[Frame], Any]]
+
+
+def read_and_process_data(
+ device: BinaryIO,
+ frame_handlers: _FrameHandlers,
+ error_handler: Callable[[Frame], Any] = _handle_error) -> NoReturn:
"""Reads HDLC frames from the device and passes them to the RPC client."""
decoder = FrameDecoder()
@@ -62,20 +68,14 @@
byte = device.read()
for frame in decoder.process_valid_frames(byte):
if not frame.ok():
- _LOG.error('Failed to parse frame: %s', frame.status.value)
+ error_handler(frame)
continue
- if frame.address == rpc_address:
- if not rpc_client.process_packet(frame.data):
- _LOG.error('Packet not handled by RPC client: %s', frame)
- elif frame.address == STDOUT_ADDRESS:
- output(frame.data)
- else:
+ try:
+ frame_handlers[frame.address](frame)
+ except KeyError:
_LOG.error('Unhandled frame for address %d: %s', frame.address,
- frame.data.decode(errors='replace'))
-
-
-_PathOrModule = Union[str, Path, ModuleType]
+ frame)
def write_to_file(data: bytes, output: BinaryIO = sys.stdout.buffer):
@@ -88,7 +88,7 @@
"""An RPC client configured to run over HDLC."""
def __init__(self,
device: BinaryIO,
- proto_paths_or_modules: Iterable[_PathOrModule],
+ proto_paths_or_modules: Iterable[python_protos.PathOrModule],
output: Callable[[bytes], Any] = write_to_file,
channels: Iterable[pw_rpc.Channel] = None,
client_impl: pw_rpc.client.ClientImpl = None):
@@ -100,16 +100,7 @@
output: where to write "stdout" output from the device
"""
self.device = device
-
- proto_modules = []
- proto_paths: List[Union[Path, str]] = []
- for proto in proto_paths_or_modules:
- if isinstance(proto, (Path, str)):
- proto_paths.append(proto)
- else:
- proto_modules.append(proto)
-
- proto_modules += python_protos.compile_and_import(proto_paths)
+ self.protos = python_protos.Library.from_paths(proto_paths_or_modules)
if channels is None:
channels = [pw_rpc.Channel(1, channel_output(device.write))]
@@ -118,12 +109,16 @@
client_impl = callback_client.Impl()
self.client = pw_rpc.Client.from_modules(client_impl, channels,
- proto_modules)
+ self.protos.modules())
+ frame_handlers: _FrameHandlers = {
+ DEFAULT_ADDRESS: self._handle_rpc_packet,
+ STDOUT_ADDRESS: lambda frame: output(frame.data),
+ }
# Start background thread that reads and processes RPC packets.
threading.Thread(target=read_and_process_data,
daemon=True,
- args=(self.client, device, output)).start()
+ args=(device, frame_handlers)).start()
def rpcs(self, channel_id: int = None) -> Any:
"""Returns object for accessing services on the specified channel.
@@ -136,3 +131,7 @@
return next(iter(self.client.channels())).rpcs
return self.client.channel(channel_id).rpcs
+
+ def _handle_rpc_packet(self, frame: Frame) -> None:
+ if not self.client.process_packet(frame.data):
+ _LOG.error('Packet not handled by RPC client: %s', frame.data)
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
index 1e3b929..0dbef7b 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
@@ -24,6 +24,7 @@
rpcs - used to invoke RPCs
device - the serial device used for communication
client - the pw_rpc.Client
+ protos - protocol buffer messages indexed by proto package
An example echo RPC command:
@@ -35,7 +36,7 @@
import logging
from pathlib import Path
import sys
-from typing import Collection, Iterable, Iterator, BinaryIO
+from typing import BinaryIO, Collection, Iterable, Iterator
import IPython # type: ignore
import serial # type: ignore
@@ -82,6 +83,7 @@
client=client,
channel_client=client.client.channel(1),
rpcs=client.client.channel(1).rpcs,
+ protos=client.protos.packages,
)
print(__doc__) # Print the banner
diff --git a/pw_protobuf_compiler/py/pw_protobuf_compiler/python_protos.py b/pw_protobuf_compiler/py/pw_protobuf_compiler/python_protos.py
index 4b7f75a..3286ccc 100644
--- a/pw_protobuf_compiler/py/pw_protobuf_compiler/python_protos.py
+++ b/pw_protobuf_compiler/py/pw_protobuf_compiler/python_protos.py
@@ -226,6 +226,9 @@
return packages
+PathOrModule = Union[str, Path, ModuleType]
+
+
class Library:
"""A collection of protocol buffer modules sorted by package.
@@ -245,10 +248,26 @@
for iterating over all modules.
"""
@classmethod
+ def from_paths(cls, protos: Iterable[PathOrModule]) -> 'Library':
+ """Creates a Library from paths to proto files or proto modules."""
+ paths: List[PathOrStr] = []
+ modules: List[ModuleType] = []
+
+ for proto in protos:
+ if isinstance(proto, (Path, str)):
+ paths.append(proto)
+ else:
+ modules.append(proto)
+
+ modules += compile_and_import(paths)
+ return Library(modules)
+
+ @classmethod
def from_strings(cls,
contents: Iterable[str],
includes: Iterable[PathOrStr] = (),
output_dir: PathOrStr = None) -> 'Library':
+ """Creates a proto library from protos in the provided strings."""
return cls(compile_and_import_strings(contents, includes, output_dir))
def __init__(self, modules: Iterable[ModuleType]):