| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2024 Nordic Semiconductor ASA |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| """ |
| Log Parser for Dictionary-based Logging |
| |
| This uses the JSON database file to decode the binary |
| log data taken directly from input serialport and print |
| the log messages. |
| """ |
| |
| import argparse |
| import contextlib |
| import logging |
| import os |
| import select |
| import sys |
| import time |
| |
| import parserlib |
| import serial |
| |
| try: |
| # Pylink is an optional dependency for RTT reading, which requires it's own installation. |
| # Don't fail, unless the user tries to use RTT reading. |
| import pylink |
| except ImportError: |
| pylink = None |
| |
| LOGGER_FORMAT = "%(message)s" |
| logger = logging.getLogger("parser") |
| |
| |
| class SerialReader: |
| """Class to read data from serial port and parse it""" |
| |
| def __init__(self, serial_port, baudrate): |
| self.serial_port = serial_port |
| self.baudrate = baudrate |
| self.serial = None |
| |
| @contextlib.contextmanager |
| def open(self): |
| try: |
| self.serial = serial.Serial(self.serial_port, self.baudrate) |
| yield |
| finally: |
| self.serial.close() |
| |
| def fileno(self): |
| return self.serial.fileno() |
| |
| def read_non_blocking(self): |
| size = self.serial.in_waiting |
| return self.serial.read(size) |
| |
| |
| class FileReader: |
| """Class to read data from serial port and parse it""" |
| |
| def __init__(self, filepath): |
| self.filepath = filepath |
| self.file = None |
| |
| @contextlib.contextmanager |
| def open(self): |
| if self.filepath is not None: |
| with open(self.filepath, 'rb') as f: |
| self.file = f |
| yield |
| else: |
| sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0) |
| self.file = sys.stdin |
| yield |
| |
| def fileno(self): |
| return self.file.fileno() |
| |
| def read_non_blocking(self): |
| # Read available data using a reasonable buffer size (without buffer size, this blocks |
| # forever, but with buffer size it returns even when less data than the buffer read was |
| # available). |
| return self.file.read(1024) |
| |
| |
| class JLinkRTTReader: |
| """Class to read data from JLink's RTT""" |
| |
| @staticmethod |
| def _create_jlink_connection(lib_path): |
| if pylink is None: |
| raise ImportError( |
| "pylink module is required for RTT reading. " |
| "Please install it using 'pip install pylink-square'." |
| ) |
| |
| if lib_path is not None: |
| lib = pylink.Library(lib_path, True) |
| jlink = pylink.JLink(lib) |
| else: |
| jlink = pylink.JLink() |
| |
| return jlink |
| |
| @contextlib.contextmanager |
| def open(self): |
| try: |
| self.jlink.open() |
| self.jlink.set_tif(pylink.enums.JLinkInterfaces.SWD) |
| if self.speed != 0: |
| self.jlink.connect(self.target_device, self.speed) |
| else: |
| self.jlink.connect(self.target_device) |
| |
| self.jlink.rtt_start(self.block_address) |
| |
| # Wait for the JLINK RTT buffers to be initialized. |
| up_down_initialized = False |
| while not up_down_initialized: |
| try: |
| _ = self.jlink.rtt_get_num_up_buffers() |
| _ = self.jlink.rtt_get_num_down_buffers() |
| up_down_initialized = True |
| except pylink.errors.JLinkRTTException: |
| time.sleep(0.1) |
| |
| yield |
| |
| finally: |
| self.close() |
| |
| def __init__(self, target_device, block_address, channel, speed, lib_path): |
| self.target_device = target_device |
| self.block_address = block_address |
| self.speed = speed |
| self.channel = channel |
| |
| self.jlink = self._create_jlink_connection(lib_path) |
| |
| def close(self): |
| # JLink closes the connection through the __del__ method. |
| del self.jlink |
| |
| def read_non_blocking(self): |
| return bytes(self.jlink.rtt_read(self.channel, 1024)) |
| |
| |
| def parse_args(): |
| """Parse command line arguments""" |
| parser = argparse.ArgumentParser(allow_abbrev=False) |
| |
| parser.add_argument("dbfile", help="Dictionary Logging Database file") |
| parser.add_argument("--debug", action="store_true", help="Print extra debugging information") |
| parser.add_argument( |
| "--polling-interval", |
| type=float, |
| default=0.1, |
| help="Interval for polling input source, if it does not support 'select'", |
| ) |
| |
| # Create subparsers for different input modes |
| subparsers = parser.add_subparsers(dest="mode", required=True, help="Input source mode") |
| |
| # Serial subparser |
| serial_parser = subparsers.add_parser("serial", help="Read from serial port") |
| serial_parser.add_argument("port", help="Serial port") |
| serial_parser.add_argument("baudrate", type=int, help="Baudrate") |
| |
| # File subparser |
| file_parser = subparsers.add_parser("file", help="Read from file") |
| file_parser.add_argument( |
| "filepath", nargs="?", default=None, help="Input file path, leave empty for stdin" |
| ) |
| |
| # RTT subparser |
| jlink_rtt_parser = subparsers.add_parser("jlink-rtt", help="Read from RTT") |
| jlink_rtt_parser.add_argument( |
| "target_device", help="Device Name (see https://www.segger.com/supported-devices/jlink/)" |
| ) |
| jlink_rtt_parser.add_argument( |
| "--block-address", help="RTT block address in hex", type=lambda x: int(x, 16) |
| ) |
| jlink_rtt_parser.add_argument("--channel", type=int, help="RTT channel number", default=0) |
| jlink_rtt_parser.add_argument("--speed", type=int, help="Reading speed", default='0') |
| jlink_rtt_parser.add_argument("--lib-path", help="Path to libjlinkarm.so library") |
| |
| return parser.parse_args() |
| |
| |
| def main(): |
| """function of serial parser""" |
| args = parse_args() |
| |
| if args.dbfile is None or '.json' not in args.dbfile: |
| logger.error("ERROR: invalid log database path: %s, exiting...", args.dbfile) |
| sys.exit(1) |
| |
| logging.basicConfig(format=LOGGER_FORMAT) |
| |
| if args.debug: |
| logger.setLevel(logging.DEBUG) |
| else: |
| logger.setLevel(logging.INFO) |
| |
| log_parser = parserlib.get_log_parser(args.dbfile, logger) |
| |
| data = b'' |
| |
| if args.mode == "serial": |
| reader = SerialReader(args.port, args.baudrate) |
| elif args.mode == "file": |
| reader = FileReader(args.filepath) |
| elif args.mode == "jlink-rtt": |
| reader = JLinkRTTReader( |
| args.target_device, args.block_address, args.channel, args.speed, args.lib_path |
| ) |
| else: |
| raise ValueError("Invalid mode selected. Use 'serial' or 'file'.") |
| |
| with reader.open(): |
| while True: |
| if hasattr(reader, 'fileno'): |
| _, _, _ = select.select([reader], [], []) |
| else: |
| time.sleep(args.polling_interval) |
| data += reader.read_non_blocking() |
| parsed_data_offset = parserlib.parser(data, log_parser, logger) |
| data = data[parsed_data_offset:] |
| |
| |
| if __name__ == "__main__": |
| main() |