pw_hdlc_lite: RPC console updates
- Move some code from rpc_console.py to rpc.py.
- Pass a specific set of variables to IPython instead of pulling in all
of locals().
Change-Id: I933fba58936908660dd7a66c37001d7d6e141072
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/18380
Reviewed-by: Keir Mierle <keir@google.com>
Commit-Queue: Wyatt Hepler <hepler@google.com>
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
index 165b470..1246ff6 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc.py
@@ -13,11 +13,20 @@
# the License.
"""Utilities for using HDLC with pw_rpc."""
+import logging
+import os
import time
-from typing import Any, Callable
+from typing import Any, Callable, NoReturn, BinaryIO
+import serial
+
+from pw_hdlc_lite.decoder import FrameDecoder
from pw_hdlc_lite.encoder import encode_information_frame
+from pw_rpc.client import Client
+_LOG = logging.getLogger(__name__)
+
+STDOUT_ADDRESS = 1
DEFAULT_ADDRESS = ord('R')
@@ -37,3 +46,30 @@
return lambda data: slow_write(encode_information_frame(address, data))
return lambda data: writer(encode_information_frame(address, data))
+
+
+def read_and_process_data(rpc_client: Client,
+ device: serial.Serial,
+ output: BinaryIO,
+ output_sep: bytes = os.linesep.encode(),
+ rpc_address: int = DEFAULT_ADDRESS) -> NoReturn:
+ """Reads HDLC frames from the device and passes them to the RPC client."""
+ decoder = FrameDecoder()
+
+ while True:
+ byte = device.read()
+ for frame in decoder.process_valid_frames(byte):
+ if not frame.ok():
+ _LOG.error('Failed to parse frame: %s', frame.status.value)
+ continue
+
+ if frame.address == rpc_address:
+ if not rpc_client.process_packet(frame.data):
+ _LOG.error('Packet not handled by RPC client: %s', frame)
+ elif frame.address == STDOUT_ADDRESS:
+ output.write(frame.data)
+ output.write(output_sep)
+ output.flush()
+ else:
+ _LOG.error('Unhandled frame for address %d: %s', frame.address,
+ frame.data.decoder(errors='replace'))
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
index 30d8d63..11bbf18 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/rpc_console.py
@@ -18,10 +18,10 @@
python -m pw_hdlc_lite.rpc_console --device /dev/ttyUSB0 --protos my.proto
-This will start an IPython console for communicating with the connected device.
-A few variables will be defined:
+This starts an IPython console for communicating with the connected device. A
+few variables are predefined in the interactive console. These include:
- rpcs - used to invoke RPCs
+ rpcs - used to invoke RPCs
device - the serial device used for communication
client - the pw_rpc.Client
@@ -33,16 +33,15 @@
import argparse
import glob
import logging
-import os
from pathlib import Path
import sys
import threading
-from typing import Collection, Iterable, Iterator, NoReturn, BinaryIO
+from typing import Collection, Iterable, Iterator, BinaryIO
import IPython
import serial
-from pw_hdlc_lite import decoder, rpc
+from pw_hdlc_lite import rpc
from pw_protobuf_compiler import python_protos
from pw_rpc import callback_client, descriptors
from pw_rpc.client import Client
@@ -77,48 +76,28 @@
return parser.parse_args()
-def read_and_process_data(
- rpc_client: Client,
- device: serial.Serial,
- output: BinaryIO,
- output_sep: bytes = os.linesep.encode()) -> NoReturn:
- """Reads HDLC frames from the device and passes them to the RPC client."""
- decode = decoder.FrameDecoder()
-
- while True:
- byte = device.read()
- for frame in decode.process_valid_frames(byte):
- if not frame.ok():
- _LOG.error('Failed to parse frame: %s', frame.status.value)
- continue
-
- if frame.address == rpc.DEFAULT_ADDRESS:
- if not rpc_client.process_packet(frame.data):
- _LOG.error('Packet not handled by rpc client: %s', frame)
- elif frame.address == 1:
- output.write(frame.data)
- output.write(output_sep)
- output.flush()
- else:
- _LOG.error('Unhandled frame for address %d: %s', frame.address,
- frame.data.decode(errors='replace'))
-
-
def _expand_globs(globs: Iterable[str]) -> Iterator[Path]:
for pattern in globs:
for file in glob.glob(pattern, recursive=True):
yield Path(file)
-def _start_ipython_terminal( # pylint: disable=unused-argument
- device: serial.Serial, client: Client) -> None:
- """Starts IPython with local variables available."""
- channel_client = client.channel(1) # pylint: disable=unused-variable
- rpcs = channel_client.rpcs # pylint: disable=unused-variable
- IPython.terminal.embed.InteractiveShellEmbed(banner1=__doc__)()
+def _start_ipython_terminal(device: serial.Serial, client: Client) -> None:
+ """Starts an interactive IPython terminal with preset variables."""
+ local_variables = dict(
+ client=client,
+ device=device,
+ channel_client=client.channel(1),
+ rpcs=client.channel(1).rpcs,
+ )
+ module = argparse.Namespace() # serves as an empty module
+
+ IPython.terminal.embed.InteractiveShellEmbed(banner1=__doc__).mainloop(
+ local_variables, module)
-def console(device: serial.Serial, protos: Iterable[Path], output) -> None:
+def console(device: serial.Serial, protos: Iterable[Path],
+ output: BinaryIO) -> None:
"""Starts an interactive RPC console for HDLC.
Args:
@@ -129,12 +108,12 @@
# Compile the proto files that define the RPC services to expose.
modules = python_protos.compile_and_import(protos)
- # Set up the pw_rpc server.
+ # Set up the pw_rpc server with a single channel with ID 1.
channel = descriptors.Channel(1, rpc.channel_output(device.write))
client = Client.from_modules(callback_client.Impl(), [channel], modules)
# Start background thread that reads serial data and processes RPC packets.
- threading.Thread(target=read_and_process_data,
+ threading.Thread(target=rpc.read_and_process_data,
daemon=True,
args=(client, device, output)).start()