|  | #!/usr/bin/env python3 | 
|  | # | 
|  | # Copyright (c) 2024 Nordic Semiconductor ASA | 
|  | # | 
|  | # SPDX-License-Identifier: Apache-2.0 | 
|  |  | 
|  | """ | 
|  | Log Parser for Dictionary-based Logging | 
|  |  | 
|  | This uses the JSON database file to decode the binary | 
|  | log data taken directly from input serialport and print | 
|  | the log messages. | 
|  | """ | 
|  |  | 
|  | import argparse | 
|  | import contextlib | 
|  | import logging | 
|  | import os | 
|  | import select | 
|  | import sys | 
|  | import time | 
|  |  | 
|  | import parserlib | 
|  | import serial | 
|  |  | 
|  | try: | 
|  | # Pylink is an optional dependency for RTT reading, which requires it's own installation. | 
|  | # Don't fail, unless the user tries to use RTT reading. | 
|  | import pylink | 
|  | except ImportError: | 
|  | pylink = None | 
|  |  | 
|  | LOGGER_FORMAT = "%(message)s" | 
|  | logger = logging.getLogger("parser") | 
|  |  | 
|  |  | 
|  | class SerialReader: | 
|  | """Class to read data from serial port and parse it""" | 
|  |  | 
|  | def __init__(self, serial_port, baudrate): | 
|  | self.serial_port = serial_port | 
|  | self.baudrate = baudrate | 
|  | self.serial = None | 
|  |  | 
|  | @contextlib.contextmanager | 
|  | def open(self): | 
|  | try: | 
|  | self.serial = serial.Serial(self.serial_port, self.baudrate) | 
|  | yield | 
|  | finally: | 
|  | self.serial.close() | 
|  |  | 
|  | def fileno(self): | 
|  | return self.serial.fileno() | 
|  |  | 
|  | def read_non_blocking(self): | 
|  | size = self.serial.in_waiting | 
|  | return self.serial.read(size) | 
|  |  | 
|  |  | 
|  | class FileReader: | 
|  | """Class to read data from serial port and parse it""" | 
|  |  | 
|  | def __init__(self, filepath): | 
|  | self.filepath = filepath | 
|  | self.file = None | 
|  |  | 
|  | @contextlib.contextmanager | 
|  | def open(self): | 
|  | if self.filepath is not None: | 
|  | with open(self.filepath, 'rb') as f: | 
|  | self.file = f | 
|  | yield | 
|  | else: | 
|  | sys.stdin = os.fdopen(sys.stdin.fileno(), 'rb', 0) | 
|  | self.file = sys.stdin | 
|  | yield | 
|  |  | 
|  | def fileno(self): | 
|  | return self.file.fileno() | 
|  |  | 
|  | def read_non_blocking(self): | 
|  | # Read available data using a reasonable buffer size (without buffer size, this blocks | 
|  | # forever, but with buffer size it returns even when less data than the buffer read was | 
|  | # available). | 
|  | return self.file.read(1024) | 
|  |  | 
|  |  | 
|  | class JLinkRTTReader: | 
|  | """Class to read data from JLink's RTT""" | 
|  |  | 
|  | @staticmethod | 
|  | def _create_jlink_connection(lib_path): | 
|  | if pylink is None: | 
|  | raise ImportError( | 
|  | "pylink module is required for RTT reading. " | 
|  | "Please install it using 'pip install pylink-square'." | 
|  | ) | 
|  |  | 
|  | if lib_path is not None: | 
|  | lib = pylink.Library(lib_path, True) | 
|  | jlink = pylink.JLink(lib) | 
|  | else: | 
|  | jlink = pylink.JLink() | 
|  |  | 
|  | return jlink | 
|  |  | 
|  | @contextlib.contextmanager | 
|  | def open(self): | 
|  | try: | 
|  | self.jlink.open() | 
|  | self.jlink.set_tif(pylink.enums.JLinkInterfaces.SWD) | 
|  | if self.speed != 0: | 
|  | self.jlink.connect(self.target_device, self.speed) | 
|  | else: | 
|  | self.jlink.connect(self.target_device) | 
|  |  | 
|  | self.jlink.rtt_start(self.block_address) | 
|  |  | 
|  | # Wait for the JLINK RTT buffers to be initialized. | 
|  | up_down_initialized = False | 
|  | while not up_down_initialized: | 
|  | try: | 
|  | _ = self.jlink.rtt_get_num_up_buffers() | 
|  | _ = self.jlink.rtt_get_num_down_buffers() | 
|  | up_down_initialized = True | 
|  | except pylink.errors.JLinkRTTException: | 
|  | time.sleep(0.1) | 
|  |  | 
|  | yield | 
|  |  | 
|  | finally: | 
|  | self.close() | 
|  |  | 
|  | def __init__(self, target_device, block_address, channel, speed, lib_path): | 
|  | self.target_device = target_device | 
|  | self.block_address = block_address | 
|  | self.speed = speed | 
|  | self.channel = channel | 
|  |  | 
|  | self.jlink = self._create_jlink_connection(lib_path) | 
|  |  | 
|  | def close(self): | 
|  | # JLink closes the connection through the __del__ method. | 
|  | del self.jlink | 
|  |  | 
|  | def read_non_blocking(self): | 
|  | return bytes(self.jlink.rtt_read(self.channel, 1024)) | 
|  |  | 
|  |  | 
|  | def parse_args(): | 
|  | """Parse command line arguments""" | 
|  | parser = argparse.ArgumentParser(allow_abbrev=False) | 
|  |  | 
|  | parser.add_argument("dbfile", help="Dictionary Logging Database file") | 
|  | parser.add_argument("--debug", action="store_true", help="Print extra debugging information") | 
|  | parser.add_argument( | 
|  | "--polling-interval", | 
|  | type=float, | 
|  | default=0.1, | 
|  | help="Interval for polling input source, if it does not support 'select'", | 
|  | ) | 
|  |  | 
|  | # Create subparsers for different input modes | 
|  | subparsers = parser.add_subparsers(dest="mode", required=True, help="Input source mode") | 
|  |  | 
|  | # Serial subparser | 
|  | serial_parser = subparsers.add_parser("serial", help="Read from serial port") | 
|  | serial_parser.add_argument("port", help="Serial port") | 
|  | serial_parser.add_argument("baudrate", type=int, help="Baudrate") | 
|  |  | 
|  | # File subparser | 
|  | file_parser = subparsers.add_parser("file", help="Read from file") | 
|  | file_parser.add_argument( | 
|  | "filepath", nargs="?", default=None, help="Input file path, leave empty for stdin" | 
|  | ) | 
|  |  | 
|  | # RTT subparser | 
|  | jlink_rtt_parser = subparsers.add_parser("jlink-rtt", help="Read from RTT") | 
|  | jlink_rtt_parser.add_argument( | 
|  | "target_device", help="Device Name (see https://www.segger.com/supported-devices/jlink/)" | 
|  | ) | 
|  | jlink_rtt_parser.add_argument( | 
|  | "--block-address", help="RTT block address in hex", type=lambda x: int(x, 16) | 
|  | ) | 
|  | jlink_rtt_parser.add_argument("--channel", type=int, help="RTT channel number", default=0) | 
|  | jlink_rtt_parser.add_argument("--speed", type=int, help="Reading speed", default='0') | 
|  | jlink_rtt_parser.add_argument("--lib-path", help="Path to libjlinkarm.so library") | 
|  |  | 
|  | return parser.parse_args() | 
|  |  | 
|  |  | 
|  | def main(): | 
|  | """function of serial parser""" | 
|  | args = parse_args() | 
|  |  | 
|  | if args.dbfile is None or '.json' not in args.dbfile: | 
|  | logger.error("ERROR: invalid log database path: %s, exiting...", args.dbfile) | 
|  | sys.exit(1) | 
|  |  | 
|  | logging.basicConfig(format=LOGGER_FORMAT) | 
|  |  | 
|  | if args.debug: | 
|  | logger.setLevel(logging.DEBUG) | 
|  | else: | 
|  | logger.setLevel(logging.INFO) | 
|  |  | 
|  | log_parser = parserlib.get_log_parser(args.dbfile, logger) | 
|  |  | 
|  | data = b'' | 
|  |  | 
|  | if args.mode == "serial": | 
|  | reader = SerialReader(args.port, args.baudrate) | 
|  | elif args.mode == "file": | 
|  | reader = FileReader(args.filepath) | 
|  | elif args.mode == "jlink-rtt": | 
|  | reader = JLinkRTTReader( | 
|  | args.target_device, args.block_address, args.channel, args.speed, args.lib_path | 
|  | ) | 
|  | else: | 
|  | raise ValueError("Invalid mode selected. Use 'serial' or 'file'.") | 
|  |  | 
|  | with reader.open(): | 
|  | while True: | 
|  | if hasattr(reader, 'fileno'): | 
|  | _, _, _ = select.select([reader], [], []) | 
|  | else: | 
|  | time.sleep(args.polling_interval) | 
|  | data += reader.read_non_blocking() | 
|  | parsed_data_offset = parserlib.parser(data, log_parser, logger) | 
|  | data = data[parsed_data_offset:] | 
|  |  | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | main() |