pw_hdlc_lite: HdlcRpcClient fixes
- Allow printing HDLC "stdout" to any file in HdlcRpcClient.
- Allow Detokenizer or AutoUpdatingDetokenizer in pw_tokenizer
Change-Id: I8591cc124b4b1ecf6f00f419191dbfc1f913d213
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/23923
Commit-Queue: Keir Mierle <keir@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
index aa67b8d..57b74b2 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
@@ -14,7 +14,6 @@
"""Utilities for using HDLC with pw_rpc."""
import logging
-import os
from pathlib import Path
import sys
import threading
@@ -54,8 +53,7 @@
def read_and_process_data(rpc_client: pw_rpc.Client,
device: BinaryIO,
- output: BinaryIO,
- output_sep: bytes = os.linesep.encode(),
+ output: Callable[[bytes], Any],
rpc_address: int = DEFAULT_ADDRESS) -> NoReturn:
"""Reads HDLC frames from the device and passes them to the RPC client."""
decoder = FrameDecoder()
@@ -71,9 +69,7 @@
if not rpc_client.process_packet(frame.data):
_LOG.error('Packet not handled by RPC client: %s', frame)
elif frame.address == STDOUT_ADDRESS:
- output.write(frame.data)
- output.write(output_sep)
- output.flush()
+ output(frame.data)
else:
_LOG.error('Unhandled frame for address %d: %s', frame.address,
frame.data.decode(errors='replace'))
@@ -82,12 +78,18 @@
_PathOrModule = Union[str, Path, ModuleType]
+def write_to_file(data: bytes, output: BinaryIO = sys.stdout.buffer):
+ output.write(data)
+ output.write(b'\n')
+ output.flush()
+
+
class HdlcRpcClient:
"""An RPC client configured to run over HDLC."""
def __init__(self,
device: BinaryIO,
proto_paths_or_modules: Iterable[_PathOrModule],
- output: BinaryIO = sys.stdout.buffer,
+ output: Callable[[bytes], Any] = write_to_file,
channels: Iterable[pw_rpc.Channel] = None,
client_impl: pw_rpc.client.ClientImpl = None):
"""Creates an RPC client configured to communicate using HDLC.
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
index 0ebeffe..1e3b929 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
@@ -40,7 +40,7 @@
import IPython # type: ignore
import serial # type: ignore
-from pw_hdlc_lite.rpc import HdlcRpcClient
+from pw_hdlc_lite.rpc import HdlcRpcClient, write_to_file
_LOG = logging.getLogger(__name__)
@@ -111,7 +111,8 @@
', '.join(proto_globs))
_start_ipython_terminal(
- HdlcRpcClient(serial.Serial(device, baudrate), protos, output))
+ HdlcRpcClient(serial.Serial(device, baudrate), protos,
+ lambda data: write_to_file(data, output)))
return 0
diff --git a/pw_tokenizer/py/pw_tokenizer/detokenize.py b/pw_tokenizer/py/pw_tokenizer/detokenize.py
index b5a0b7b..0e349a9 100755
--- a/pw_tokenizer/py/pw_tokenizer/detokenize.py
+++ b/pw_tokenizer/py/pw_tokenizer/detokenize.py
@@ -256,6 +256,9 @@
return self._detokenizer.detokenize(data)
+_Detokenizer = Union[Detokenizer, AutoUpdatingDetokenizer]
+
+
class PrefixedMessageDecoder:
"""Parses messages that start with a prefix character from a byte stream."""
def __init__(self, prefix: Union[str, bytes], chars: Union[str, bytes]):
@@ -324,7 +327,7 @@
def _detokenize_prefixed_base64(
- detokenizer: Detokenizer, prefix: bytes,
+ detokenizer: _Detokenizer, prefix: bytes,
recursion: int) -> Callable[[Match[bytes]], bytes]:
"""Returns a function that decodes prefixed Base64 with the detokenizer."""
def decode_and_detokenize(match: Match[bytes]) -> bytes:
@@ -365,7 +368,7 @@
br'(?:[A-Za-z0-9+/\-_]{3}=|[A-Za-z0-9+/\-_]{2}==)?'))
-def detokenize_base64_live(detokenizer: Detokenizer,
+def detokenize_base64_live(detokenizer: _Detokenizer,
input_file: BinaryIO,
output: BinaryIO,
prefix: Union[str, bytes] = BASE64_PREFIX,
@@ -390,7 +393,7 @@
output.flush()
-def detokenize_base64_to_file(detokenizer: Detokenizer,
+def detokenize_base64_to_file(detokenizer: _Detokenizer,
data: bytes,
output: BinaryIO,
prefix: Union[str, bytes] = BASE64_PREFIX,
@@ -402,7 +405,7 @@
_detokenize_prefixed_base64(detokenizer, prefix, recursion), data))
-def detokenize_base64(detokenizer: Detokenizer,
+def detokenize_base64(detokenizer: _Detokenizer,
data: bytes,
prefix: Union[str, bytes] = BASE64_PREFIX,
recursion: int = DEFAULT_RECURSION) -> bytes: