blob: 25f9d4bead543efe3b521b82d5d5f540ae6f6ffd [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2024 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
"""
Log Parser for Dictionary-based Logging
This uses the JSON database file to decode the binary
log data taken directly from input serialport and print
the log messages.
"""
import argparse
import contextlib
import logging
import os
import select
import sys
import time
import parserlib
import serial
try:
# Pylink is an optional dependency for RTT reading, which requires it's own installation.
# Don't fail, unless the user tries to use RTT reading.
import pylink
except ImportError:
pylink = None
LOGGER_FORMAT = "%(message)s"
logger = logging.getLogger("parser")
class SerialReader:
"""Class to read data from serial port and parse it"""
def __init__(self, serial_port, baudrate):
self.serial_port = serial_port
self.baudrate = baudrate
self.serial = None
@contextlib.contextmanager
def open(self):
try:
self.serial = serial.Serial(self.serial_port, self.baudrate)
yield
finally:
self.serial.close()
def fileno(self):
return self.serial.fileno()
def read_non_blocking(self):
size = self.serial.in_waiting
return self.serial.read(size)
class FileReader:
"""Class to read data from serial port and parse it"""
def __init__(self, filepath):
self.filepath = filepath
self.file = None
@contextlib.contextmanager
def open(self):
if self.filepath is not None:
with open(self.filepath, 'rb') as f:
self.file = f
yield
else:
sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0)
self.file = sys.stdin
yield
def fileno(self):
return self.file.fileno()
def read_non_blocking(self):
# Read available data using a reasonable buffer size (without buffer size, this blocks
# forever, but with buffer size it returns even when less data than the buffer read was
# available).
return self.file.read(1024)
class JLinkRTTReader:
"""Class to read data from JLink's RTT"""
@staticmethod
def _create_jlink_connection(lib_path):
if pylink is None:
raise ImportError(
"pylink module is required for RTT reading. "
"Please install it using 'pip install pylink-square'."
)
if lib_path is not None:
lib = pylink.Library(lib_path, True)
jlink = pylink.JLink(lib)
else:
jlink = pylink.JLink()
return jlink
@contextlib.contextmanager
def open(self):
try:
self.jlink.open()
self.jlink.set_tif(pylink.enums.JLinkInterfaces.SWD)
if self.speed != 0:
self.jlink.connect(self.target_device, self.speed)
else:
self.jlink.connect(self.target_device)
self.jlink.rtt_start(self.block_address)
# Wait for the JLINK RTT buffers to be initialized.
up_down_initialized = False
while not up_down_initialized:
try:
_ = self.jlink.rtt_get_num_up_buffers()
_ = self.jlink.rtt_get_num_down_buffers()
up_down_initialized = True
except pylink.errors.JLinkRTTException:
time.sleep(0.1)
yield
finally:
self.close()
def __init__(self, target_device, block_address, channel, speed, lib_path):
self.target_device = target_device
self.block_address = block_address
self.speed = speed
self.channel = channel
self.jlink = self._create_jlink_connection(lib_path)
def close(self):
# JLink closes the connection through the __del__ method.
del self.jlink
def read_non_blocking(self):
return bytes(self.jlink.rtt_read(self.channel, 1024))
def parse_args():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(allow_abbrev=False)
parser.add_argument("dbfile", help="Dictionary Logging Database file")
parser.add_argument("--debug", action="store_true", help="Print extra debugging information")
parser.add_argument(
"--polling-interval",
type=float,
default=0.1,
help="Interval for polling input source, if it does not support 'select'",
)
# Create subparsers for different input modes
subparsers = parser.add_subparsers(dest="mode", required=True, help="Input source mode")
# Serial subparser
serial_parser = subparsers.add_parser("serial", help="Read from serial port")
serial_parser.add_argument("port", help="Serial port")
serial_parser.add_argument("baudrate", type=int, help="Baudrate")
# File subparser
file_parser = subparsers.add_parser("file", help="Read from file")
file_parser.add_argument(
"filepath", nargs="?", default=None, help="Input file path, leave empty for stdin"
)
# RTT subparser
jlink_rtt_parser = subparsers.add_parser("jlink-rtt", help="Read from RTT")
jlink_rtt_parser.add_argument(
"target_device", help="Device Name (see https://www.segger.com/supported-devices/jlink/)"
)
jlink_rtt_parser.add_argument(
"--block-address", help="RTT block address in hex", type=lambda x: int(x, 16)
)
jlink_rtt_parser.add_argument("--channel", type=int, help="RTT channel number", default=0)
jlink_rtt_parser.add_argument("--speed", type=int, help="Reading speed", default='0')
jlink_rtt_parser.add_argument("--lib-path", help="Path to libjlinkarm.so library")
return parser.parse_args()
def main():
"""function of serial parser"""
args = parse_args()
if args.dbfile is None or '.json' not in args.dbfile:
logger.error("ERROR: invalid log database path: %s, exiting...", args.dbfile)
sys.exit(1)
logging.basicConfig(format=LOGGER_FORMAT)
if args.debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
log_parser = parserlib.get_log_parser(args.dbfile, logger)
data = b''
if args.mode == "serial":
reader = SerialReader(args.port, args.baudrate)
elif args.mode == "file":
reader = FileReader(args.filepath)
elif args.mode == "jlink-rtt":
reader = JLinkRTTReader(
args.target_device, args.block_address, args.channel, args.speed, args.lib_path
)
else:
raise ValueError("Invalid mode selected. Use 'serial' or 'file'.")
with reader.open():
while True:
if hasattr(reader, 'fileno'):
_, _, _ = select.select([reader], [], [])
else:
time.sleep(args.polling_interval)
data += reader.read_non_blocking()
parsed_data_offset = parserlib.parser(data, log_parser, logger)
data = data[parsed_data_offset:]
if __name__ == "__main__":
main()