blob: c74b6617396e59bce41daec31e1f6de356784059 [file] [log] [blame]
#
# Copyright (c) 2021 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Console for interacting with CHIP lighting app over RPC.
To start the console, provide a serial port as the --device argument
python -m chip_rpc.console --device /dev/ttyUSB0
Alternatively to connect to a linux CHIP device provide the port.
python -m chip_rpc.console -s localhost:33000
This starts an IPython console for communicating with the connected device. A
few variables are predefined in the interactive console. These include:
rpcs - used to invoke RPCs
device - the serial device used for communication
client - the pw_rpc.Client
protos - protocol buffer messages indexed by proto package
An example RPC command:
rpcs.chip.rpc.DeviceCommon.GetDeviceInfo()
"""
import argparse
from typing import Callable
from collections import namedtuple
from inspect import cleandoc
import logging
import re
import socket
from concurrent.futures import ThreadPoolExecutor
import sys
import threading
from typing import Any, BinaryIO, Collection
from chip_rpc.plugins.device_toolbar import DeviceToolbar
from chip_rpc.plugins.helper_scripts import HelperScripts
import pw_cli.log
from pw_console import PwConsoleEmbed
from pw_console.__main__ import create_temp_log_file
from pw_console.pyserial_wrapper import SerialWithLogging
from pw_hdlc.rpc import HdlcRpcClient, default_channels
from pw_rpc import callback_client
from pw_rpc.console_tools.console import ClientInfo, flattened_rpc_completions
from pw_tokenizer.database import LoadTokenDatabases
from pw_tokenizer.detokenize import Detokenizer, detokenize_base64
from pw_tokenizer import tokens
# Protos
from attributes_service import attributes_service_pb2
from button_service import button_service_pb2
from descriptor_service import descriptor_service_pb2
from device_service import device_service_pb2
from echo_service import echo_pb2
from lighting_service import lighting_service_pb2
from locking_service import locking_service_pb2
from ot_cli_service import ot_cli_service_pb2
from thread_service import thread_service_pb2
from wifi_service import wifi_service_pb2
_LOG = logging.getLogger(__name__)
_DEVICE_LOG = logging.getLogger('rpc_device')
PW_RPC_MAX_PACKET_SIZE = 256
SOCKET_SERVER = 'localhost'
SOCKET_PORT = 33000
PROTOS = [attributes_service_pb2,
button_service_pb2,
descriptor_service_pb2,
device_service_pb2,
echo_pb2,
lighting_service_pb2,
locking_service_pb2,
ot_cli_service_pb2,
thread_service_pb2,
wifi_service_pb2]
def _parse_args():
"""Parses and returns the command line arguments."""
parser = argparse.ArgumentParser(description=__doc__)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-d', '--device', help='the serial port to use')
parser.add_argument('-b',
'--baudrate',
type=int,
default=115200,
help='the baud rate to use')
parser.add_argument(
'-o',
'--output',
type=argparse.FileType('wb'),
default=sys.stdout.buffer,
help=('The file to which to write device output (HDLC channel 1); '
'provide - or omit for stdout.'))
parser.add_argument(
'-r',
'--raw_serial',
action="store_true",
help=('Use raw serial instead of HDLC/RPC'))
parser.add_argument("--token-databases",
metavar='elf_or_token_database',
nargs="+",
action=LoadTokenDatabases,
help="Path to tokenizer database csv file(s).")
group.add_argument('-s',
'--socket-addr',
type=str,
help='use socket to connect to server, type default for\
localhost:33000, or manually input the server address:port')
return parser.parse_args()
def _start_ipython_raw_terminal() -> None:
"""Starts an interactive IPython terminal with preset variables. This raw
terminal does not use HDLC and provides no RPC functionality, this is
just a serial log viewer."""
local_variables = dict(
LOG=_DEVICE_LOG,
)
welcome_message = cleandoc("""
Welcome to the CHIP Console!
This has been started in raw serial mode,
and all RPC functionality is disabled.
Press F1 for help.
""")
interactive_console = PwConsoleEmbed(
global_vars=local_variables,
local_vars=None,
loggers={
'Device Logs': [_DEVICE_LOG],
'Host Logs': [logging.getLogger()],
'Serial Debug': [logging.getLogger('pw_console.serial_debug_logger')],
},
repl_startup_message=welcome_message,
help_text=__doc__,
app_title="CHIP Console",
)
interactive_console.hide_windows('Host Logs')
interactive_console.hide_windows('Serial Debug')
interactive_console.hide_windows('Python Repl')
# Setup Python logger propagation
interactive_console.setup_python_logging()
# Don't send device logs to the root logger.
_DEVICE_LOG.propagate = False
interactive_console.embed()
def _start_ipython_hdlc_terminal(client: HdlcRpcClient) -> None:
"""Starts an interactive IPython terminal with preset variables."""
local_variables = dict(
client=client,
channel_client=client.client.channel(1),
rpcs=client.client.channel(1).rpcs,
scripts=HelperScripts(client.client.channel(1).rpcs),
protos=client.protos.packages,
# Include the active pane logger for creating logs in the repl.
LOG=_DEVICE_LOG,
)
client_info = ClientInfo('channel_client',
client.client.channel(1).rpcs, client.client)
completions = flattened_rpc_completions([client_info])
welcome_message = cleandoc("""
Welcome to the CHIP RPC Console!
Press F1 for help.
Example commands:
rpcs.chip.rpc.Device.GetDeviceInfo()
LOG.warning('Message appears console log window.')
""")
interactive_console = PwConsoleEmbed(
global_vars=local_variables,
local_vars=None,
loggers={
'Device Logs': [_DEVICE_LOG],
'Host Logs': [logging.getLogger()],
'Serial Debug': [logging.getLogger('pw_console.serial_debug_logger')],
},
repl_startup_message=welcome_message,
help_text=__doc__,
app_title="CHIP Console",
)
interactive_console.add_sentence_completer(completions)
interactive_console.add_bottom_toolbar(
DeviceToolbar(client.client.channel(1).rpcs))
interactive_console.hide_windows('Host Logs')
interactive_console.hide_windows('Serial Debug')
# Setup Python logger propagation
interactive_console.setup_python_logging()
# Don't send device logs to the root logger.
_DEVICE_LOG.propagate = False
interactive_console.embed()
class SocketClientImpl:
def __init__(self, config: str):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_server = ''
socket_port = 0
if config == 'default':
socket_server = SOCKET_SERVER
socket_port = SOCKET_PORT
else:
socket_server, socket_port_str = config.split(':')
socket_port = int(socket_port_str)
self.socket.connect((socket_server, socket_port))
def write(self, data: bytes):
self.socket.sendall(data)
def read(self, num_bytes: int = PW_RPC_MAX_PACKET_SIZE):
return self.socket.recv(num_bytes)
def write_to_output(data: bytes,
unused_output: BinaryIO = sys.stdout.buffer,
detokenizer=None):
log_line = data
RegexStruct = namedtuple('RegexStruct', 'platform type regex match_num')
LEVEL_MAPPING = {"I": logging.INFO, "W": logging.WARNING, "P": logging.INFO,
"E": logging.ERROR, "F": logging.FATAL, "V": logging.DEBUG, "D": logging.DEBUG,
"<inf>": logging.INFO, "<dbg>": logging.DEBUG, "<err>": logging.ERROR,
"<info >": logging.INFO, "<warn >": logging.WARNING,
"<error >": logging.ERROR, "<detail>": logging.DEBUG,
"ERR": logging.ERROR, "DBG": logging.DEBUG, "INF": logging.INFO}
ESP_CHIP_REGEX = r"(?P<level>[IWEFV]) \((?P<time>\d+)\) (?P<mod>chip\[[a-zA-Z]+\]):\s(?P<msg>.*)"
ESP_APP_REGEX = r"(?P<level>[IWEFVD]) \((?P<time>\d+)\) (?P<mod>[a-z\-_A-Z]+):\s(?P<msg>.*)"
EFR_CHIP_REGEX = r"(?P<level><detail>|<info >|<error >|<warn >)\s(?P<mod>\[[a-zA-Z\-]+\])\s(?P<msg>.*)"
EFR_APP_REGEX = r"<efr32 >\s(?P<msg>.*)"
NRF_CHIP_REGEX = r"\[(?P<time>\d+)\] (?P<level><inf>|<dbg>|<err>) chip.*: \[(?P<mod>[a-z\-A-Z]+)\](?P<msg>.*)"
NRF_APP_REGEX = r"\[(?P<time>\d+)\] (?P<level><inf>|<dbg>|<err>) (?P<msg>.*)"
NXP_CHIP_REGEX = r"\[(?P<time>\d+)\]\[(?P<level>[EPDF])\]\[(?P<mod>[a-z\-A-Z]+)\](?P<msg>.*)"
NXP_APP_REGEX = r"\[(?P<time>\d+)\]\[(?P<mod>[a-z\-A-Z]+)\](?P<msg>.*)"
LINUX_REGEX = r".*(?P<level>INF|DBG|ERR).*\s+\[(?P<time>[0-9]+\.?[0-9]*)\]\[(?P<pid>\d+)\:(?P<tid>\d+)\] CHIP:(?P<mod>[a-z\-A-Z]+)\: (?P<msg>.*)"
LogRegexes = [RegexStruct("ESP", "CHIP", re.compile(ESP_CHIP_REGEX), 4),
RegexStruct("ESP", "APP", re.compile(ESP_APP_REGEX), 4),
RegexStruct("EFR", "CHIP", re.compile(EFR_CHIP_REGEX), 3),
RegexStruct("EFR", "APP", re.compile(EFR_APP_REGEX), 1),
RegexStruct("NRF", "CHIP", re.compile(NRF_CHIP_REGEX), 4),
RegexStruct("NRF", "APP", re.compile(NRF_APP_REGEX), 3),
RegexStruct("NXP", "CHIP", re.compile(NXP_CHIP_REGEX), 4),
RegexStruct("NXP", "APP", re.compile(NXP_APP_REGEX), 3),
RegexStruct("LINUX", "CHIP", re.compile(LINUX_REGEX), 6)
]
for line in log_line.decode(errors="surrogateescape").splitlines():
fields = {'level': logging.INFO, "time": "",
"mod": "", "type": "", "msg": line}
for log_regex in LogRegexes:
match = log_regex.regex.search(line)
if match and len(match.groups()) == log_regex.match_num:
fields['type'] = log_regex.type
fields.update(match.groupdict())
if "level" in match.groupdict():
fields["level"] = LEVEL_MAPPING[fields["level"]]
if detokenizer:
_LOG.warn(fields["msg"])
if len(fields["msg"]) % 2:
# TODO the msg likely wrapped, trim for now
fields["msg"] = fields["msg"][:-1]
fields["msg"] = detokenizer.detokenize(
bytes.fromhex(fields["msg"]))
break
_DEVICE_LOG.log(fields["level"], fields["msg"], extra={'extra_metadata_fields': {
"timestamp": fields["time"], "type": fields["type"], "mod": fields["mod"]}})
def _read_raw_serial(read: Callable[[], bytes], output):
"""Continuously read and pass to output."""
with ThreadPoolExecutor() as executor:
while True:
try:
data = read()
except Exception as exc: # pylint: disable=broad-except
continue
if data:
output(data)
def console(device: str, baudrate: int,
token_databases: Collection[tokens.Database],
socket_addr: str, output: Any, raw_serial: bool) -> int:
"""Starts an interactive RPC console for HDLC."""
# argparse.FileType doesn't correctly handle '-' for binary files.
if output is sys.stdout:
output = sys.stdout.buffer
logfile = create_temp_log_file()
pw_cli.log.install(logging.INFO, True, False, logfile)
serial_impl = SerialWithLogging
if socket_addr is None:
serial_device = serial_impl(device, baudrate, timeout=0)
def read(): return serial_device.read(8192)
write = serial_device.write
else:
try:
socket_device = SocketClientImpl(socket_addr)
read = socket_device.read
write = socket_device.write
except ValueError:
_LOG.exception('Failed to initialize socket at %s', socket_addr)
return 1
callback_client_impl = callback_client.Impl(
default_unary_timeout_s=5.0,
default_stream_timeout_s=None,
)
detokenizer = Detokenizer(tokens.Database.merged(*token_databases),
show_errors=False) if token_databases else None
if raw_serial:
threading.Thread(target=_read_raw_serial,
daemon=True,
args=(read,
lambda data: write_to_output(
data, output, detokenizer),
)).start()
_start_ipython_raw_terminal()
else:
_start_ipython_hdlc_terminal(
HdlcRpcClient(read, PROTOS, default_channels(write),
lambda data: write_to_output(
data, output, detokenizer),
client_impl=callback_client_impl)
)
return 0
def main() -> int:
return console(**vars(_parse_args()))
if __name__ == '__main__':
sys.exit(main())