pw_hdlc_lite: Client I/O improvements
- Separate threads from reading data and responding to callbacks.
- Abstract the HDLC read / write as functions, instead of a device
object.
Change-Id: I3b0fd8e5ec2fc6cf65f377358479625a7e3ce540
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/27320
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
index 50ed66d..7ee008c 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
@@ -13,11 +13,12 @@
# the License.
"""Utilities for using HDLC with pw_rpc."""
+from concurrent.futures import ThreadPoolExecutor
import logging
import sys
import threading
import time
-from typing import Any, BinaryIO, Callable, Dict, Iterable, NoReturn
+from typing import Any, BinaryIO, Callable, Dict, Iterable, NoReturn, Optional
from pw_protobuf_compiler import python_protos
import pw_rpc
@@ -47,7 +48,12 @@
return lambda data: slow_write(encode.information_frame(address, data))
- return lambda data: writer(encode.information_frame(address, data))
+ def write_hdlc(data: bytes):
+ frame = encode.information_frame(address, data)
+ _LOG.debug('Write %2d B: %s', len(frame), frame)
+ writer(frame)
+
+ return write_hdlc
def _handle_error(frame: Frame) -> None:
@@ -58,25 +64,43 @@
_FrameHandlers = Dict[int, Callable[[Frame], Any]]
-def read_and_process_data(
- device: BinaryIO,
- frame_handlers: _FrameHandlers,
- error_handler: Callable[[Frame], Any] = _handle_error) -> NoReturn:
- """Reads HDLC frames from the device and passes them to the RPC client."""
- decoder = FrameDecoder()
+def read_and_process_data(read: Callable[[], bytes],
+ frame_handlers: _FrameHandlers,
+ error_handler: Callable[[Frame],
+ Any] = _handle_error,
+ handler_threads: Optional[int] = 1) -> NoReturn:
+ """Continuously reads and handles HDLC frames.
- while True:
- byte = device.read()
- for frame in decoder.process_valid_frames(byte):
+ Passes frames to an executor that calls frame handler functions in other
+ threads.
+ """
+ def handle_frame(frame: Frame):
+ try:
if not frame.ok():
error_handler(frame)
- continue
+ return
try:
frame_handlers[frame.address](frame)
except KeyError:
- _LOG.error('Unhandled frame for address %d: %s', frame.address,
- frame)
+ _LOG.warning('Unhandled frame for address %d: %s',
+ frame.address, frame)
+ except: # pylint: disable=bare-except
+ _LOG.exception('Exception in HDLC frame handler thread')
+
+ decoder = FrameDecoder()
+
+ # Execute callbacks in a ThreadPoolExecutor to decouple reading the input
+ # stream from handling the data. That way, if a handler function takes a
+ # long time or crashes, this reading thread is not interrupted.
+ with ThreadPoolExecutor(max_workers=handler_threads) as executor:
+ while True:
+ data = read()
+ if data:
+ _LOG.debug('Read %2d B: %s', len(data), data)
+
+ for frame in decoder.process_valid_frames(data):
+ executor.submit(handle_frame, frame)
def write_to_file(data: bytes, output: BinaryIO = sys.stdout.buffer):
@@ -88,7 +112,8 @@
class HdlcRpcClient:
"""An RPC client configured to run over HDLC."""
def __init__(self,
- device: Any,
+ read: Callable[[], bytes],
+ write: Callable[[bytes], Any],
proto_paths_or_modules: Iterable[python_protos.PathOrModule],
output: Callable[[bytes], Any] = write_to_file,
channels: Iterable[pw_rpc.Channel] = None,
@@ -96,16 +121,15 @@
"""Creates an RPC client configured to communicate using HDLC.
Args:
- device: serial.Serial (or any class that implements read and
- write) for reading/writing data proto_paths_or_modules: paths
- to .proto files or proto modules output: where to write
- "stdout" output from the device
+ read: Function that reads bytes; e.g serial_device.read.
+ write: Function that writes bytes; e.g. serial_device.write
+ proto_paths_or_modules: paths to .proto files or proto modules
+ output: where to write "stdout" output from the device
"""
- self.device = device
self.protos = python_protos.Library.from_paths(proto_paths_or_modules)
if channels is None:
- channels = [pw_rpc.Channel(1, channel_output(device.write))]
+ channels = [pw_rpc.Channel(1, channel_output(write))]
if client_impl is None:
client_impl = callback_client.Impl()
@@ -120,7 +144,7 @@
# Start background thread that reads and processes RPC packets.
threading.Thread(target=read_and_process_data,
daemon=True,
- args=(device, frame_handlers)).start()
+ args=(read, frame_handlers)).start()
def rpcs(self, channel_id: int = None) -> Any:
"""Returns object for accessing services on the specified channel.
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
index 2325c61..2908be8 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
@@ -100,21 +100,18 @@
local_ns=local_variables, module=argparse.Namespace())
-class SocketClientImpl():
+class SocketClientImpl:
def __init__(self, config: str):
-
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_server = ''
socket_port = 0
+
if config == 'default':
socket_server = SOCKET_SERVER
socket_port = SOCKET_PORT
else:
socket_server, socket_port_str = config.split(':')
- try:
- socket_port = int(socket_port_str)
- except ValueError as err:
- raise Exception('Invalid port number provided') from err
+ socket_port = int(socket_port_str)
self.socket.connect((socket_server, socket_port))
def write(self, data: bytes):
@@ -146,16 +143,20 @@
', '.join(proto_globs))
if socket_addr is None:
- client_device = serial.Serial(device, baudrate)
+ serial_device = serial.Serial(device, baudrate, timeout=1)
+ read = lambda: serial_device.read(8192)
+ write = serial_device.write
else:
try:
- client_device = SocketClientImpl(socket_addr)
- except ValueError as err:
- print("ValueError: {0}".format(err), file=sys.stderr)
- sys.exit(1)
+ socket_device = SocketClientImpl(socket_addr)
+ read = socket_device.read
+ write = socket_device.write
+ except ValueError:
+ _LOG.exception('Failed to initialize socket at %s', socket_addr)
+ return 1
_start_ipython_terminal(
- HdlcRpcClient(client_device, protos,
+ HdlcRpcClient(read, write, protos,
lambda data: write_to_file(data, output)))
return 0
diff --git a/pw_hdlc_lite/rpc_example/BUILD.gn b/pw_hdlc_lite/rpc_example/BUILD.gn
index 88ef124..2c934ae 100644
--- a/pw_hdlc_lite/rpc_example/BUILD.gn
+++ b/pw_hdlc_lite/rpc_example/BUILD.gn
@@ -40,5 +40,6 @@
pw_python_script("example_script") {
sources = [ "example_script.py" ]
+ python_deps = [ "$dir_pw_hdlc_lite/py" ]
pylintrc = "$dir_pigweed/.pylintrc"
}
diff --git a/pw_hdlc_lite/rpc_example/example_script.py b/pw_hdlc_lite/rpc_example/example_script.py
index 57726de..976cbb8 100755
--- a/pw_hdlc_lite/rpc_example/example_script.py
+++ b/pw_hdlc_lite/rpc_example/example_script.py
@@ -28,7 +28,8 @@
def script(device: str, baud: int) -> None:
# Set up a pw_rpc client that uses HDLC.
- client = HdlcRpcClient(serial.Serial(device, baud), [PROTO])
+ ser = serial.Serial(device, baud, timeout=0.01)
+ client = HdlcRpcClient(lambda: ser.read(4096), ser.write, [PROTO])
# Make a shortcut to the EchoService.
echo_service = client.rpcs().pw.rpc.EchoService