pw_hdlc_lite: Make HdlcRpcClient easier to use
- On HdlcRpcClient, require the `channels` arg rather than accepting a
`write` arg that may not be used.
- Accept a python_protos.Library, paths, or modules for the protos
argument.
Change-Id: I9b5abf89e44a6f967ed5110b51643efdac8740d8
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/28040
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
index 78e0450..2405f3b 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
@@ -18,7 +18,8 @@
import sys
import threading
import time
-from typing import Any, BinaryIO, Callable, Dict, Iterable, NoReturn, Optional
+from typing import (Any, BinaryIO, Callable, Dict, Iterable, List, NoReturn,
+ Optional, Union)
from pw_protobuf_compiler import python_protos
import pw_rpc
@@ -61,11 +62,11 @@
_LOG.debug('%s', frame.data)
-_FrameHandlers = Dict[int, Callable[[Frame], Any]]
+FrameHandlers = Dict[int, Callable[[Frame], Any]]
def read_and_process_data(read: Callable[[], bytes],
- frame_handlers: _FrameHandlers,
+ frame_handlers: FrameHandlers,
error_handler: Callable[[Frame],
Any] = _handle_error,
handler_threads: Optional[int] = 1) -> NoReturn:
@@ -104,39 +105,42 @@
def write_to_file(data: bytes, output: BinaryIO = sys.stdout.buffer):
- output.write(data)
- output.write(b'\n')
+ output.write(data + b'\n')
output.flush()
+def default_channels(write: Callable[[bytes], Any]) -> List[pw_rpc.Channel]:
+ return [pw_rpc.Channel(1, channel_output(write))]
+
+
class HdlcRpcClient:
"""An RPC client configured to run over HDLC."""
def __init__(self,
read: Callable[[], bytes],
- write: Callable[[bytes], Any],
- proto_paths_or_modules: Iterable[python_protos.PathOrModule],
+ paths_or_modules: Union[Iterable[python_protos.PathOrModule],
+ python_protos.Library],
+ channels: Iterable[pw_rpc.Channel],
output: Callable[[bytes], Any] = write_to_file,
- channels: Iterable[pw_rpc.Channel] = None,
client_impl: pw_rpc.client.ClientImpl = None):
"""Creates an RPC client configured to communicate using HDLC.
Args:
read: Function that reads bytes; e.g serial_device.read.
- write: Function that writes bytes; e.g. serial_device.write
- proto_paths_or_modules: paths to .proto files or proto modules
+ paths_or_modules: paths to .proto files or proto modules
+ channel: RPC channels to use for output
output: where to write "stdout" output from the device
"""
- self.protos = python_protos.Library.from_paths(proto_paths_or_modules)
-
- if channels is None:
- channels = [pw_rpc.Channel(1, channel_output(write))]
+ if isinstance(paths_or_modules, python_protos.Library):
+ self.protos = paths_or_modules
+ else:
+ self.protos = python_protos.Library.from_paths(paths_or_modules)
if client_impl is None:
client_impl = callback_client.Impl()
self.client = pw_rpc.Client.from_modules(client_impl, channels,
self.protos.modules())
- frame_handlers: _FrameHandlers = {
+ frame_handlers: FrameHandlers = {
DEFAULT_ADDRESS: self._handle_rpc_packet,
STDOUT_ADDRESS: lambda frame: output(frame.data),
}
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
index 2908be8..0c033d9 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
@@ -42,7 +42,7 @@
import IPython # type: ignore
import serial # type: ignore
-from pw_hdlc_lite.rpc import HdlcRpcClient, write_to_file
+from pw_hdlc_lite.rpc import HdlcRpcClient, default_channels, write_to_file
_LOG = logging.getLogger(__name__)
@@ -156,7 +156,7 @@
return 1
_start_ipython_terminal(
- HdlcRpcClient(read, write, protos,
+ HdlcRpcClient(read, protos, default_channels(write),
lambda data: write_to_file(data, output)))
return 0
diff --git a/pw_hdlc_lite/rpc_example/example_script.py b/pw_hdlc_lite/rpc_example/example_script.py
index 976cbb8..b749c58 100755
--- a/pw_hdlc_lite/rpc_example/example_script.py
+++ b/pw_hdlc_lite/rpc_example/example_script.py
@@ -20,7 +20,7 @@
import serial # type: ignore
-from pw_hdlc_lite.rpc import HdlcRpcClient
+from pw_hdlc_lite.rpc import HdlcRpcClient, default_channels
# Point the script to the .proto file with our RPC services.
PROTO = Path(os.environ['PW_ROOT'], 'pw_rpc/pw_rpc_protos/echo.proto')
@@ -29,7 +29,8 @@
def script(device: str, baud: int) -> None:
# Set up a pw_rpc client that uses HDLC.
ser = serial.Serial(device, baud, timeout=0.01)
- client = HdlcRpcClient(lambda: ser.read(4096), ser.write, [PROTO])
+ client = HdlcRpcClient(lambda: ser.read(4096), [PROTO],
+ default_channels(ser.write))
# Make a shortcut to the EchoService.
echo_service = client.rpcs().pw.rpc.EchoService