| # Copyright 2020 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Console for interacting with pw_rpc over HDLC. |
| |
| To start the console, provide a serial port as the --device argument and paths |
| or globs for .proto files that define the RPC services to support: |
| |
| python -m pw_hdlc.rpc_console --device /dev/ttyUSB0 sample.proto |
| |
| This starts an IPython console for communicating with the connected device. A |
| few variables are predefined in the interactive console. These include: |
| |
| rpcs - used to invoke RPCs |
| device - the serial device used for communication |
| client - the pw_rpc.Client |
| protos - protocol buffer messages indexed by proto package |
| |
| An example echo RPC command: |
| |
| rpcs.pw.rpc.EchoService.Echo(msg="hello!") |
| """ |
| |
| import argparse |
| import glob |
| from inspect import cleandoc |
| import logging |
| from pathlib import Path |
| import sys |
| from types import ModuleType |
| from typing import ( |
| Any, |
| BinaryIO, |
| Collection, |
| Iterable, |
| Iterator, |
| List, |
| Optional, |
| Union, |
| ) |
| import socket |
| |
| import serial # type: ignore |
| |
| import pw_cli.log |
| import pw_console.python_logging |
| from pw_console import PwConsoleEmbed |
| from pw_console.pyserial_wrapper import SerialWithLogging |
| from pw_console.plugins.bandwidth_toolbar import BandwidthToolbar |
| |
| from pw_log.proto import log_pb2 |
| from pw_rpc.console_tools.console import ClientInfo, flattened_rpc_completions |
| from pw_rpc import callback_client |
| from pw_tokenizer.database import LoadTokenDatabases |
| from pw_tokenizer.detokenize import Detokenizer, detokenize_base64 |
| from pw_tokenizer import tokens |
| |
| from pw_hdlc.rpc import HdlcRpcClient, default_channels |
| |
| _LOG = logging.getLogger(__name__) |
| _DEVICE_LOG = logging.getLogger('rpc_device') |
| |
| PW_RPC_MAX_PACKET_SIZE = 256 |
| SOCKET_SERVER = 'localhost' |
| SOCKET_PORT = 33000 |
| MKFIFO_MODE = 0o666 |
| |
| |
| def _parse_args(): |
| """Parses and returns the command line arguments.""" |
| parser = argparse.ArgumentParser(description=__doc__) |
| group = parser.add_mutually_exclusive_group(required=True) |
| group.add_argument('-d', '--device', help='the serial port to use') |
| parser.add_argument('-b', |
| '--baudrate', |
| type=int, |
| default=115200, |
| help='the baud rate to use') |
| parser.add_argument( |
| '--serial-debug', |
| action='store_true', |
| help=('Enable debug log tracing of all data passed through' |
| 'pyserial read and write.')) |
| parser.add_argument( |
| '-o', |
| '--output', |
| type=argparse.FileType('wb'), |
| default=sys.stdout.buffer, |
| help=('The file to which to write device output (HDLC channel 1); ' |
| 'provide - or omit for stdout.')) |
| parser.add_argument('--logfile', help='Console debug log file.') |
| group.add_argument('-s', |
| '--socket-addr', |
| type=str, |
| help='use socket to connect to server, type default for\ |
| localhost:33000, or manually input the server address:port') |
| parser.add_argument("--token-databases", |
| metavar='elf_or_token_database', |
| nargs="+", |
| action=LoadTokenDatabases, |
| help="Path to tokenizer database csv file(s).") |
| parser.add_argument('--config-file', |
| type=Path, |
| help='Path to a pw_console yaml config file.') |
| parser.add_argument('--proto-globs', |
| nargs='+', |
| help='glob pattern for .proto files') |
| return parser.parse_args() |
| |
| |
| def _expand_globs(globs: Iterable[str]) -> Iterator[Path]: |
| for pattern in globs: |
| for file in glob.glob(pattern, recursive=True): |
| yield Path(file) |
| |
| |
| def _start_ipython_terminal(client: HdlcRpcClient, |
| serial_debug: bool = False, |
| config_file_path: Optional[Path] = None) -> None: |
| """Starts an interactive IPython terminal with preset variables.""" |
| local_variables = dict( |
| client=client, |
| device=client.client.channel(1), |
| rpcs=client.client.channel(1).rpcs, |
| protos=client.protos.packages, |
| # Include the active pane logger for creating logs in the repl. |
| DEVICE_LOG=_DEVICE_LOG, |
| LOG=logging.getLogger(), |
| ) |
| |
| welcome_message = cleandoc(""" |
| Welcome to the Pigweed Console! |
| |
| Help: Press F1 or click the [Help] menu |
| To move focus: Press Shift-Tab or click on a window |
| |
| Example Python commands: |
| |
| device.rpcs.pw.rpc.EchoService.Echo(msg='hello!') |
| LOG.warning('Message appears in Host Logs window.') |
| DEVICE_LOG.warning('Message appears in Device Logs window.') |
| """) |
| |
| client_info = ClientInfo('device', |
| client.client.channel(1).rpcs, client.client) |
| completions = flattened_rpc_completions([client_info]) |
| |
| log_windows = { |
| 'Device Logs': [_DEVICE_LOG], |
| 'Host Logs': [logging.getLogger()], |
| } |
| if serial_debug: |
| log_windows['Serial Debug'] = [ |
| logging.getLogger('pw_console.serial_debug_logger') |
| ] |
| |
| interactive_console = PwConsoleEmbed( |
| global_vars=local_variables, |
| local_vars=None, |
| loggers=log_windows, |
| repl_startup_message=welcome_message, |
| help_text=__doc__, |
| config_file_path=config_file_path, |
| ) |
| interactive_console.hide_windows('Host Logs') |
| interactive_console.add_sentence_completer(completions) |
| if serial_debug: |
| interactive_console.add_bottom_toolbar(BandwidthToolbar()) |
| |
| # Setup Python logger propagation |
| interactive_console.setup_python_logging() |
| |
| # Don't send device logs to the root logger. |
| _DEVICE_LOG.propagate = False |
| |
| interactive_console.embed() |
| |
| |
| class SocketClientImpl: |
| def __init__(self, config: str): |
| self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| socket_server = '' |
| socket_port = 0 |
| |
| if config == 'default': |
| socket_server = SOCKET_SERVER |
| socket_port = SOCKET_PORT |
| else: |
| socket_server, socket_port_str = config.split(':') |
| socket_port = int(socket_port_str) |
| self.socket.connect((socket_server, socket_port)) |
| |
| def write(self, data: bytes): |
| self.socket.sendall(data) |
| |
| def read(self, num_bytes: int = PW_RPC_MAX_PACKET_SIZE): |
| return self.socket.recv(num_bytes) |
| |
| |
| def console(device: str, |
| baudrate: int, |
| proto_globs: Collection[str], |
| token_databases: Collection[tokens.Database], |
| socket_addr: str, |
| logfile: str, |
| output: Any, |
| serial_debug: bool = False, |
| config_file: Optional[Path] = None) -> int: |
| """Starts an interactive RPC console for HDLC.""" |
| # argparse.FileType doesn't correctly handle '-' for binary files. |
| if output is sys.stdout: |
| output = sys.stdout.buffer |
| |
| if not logfile: |
| # Create a temp logfile to prevent logs from appearing over stdout. This |
| # would corrupt the prompt toolkit UI. |
| logfile = pw_console.python_logging.create_temp_log_file() |
| pw_cli.log.install(logging.INFO, True, False, logfile) |
| |
| detokenizer = None |
| if token_databases: |
| detokenizer = Detokenizer(tokens.Database.merged(*token_databases), |
| show_errors=False) |
| |
| if not proto_globs: |
| proto_globs = ['**/*.proto'] |
| |
| protos: List[Union[ModuleType, Path]] = list(_expand_globs(proto_globs)) |
| |
| # Append compiled log.proto library to avoid include errors when manually |
| # provided, and shadowing errors due to ordering when the default global |
| # search path is used. |
| protos.append(log_pb2) |
| |
| if not protos: |
| _LOG.critical('No .proto files were found with %s', |
| ', '.join(proto_globs)) |
| _LOG.critical('At least one .proto file is required') |
| return 1 |
| |
| _LOG.debug('Found %d .proto files found with %s', len(protos), |
| ', '.join(proto_globs)) |
| |
| serial_impl = serial.Serial |
| if serial_debug: |
| serial_impl = SerialWithLogging |
| |
| if socket_addr is None: |
| serial_device = serial_impl( |
| device, |
| baudrate, |
| timeout=0, # Non-blocking mode |
| ) |
| read = lambda: serial_device.read(8192) |
| write = serial_device.write |
| else: |
| try: |
| socket_device = SocketClientImpl(socket_addr) |
| read = socket_device.read |
| write = socket_device.write |
| except ValueError: |
| _LOG.exception('Failed to initialize socket at %s', socket_addr) |
| return 1 |
| |
| callback_client_impl = callback_client.Impl( |
| default_unary_timeout_s=5.0, |
| default_stream_timeout_s=None, |
| ) |
| _start_ipython_terminal( |
| HdlcRpcClient(read, |
| protos, |
| default_channels(write), |
| lambda data: detokenize_and_write_to_output( |
| data, output, detokenizer), |
| client_impl=callback_client_impl), serial_debug, |
| config_file) |
| return 0 |
| |
| |
| def detokenize_and_write_to_output(data: bytes, |
| unused_output: BinaryIO = sys.stdout.buffer, |
| detokenizer=None): |
| log_line = data |
| if detokenizer: |
| log_line = detokenize_base64(detokenizer, data) |
| |
| for line in log_line.decode(errors="surrogateescape").splitlines(): |
| _DEVICE_LOG.info(line) |
| |
| |
| def main() -> int: |
| return console(**vars(_parse_args())) |
| |
| |
| if __name__ == '__main__': |
| sys.exit(main()) |