pw_hdlc_lite: Pipe console device output to file
Add the -o/--output option to the rpc_console. Data sent to HDLC address
1 is written to this file.
Change-Id: If861d9658bf8b842725aa4ea6b883a018a4743d0
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/18351
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
index 443d50d..30d8d63 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
@@ -33,10 +33,11 @@
import argparse
import glob
import logging
+import os
from pathlib import Path
import sys
import threading
-from typing import Collection, Iterable, Iterator, NoReturn
+from typing import Collection, Iterable, Iterator, NoReturn, BinaryIO
import IPython
import serial
@@ -66,11 +67,21 @@
dest='proto_globs',
action='append',
help='glob pattern for .proto files')
+ parser.add_argument(
+ '-o',
+ '--output',
+ type=argparse.FileType('wb'),
+ default=sys.stdout.buffer,
+ help=('The file to which to write device output (HDLC channel 1); '
+ 'provide - or omit for stdout.'))
return parser.parse_args()
-def read_and_process_data(rpc_client: Client,
- device: serial.Serial) -> NoReturn:
+def read_and_process_data(
+ rpc_client: Client,
+ device: serial.Serial,
+ output: BinaryIO,
+ output_sep: bytes = os.linesep.encode()) -> NoReturn:
"""Reads HDLC frames from the device and passes them to the RPC client."""
decode = decoder.FrameDecoder()
@@ -85,7 +96,9 @@
if not rpc_client.process_packet(frame.data):
_LOG.error('Packet not handled by rpc client: %s', frame)
elif frame.address == 1:
- print(f'{device.port}:', frame.data.decode(errors='replace'))
+ output.write(frame.data)
+ output.write(output_sep)
+ output.flush()
else:
_LOG.error('Unhandled frame for address %d: %s', frame.address,
frame.data.decode(errors='replace'))
@@ -105,7 +118,7 @@
IPython.terminal.embed.InteractiveShellEmbed(banner1=__doc__)()
-def console(device: serial.Serial, protos: Iterable[Path]) -> None:
+def console(device: serial.Serial, protos: Iterable[Path], output) -> None:
"""Starts an interactive RPC console for HDLC.
Args:
@@ -123,13 +136,17 @@
# Start background thread that reads serial data and processes RPC packets.
threading.Thread(target=read_and_process_data,
daemon=True,
- args=(client, device)).start()
+ args=(client, device, output)).start()
_start_ipython_terminal(device, client)
-def _prepare_console(device: str, baudrate: int,
- proto_globs: Collection[str]) -> int:
+def _prepare_console(device: str, baudrate: int, proto_globs: Collection[str],
+ output: BinaryIO) -> int:
+ # argparse.FileType doesn't correctly handle '-' for binary files.
+ if output is sys.stdout:
+ output = sys.stdout.buffer
+
if not proto_globs:
proto_globs = ['**/*.proto']
@@ -144,7 +161,7 @@
_LOG.debug('Found %d .proto files found with %s', len(protos),
', '.join(proto_globs))
- console(serial.Serial(device, baudrate), protos)
+ console(serial.Serial(device, baudrate), protos, output)
return 0