# blob: 2e79b5a8cfad29d082e3a351f5bf01a31774c89d [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Utilities for using HDLC with pw_rpc."""
import logging
import sys
import threading
import time
from typing import Any, BinaryIO, Callable, Dict, Iterable, NoReturn
from pw_hdlc_lite.decode import Frame, FrameDecoder
from pw_hdlc_lite import encode
import pw_rpc
from pw_rpc import callback_client
from pw_protobuf_compiler import python_protos
# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)
# HDLC address whose frames HdlcRpcClient forwards to its `output` callable.
STDOUT_ADDRESS = 1
# Default HDLC address for frames produced by channel_output(); frames received
# on this address are passed to the RPC client by HdlcRpcClient.
DEFAULT_ADDRESS = ord('R')
def channel_output(writer: Callable[[bytes], Any],
                   address: int = DEFAULT_ADDRESS,
                   delay_s: float = 0) -> Callable[[bytes], None]:
    """Builds a function usable as a pw_rpc channel output over HDLC.

    Args:
      writer: callable that receives the encoded bytes to transmit
      address: address passed to encode.information_frame() for every frame
      delay_s: if non-zero, the encoded frame is handed to writer one byte at
          a time, sleeping this many seconds before each byte

    Returns:
      A function that encodes its argument with encode.information_frame()
      and passes the result to writer.
    """
    if not delay_s:

        def write_frame(data: bytes) -> Any:
            return writer(encode.information_frame(address, data))

        return write_frame

    def write_frame_slowly(data: bytes) -> None:
        # Pace the output byte by byte in case unbuffered serial is in use.
        for value in encode.information_frame(address, data):
            time.sleep(delay_s)
            writer(bytes([value]))

    return write_frame_slowly
def _handle_error(frame: Frame) -> None:
    """Logs a frame's status value as an error and its data at debug level."""
    status = frame.status.value
    _LOG.error('Failed to parse frame: %s', status)
    payload = frame.data
    _LOG.debug('%s', payload)
# Maps a frame address to the callable that read_and_process_data() invokes
# with each valid frame received on that address.
_FrameHandlers = Dict[int, Callable[[Frame], Any]]
def read_and_process_data(
        device: BinaryIO,
        frame_handlers: _FrameHandlers,
        error_handler: Callable[[Frame], Any] = _handle_error) -> NoReturn:
    """Reads HDLC frames from the device and passes them to the RPC client.

    Loops forever: reads from the device, feeds the bytes to a FrameDecoder,
    and dispatches every decoded frame by its address.

    Args:
      device: object whose read() method supplies the raw bytes to decode
      frame_handlers: maps a frame address to the callable that is invoked
          with each valid frame received on that address
      error_handler: called with each frame for which frame.ok() is false
    """
    decoder = FrameDecoder()

    while True:
        byte = device.read()
        for frame in decoder.process_valid_frames(byte):
            if not frame.ok():
                error_handler(frame)
                continue

            # Guard only the lookup. A KeyError raised inside a handler must
            # not be swallowed and misreported as an unhandled address.
            try:
                handler = frame_handlers[frame.address]
            except KeyError:
                _LOG.error('Unhandled frame for address %d: %s', frame.address,
                           frame)
                continue

            handler(frame)
def write_to_file(data: bytes, output: BinaryIO = None):
    """Writes data followed by a newline to output, then flushes it.

    Args:
      data: bytes to write
      output: binary stream to write to; when omitted, sys.stdout.buffer is
          looked up at call time, so a sys.stdout that was replaced after this
          module was imported is honored
    """
    if output is None:
        # Resolve the default here rather than in the signature: a default
        # argument is evaluated only once, when the function is defined.
        output = sys.stdout.buffer

    output.write(data)
    output.write(b'\n')
    output.flush()
class HdlcRpcClient:
    """RPC client that exchanges pw_rpc packets with a device over HDLC."""

    def __init__(self,
                 device: BinaryIO,
                 proto_paths_or_modules: Iterable[python_protos.PathOrModule],
                 output: Callable[[bytes], Any] = write_to_file,
                 channels: Iterable[pw_rpc.Channel] = None,
                 client_impl: pw_rpc.client.ClientImpl = None):
        """Sets up the RPC client and starts the background reader thread.

        Args:
          device: serial.Serial (or any BinaryIO class) for reading/writing data
          proto_paths_or_modules: paths to .proto files or proto modules
          output: where to write "stdout" output from the device
          channels: RPC channels to use; if None, a single channel with ID 1
              that sends HDLC frames through device.write is created
          client_impl: RPC client implementation; if None, a
              callback_client.Impl() is created
        """
        self.device = device
        self.protos = python_protos.Library.from_paths(proto_paths_or_modules)

        if channels is None:
            default_channel = pw_rpc.Channel(1, channel_output(device.write))
            channels = [default_channel]

        if client_impl is None:
            client_impl = callback_client.Impl()

        self.client = pw_rpc.Client.from_modules(client_impl, channels,
                                                 self.protos.modules())

        def forward_stdout(frame):
            return output(frame.data)

        handlers: _FrameHandlers = {
            DEFAULT_ADDRESS: self._handle_rpc_packet,
            STDOUT_ADDRESS: forward_stdout,
        }

        # Read and dispatch incoming frames on a background daemon thread.
        reader = threading.Thread(target=read_and_process_data,
                                  daemon=True,
                                  args=(device, handlers))
        reader.start()

    def rpcs(self, channel_id: int = None) -> Any:
        """Returns the `rpcs` object of a channel for invoking RPCs directly.

        This is a shortcut past the intermediate client layers. When
        channel_id is None, the first channel yielded by the client's
        channels() is used, so the ID can be omitted if only one channel is
        in use.
        """
        if channel_id is not None:
            return self.client.channel(channel_id).rpcs

        first_channel = next(iter(self.client.channels()))
        return first_channel.rpcs

    def _handle_rpc_packet(self, frame: Frame) -> None:
        """Passes a frame's data to the RPC client; logs if it is not handled."""
        handled = self.client.process_packet(frame.data)
        if not handled:
            _LOG.error('Packet not handled by RPC client: %s', frame.data)