pw_hdlc_lite: Client I/O improvements

- Separate threads from reading data and responding to callbacks.
- Abstract the HDLC read / write as functions, instead of a device
  object.

Change-Id: I3b0fd8e5ec2fc6cf65f377358479625a7e3ce540
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/27320
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
index 50ed66d..7ee008c 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
@@ -13,11 +13,12 @@
 # the License.
 """Utilities for using HDLC with pw_rpc."""
 
+from concurrent.futures import ThreadPoolExecutor
 import logging
 import sys
 import threading
 import time
-from typing import Any, BinaryIO, Callable, Dict, Iterable, NoReturn
+from typing import Any, BinaryIO, Callable, Dict, Iterable, NoReturn, Optional
 
 from pw_protobuf_compiler import python_protos
 import pw_rpc
@@ -47,7 +48,12 @@
 
         return lambda data: slow_write(encode.information_frame(address, data))
 
-    return lambda data: writer(encode.information_frame(address, data))
+    def write_hdlc(data: bytes):
+        frame = encode.information_frame(address, data)
+        _LOG.debug('Write %2d B: %s', len(frame), frame)
+        writer(frame)
+
+    return write_hdlc
 
 
 def _handle_error(frame: Frame) -> None:
@@ -58,25 +64,43 @@
 _FrameHandlers = Dict[int, Callable[[Frame], Any]]
 
 
-def read_and_process_data(
-        device: BinaryIO,
-        frame_handlers: _FrameHandlers,
-        error_handler: Callable[[Frame], Any] = _handle_error) -> NoReturn:
-    """Reads HDLC frames from the device and passes them to the RPC client."""
-    decoder = FrameDecoder()
+def read_and_process_data(read: Callable[[], bytes],
+                          frame_handlers: _FrameHandlers,
+                          error_handler: Callable[[Frame],
+                                                  Any] = _handle_error,
+                          handler_threads: Optional[int] = 1) -> NoReturn:
+    """Continuously reads and handles HDLC frames.
 
-    while True:
-        byte = device.read()
-        for frame in decoder.process_valid_frames(byte):
+    Passes frames to an executor that calls frame handler functions in other
+    threads.
+    """
+    def handle_frame(frame: Frame):
+        try:
             if not frame.ok():
                 error_handler(frame)
-                continue
+                return
 
             try:
                 frame_handlers[frame.address](frame)
             except KeyError:
-                _LOG.error('Unhandled frame for address %d: %s', frame.address,
-                           frame)
+                _LOG.warning('Unhandled frame for address %d: %s',
+                             frame.address, frame)
+        except:  # pylint: disable=bare-except
+            _LOG.exception('Exception in HDLC frame handler thread')
+
+    decoder = FrameDecoder()
+
+    # Execute callbacks in a ThreadPoolExecutor to decouple reading the input
+    # stream from handling the data. That way, if a handler function takes a
+    # long time or crashes, this reading thread is not interrupted.
+    with ThreadPoolExecutor(max_workers=handler_threads) as executor:
+        while True:
+            data = read()
+            if data:
+                _LOG.debug('Read %2d B: %s', len(data), data)
+
+                for frame in decoder.process_valid_frames(data):
+                    executor.submit(handle_frame, frame)
 
 
 def write_to_file(data: bytes, output: BinaryIO = sys.stdout.buffer):
@@ -88,7 +112,8 @@
 class HdlcRpcClient:
     """An RPC client configured to run over HDLC."""
     def __init__(self,
-                 device: Any,
+                 read: Callable[[], bytes],
+                 write: Callable[[bytes], Any],
                  proto_paths_or_modules: Iterable[python_protos.PathOrModule],
                  output: Callable[[bytes], Any] = write_to_file,
                  channels: Iterable[pw_rpc.Channel] = None,
@@ -96,16 +121,15 @@
         """Creates an RPC client configured to communicate using HDLC.
 
         Args:
-          device: serial.Serial (or any class that implements read and
-          write) for reading/writing data proto_paths_or_modules: paths
-          to .proto files or proto modules output: where to write
-          "stdout" output from the device
+          read: Function that reads bytes; e.g serial_device.read.
+          write: Function that writes bytes; e.g. serial_device.write
+          proto_paths_or_modules: paths to .proto files or proto modules
+          output: where to write "stdout" output from the device
         """
-        self.device = device
         self.protos = python_protos.Library.from_paths(proto_paths_or_modules)
 
         if channels is None:
-            channels = [pw_rpc.Channel(1, channel_output(device.write))]
+            channels = [pw_rpc.Channel(1, channel_output(write))]
 
         if client_impl is None:
             client_impl = callback_client.Impl()
@@ -120,7 +144,7 @@
         # Start background thread that reads and processes RPC packets.
         threading.Thread(target=read_and_process_data,
                          daemon=True,
-                         args=(device, frame_handlers)).start()
+                         args=(read, frame_handlers)).start()
 
     def rpcs(self, channel_id: int = None) -> Any:
         """Returns object for accessing services on the specified channel.
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
index 2325c61..2908be8 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
@@ -100,21 +100,18 @@
         local_ns=local_variables, module=argparse.Namespace())
 
 
-class SocketClientImpl():
+class SocketClientImpl:
     def __init__(self, config: str):
-
         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         socket_server = ''
         socket_port = 0
+
         if config == 'default':
             socket_server = SOCKET_SERVER
             socket_port = SOCKET_PORT
         else:
             socket_server, socket_port_str = config.split(':')
-            try:
-                socket_port = int(socket_port_str)
-            except ValueError as err:
-                raise Exception('Invalid port number provided') from err
+            socket_port = int(socket_port_str)
         self.socket.connect((socket_server, socket_port))
 
     def write(self, data: bytes):
@@ -146,16 +143,20 @@
                ', '.join(proto_globs))
 
     if socket_addr is None:
-        client_device = serial.Serial(device, baudrate)
+        serial_device = serial.Serial(device, baudrate, timeout=1)
+        read = lambda: serial_device.read(8192)
+        write = serial_device.write
     else:
         try:
-            client_device = SocketClientImpl(socket_addr)
-        except ValueError as err:
-            print("ValueError: {0}".format(err), file=sys.stderr)
-            sys.exit(1)
+            socket_device = SocketClientImpl(socket_addr)
+            read = socket_device.read
+            write = socket_device.write
+        except ValueError:
+            _LOG.exception('Failed to initialize socket at %s', socket_addr)
+            return 1
 
     _start_ipython_terminal(
-        HdlcRpcClient(client_device, protos,
+        HdlcRpcClient(read, write, protos,
                       lambda data: write_to_file(data, output)))
     return 0
 
diff --git a/pw_hdlc_lite/rpc_example/BUILD.gn b/pw_hdlc_lite/rpc_example/BUILD.gn
index 88ef124..2c934ae 100644
--- a/pw_hdlc_lite/rpc_example/BUILD.gn
+++ b/pw_hdlc_lite/rpc_example/BUILD.gn
@@ -40,5 +40,6 @@
 
 pw_python_script("example_script") {
   sources = [ "example_script.py" ]
+  python_deps = [ "$dir_pw_hdlc_lite/py" ]
   pylintrc = "$dir_pigweed/.pylintrc"
 }
diff --git a/pw_hdlc_lite/rpc_example/example_script.py b/pw_hdlc_lite/rpc_example/example_script.py
index 57726de..976cbb8 100755
--- a/pw_hdlc_lite/rpc_example/example_script.py
+++ b/pw_hdlc_lite/rpc_example/example_script.py
@@ -28,7 +28,8 @@
 
 def script(device: str, baud: int) -> None:
     # Set up a pw_rpc client that uses HDLC.
-    client = HdlcRpcClient(serial.Serial(device, baud), [PROTO])
+    ser = serial.Serial(device, baud, timeout=0.01)
+    client = HdlcRpcClient(lambda: ser.read(4096), ser.write, [PROTO])
 
     # Make a shortcut to the EchoService.
     echo_service = client.rpcs().pw.rpc.EchoService