| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2021 Intel Corporation |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| """ |
| Dictionary-based Logging Parser Version 1 |
| |
| This contains the implementation of the parser for |
| version 1 databases. |
| """ |
| |
| import logging |
| import math |
| import struct |
| import colorama |
| from colorama import Fore |
| |
| from .log_parser import LogParser |
| |
| |
# Number of bytes printed on each line of a hex dump
HEX_BYTES_IN_LINE = 16

# (short name, terminal color) pairs, indexed by numeric log level
LOG_LEVELS = [
    ('none', Fore.WHITE),
    ('err', Fore.RED),
    ('wrn', Fore.YELLOW),
    ('inf', Fore.GREEN),
    ('dbg', Fore.BLUE),
]

# Message header layout. Must be kept in sync with
# struct log_dict_output_normal_msg_hdr_t in
# include/logging/log_output_dict.h:
#
# struct log_dict_output_normal_msg_hdr_t {
#     uint8_t type;
#     uint32_t domain:3;
#     uint32_t level:3;
#     uint32_t package_len:10;
#     uint32_t data_len:12;
#     uintptr_t source;
#     log_timestamp_t timestamp;
# } __packed;
#
# The "type" and "timestamp" fields have their own formats below;
# these two only cover the bit-field word and the "source" field
# (32-bit or 64-bit wide depending on the target).
FMT_MSG_HDR_32 = "II"
FMT_MSG_HDR_64 = "IQ"

# One byte message type:
#   0: normal message
#   1: number of dropped messages
FMT_MSG_TYPE = "B"

# Timestamp width depends on CONFIG_LOG_TIMESTAMP_64BIT
FMT_MSG_TIMESTAMP_32 = "I"
FMT_MSG_TIMESTAMP_64 = "Q"

# Message type values; keep in sync with
# include/logging/log_output_dict.h
MSG_TYPE_NORMAL = 0
MSG_TYPE_DROPPED = 1

# Payload of a "dropped" message: the number of dropped messages
FMT_DROPPED_CNT = "H"


logger = logging.getLogger("parser")
| |
| |
def get_log_level_str_color(lvl):
    """Look up the (name, color) pair for a numeric log level.

    Levels that fall outside the LOG_LEVELS table are reported as
    ("unk", Fore.WHITE).
    """
    if 0 <= lvl < len(LOG_LEVELS):
        return LOG_LEVELS[lvl]

    return ("unk", Fore.WHITE)
| |
| |
def formalize_fmt_string(fmt_str):
    """Rewrite a C printf format string so Python's ``%`` operator accepts it.

    Every conversion specification is inspected individually:

    * ``ll`` and ``hh`` length modifiers are collapsed to ``l`` and ``h``
      (Python only tolerates a single ``h``, ``l`` or ``L``).
    * ``j``, ``z`` and ``t`` length modifiers are removed because Python
      rejects them.
    * ``%p`` becomes ``0x%x`` (flags/width, if any, are kept on the ``%x``).
    * ``%%`` is left untouched, so a literal percent sign followed by
      ``p`` or ``lld`` is not rewritten by mistake.

    Flags, width and precision (including ``*``) are preserved.

    Args:
        fmt_str: printf-style format string taken from the log database.

    Returns:
        The format string adjusted for use with Python's ``%`` formatting.
    """
    # Local import keeps this function self-contained; the module is
    # cached by the interpreter after the first call.
    import re

    spec_re = re.compile(
        r"%(?P<spec>[-+ #0]*(?:\*|\d+)?(?:\.(?:\*|\d+)?)?)"
        r"(?P<length>hh|ll|[hlLjzt])?"
        r"(?P<conv>[%A-Za-z])"
    )

    # Length modifiers Python cannot parse, and what to emit instead.
    length_map = {'ll': 'l', 'hh': 'h', 'j': '', 'z': '', 't': ''}

    def _rewrite(match):
        conv = match.group('conv')
        if conv == '%':
            # Literal percent sign, nothing to translate
            return match.group(0)

        spec = match.group('spec')
        if conv == 'p':
            # No %p for pointers in Python, print as hexadecimal
            return "0x%" + spec + "x"

        length = match.group('length') or ''
        return "%" + spec + length_map.get(length, length) + conv

    return spec_re.sub(_rewrite, fmt_str)
| |
| |
class DataTypes():
    """Sizes, alignments and struct formats of the target's C data types.

    Each registered type is stored in ``self.data_types`` as a dict with
    the keys ``'fmt'`` (struct format including byte order), ``'sizeof'``
    (number of bytes occupied in the argument list) and ``'align'``.
    """
    INT = 0
    UINT = 1
    LONG = 2
    ULONG = 3
    LONG_LONG = 4
    ULONG_LONG = 5
    PTR = 6
    DOUBLE = 7
    LONG_DOUBLE = 8
    NUM_TYPES = 9

    def __init__(self, database):
        """Register all known data types for the target.

        Args:
            database: object providing ``is_tgt_64bit()`` and
                ``is_tgt_little_endian()``.
        """
        self.database = database
        self.data_types = {}

        # "long" and pointers follow the target word size,
        # "long long" is always 64-bit.
        if database.is_tgt_64bit():
            self.add_data_type(self.LONG, "q")
            self.add_data_type(self.ULONG, "Q")
            self.add_data_type(self.PTR, "Q")
        else:
            self.add_data_type(self.LONG, "i")
            self.add_data_type(self.ULONG, "I")
            self.add_data_type(self.PTR, "I")

        self.add_data_type(self.LONG_LONG, "q")
        self.add_data_type(self.ULONG_LONG, "Q")

        self.add_data_type(self.INT, "i")
        self.add_data_type(self.UINT, "I")
        self.add_data_type(self.DOUBLE, "d")
        self.add_data_type(self.LONG_DOUBLE, "d")

    def add_data_type(self, data_type, fmt):
        """Add one data type.

        Args:
            data_type: one of the type constants of this class.
            fmt: struct format character (without byte order prefix).
        """
        endianness = "<" if self.database.is_tgt_little_endian() else ">"
        formatter = endianness + fmt

        if data_type == self.LONG_DOUBLE:
            # Python doesn't have long double but we still
            # need to skip correct number of bytes
            size = 16
        else:
            size = struct.calcsize(formatter)

        # Might need actual number for different architectures
        # but these seem to work fine for now.
        align = 8 if self.database.is_tgt_64bit() else 4

        self.data_types[data_type] = {
            'fmt': formatter,
            'sizeof': size,
            'align': align,
        }

    def get_sizeof(self, data_type):
        """Get sizeof() of a data type"""
        return self.data_types[data_type]['sizeof']

    def get_alignment(self, data_type):
        """Get the alignment of a data type"""
        return self.data_types[data_type]['align']

    def get_formatter(self, data_type):
        """Get the formatter for a data type"""
        return self.data_types[data_type]['fmt']
| |
| |
class LogParserV1(LogParser):
    """Log Parser V1

    Decodes binary dictionary-based log data for version 1 databases
    and prints the decoded messages.
    """

    def __init__(self, database):
        """Prepare the struct formats used to decode the log stream.

        Byte order, header width and timestamp width are selected from
        what the database reports about the target.
        """
        super().__init__(database=database)

        endian = "<" if self.database.is_tgt_little_endian() else ">"

        self.fmt_msg_type = endian + FMT_MSG_TYPE
        self.fmt_dropped_cnt = endian + FMT_DROPPED_CNT

        if self.database.is_tgt_64bit():
            self.fmt_msg_hdr = endian + FMT_MSG_HDR_64
        else:
            self.fmt_msg_hdr = endian + FMT_MSG_HDR_32

        if "CONFIG_LOG_TIMESTAMP_64BIT" in self.database.get_kconfigs():
            self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_64
        else:
            self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_32

        self.data_types = DataTypes(self.database)

    def __get_string(self, arg, arg_offset, string_tbl):
        """Resolve a string pointer argument to its text.

        The database is consulted first. If the address is unknown
        there, the string table appended to the package is used, and
        a "<string@0x...>" placeholder is returned when neither
        knows the string.
        """
        one_str = self.database.find_string(arg)
        if one_str is not None:
            return one_str

        # The index from the string table is basically
        # the order in va_list. Need to add to the index
        # to skip the packaged string header and
        # the format string.
        str_idx = arg_offset + self.data_types.get_sizeof(DataTypes.PTR) * 2
        str_idx //= self.data_types.get_sizeof(DataTypes.INT)

        if str_idx in string_tbl:
            return string_tbl[str_idx]

        return "<string@0x{0:x}>".format(arg)

    def process_one_fmt_str(self, fmt_str, arg_list, string_tbl):
        """Parse the format string to extract arguments from
        the binary arglist and return a tuple usable with
        Python's string formatting.

        Args:
            fmt_str: printf-style format string of the message.
            arg_list: bytes holding the packaged va_list arguments.
            string_tbl: string table extracted from the package,
                used to resolve '%s' arguments not in the database.

        Returns:
            Tuple with one entry per argument consumed by fmt_str,
            including the int arguments consumed by '*' width or
            precision fields.
        """
        arg_offset = 0
        is_parsing = False
        arg_data_type = DataTypes.INT

        args = []

        # Translated from cbvprintf_package()
        for idx, fmt in enumerate(fmt_str):
            if not is_parsing:
                if fmt == '%':
                    is_parsing = True
                    arg_data_type = DataTypes.INT
                continue

            if fmt == '%':
                # '%%' -> literal percentage sign
                is_parsing = False
                continue

            if fmt == '*':
                # Width/precision given as an int argument: it has to be
                # pulled out of the arglist, and the specifier it belongs
                # to is still being parsed afterwards.
                extract_type = DataTypes.INT

            elif fmt.isdecimal() or fmt in ('l', 'L', ' ', '#', '-', '+', '.', 'h'):
                # formatting modifiers, just ignore
                continue

            elif fmt == 'j':
                # intmax_t, read as long long
                arg_data_type = DataTypes.LONG_LONG
                continue

            elif fmt in ('z', 't'):
                # size_t or ptrdiff_t
                arg_data_type = DataTypes.LONG
                continue

            elif fmt in ('c', 'd', 'i', 'o', 'u', 'x', 'X'):
                if fmt_str[idx - 1] == 'l':
                    if fmt_str[idx - 2] == 'l':
                        arg_data_type = DataTypes.LONG_LONG
                    else:
                        arg_data_type = DataTypes.LONG
                # Otherwise keep the type selected so far: int by
                # default, or whatever 'j', 'z' or 't' asked for.

                extract_type = arg_data_type
                is_parsing = False

            elif fmt in ('s', 'p', 'n'):
                extract_type = DataTypes.PTR
                is_parsing = False

            elif fmt.lower() in ('a', 'e', 'f', 'g'):
                # Python doesn't do "long double".
                #
                # Parse it as double (probably incorrect), but
                # still have to skip enough bytes.
                if fmt_str[idx - 1] == 'L':
                    extract_type = DataTypes.LONG_DOUBLE
                else:
                    extract_type = DataTypes.DOUBLE

                is_parsing = False

            else:
                # Unknown conversion, give up on this specifier
                is_parsing = False
                continue

            align = self.data_types.get_alignment(extract_type)
            size = self.data_types.get_sizeof(extract_type)
            unpack_fmt = self.data_types.get_formatter(extract_type)

            one_arg = struct.unpack_from(unpack_fmt, arg_list, arg_offset)[0]

            if fmt == 's':
                one_arg = self.__get_string(one_arg, arg_offset, string_tbl)
            elif fmt in ('o', 'u', 'x', 'X') and one_arg < 0:
                # Integers are unpacked as signed; unsigned conversions
                # must show the two's complement bit pattern instead.
                one_arg += 1 << (size * 8)

            args.append(one_arg)

            # Advance past the argument and align the offset
            arg_offset = (arg_offset + size + align - 1) // align * align

        return tuple(args)

    @staticmethod
    def extract_string_table(str_tbl):
        """Extract string table in a packaged log message.

        The table is a sequence of entries, each made of one index
        byte followed by a NUL terminated string.

        Args:
            str_tbl: iterable of byte values holding the table.

        Returns:
            Dict mapping the index byte of each entry to its string.
            A trailing entry without NUL terminator is not included.
        """
        tbl = {}

        str_idx = None
        chars = []
        next_new_string = True

        # Translated from cbvprintf_package()
        for one_ch in str_tbl:
            if next_new_string:
                str_idx = one_ch
                next_new_string = False
            elif one_ch == 0:
                tbl[str_idx] = "".join(chars)
                chars = []
                next_new_string = True
            else:
                chars.append(chr(one_ch))

        return tbl

    @staticmethod
    def print_hexdump(hex_data, prefix_len, color):
        """Print hex dump.

        Each line shows up to HEX_BYTES_IN_LINE bytes as two-digit hex
        values split into two halves, followed by '|' and the
        characters of the same bytes. Bytes outside printable ASCII
        are shown as '.'.

        Args:
            hex_data: the bytes to dump.
            prefix_len: number of spaces printed before each line.
            color: color escape sequence printed before each line.
        """
        data = bytes(hex_data)
        prefix = " " * prefix_len
        half = HEX_BYTES_IN_LINE // 2

        # Three characters per byte plus the gap between the two halves,
        # so that the '|' column lines up on a short last line.
        hex_width = HEX_BYTES_IN_LINE * 3 + 1

        for start in range(0, len(data), HEX_BYTES_IN_LINE):
            chunk = data[start:start + HEX_BYTES_IN_LINE]
            first, second = chunk[:half], chunk[half:]

            hex_vals = "".join("%02x " % one_hex for one_hex in first)
            chr_vals = "".join(chr(one_hex) if 0x20 <= one_hex < 0x7f else "."
                               for one_hex in first)

            if second:
                hex_vals += " " + "".join("%02x " % one_hex for one_hex in second)
                chr_vals += " " + "".join(chr(one_hex) if 0x20 <= one_hex < 0x7f else "."
                                          for one_hex in second)

            print(f"{color}{prefix}{hex_vals.ljust(hex_width)}|{chr_vals}{Fore.RESET}")

    def parse_one_normal_msg(self, logdata, offset):
        """Parse one normal log message and print the encoded message.

        Args:
            logdata: bytes with the complete binary log data.
            offset: offset of the message header in logdata, right
                after the message type byte.

        Returns:
            Offset of the next message in logdata, or None if the
            string table or the format string could not be extracted.
        """
        # Parse log message header
        log_desc, source_id = struct.unpack_from(self.fmt_msg_hdr, logdata, offset)
        offset += struct.calcsize(self.fmt_msg_hdr)

        timestamp = struct.unpack_from(self.fmt_msg_timestamp, logdata, offset)[0]
        offset += struct.calcsize(self.fmt_msg_timestamp)

        # Bit fields: domain (3 bits), level (3), package length (10)
        # and data length (12)
        domain_id = log_desc & 0x07
        level = (log_desc >> 3) & 0x07
        pkg_len = (log_desc >> 6) & 0x3FF
        data_len = (log_desc >> 16) & 0xFFF

        level_str, color = get_log_level_str_color(level)
        source_id_str = self.database.get_log_source_string(domain_id, source_id)

        # Skip over data to point to next message (save as return value)
        next_msg_offset = offset + pkg_len + data_len

        # Offset from beginning of cbprintf_packaged data to end of va_list arguments
        offset_end_of_args = struct.unpack_from("B", logdata, offset)[0]
        offset_end_of_args *= self.data_types.get_sizeof(DataTypes.INT)
        offset_end_of_args += offset

        # Extra data after packaged log
        extra_data = logdata[(offset + pkg_len):next_msg_offset]

        # Number of appended strings in package
        num_packed_strings = struct.unpack_from("B", logdata, offset + 1)[0]

        # Number of read-only string indexes
        num_ro_str_indexes = struct.unpack_from("B", logdata, offset + 2)[0]
        offset_end_of_args += num_ro_str_indexes

        # Number of read-write string indexes
        num_rw_str_indexes = struct.unpack_from("B", logdata, offset + 3)[0]
        offset_end_of_args += num_rw_str_indexes

        # Extract the string table in the packaged log message
        string_tbl = self.extract_string_table(logdata[offset_end_of_args:(offset + pkg_len)])

        if len(string_tbl) != num_packed_strings:
            logger.error("------ Error extracting string table")
            return None

        # Skip packaged string header
        offset += self.data_types.get_sizeof(DataTypes.PTR)

        # Grab the format string
        #
        # Note the negative offset to __get_string(). It is because
        # the offset begins at 0 for va_list. However, the format string
        # itself is before the va_list, so need to go back the width of
        # a pointer.
        fmt_str_ptr = struct.unpack_from(self.data_types.get_formatter(DataTypes.PTR),
                                         logdata, offset)[0]
        fmt_str = self.__get_string(fmt_str_ptr,
                                    -self.data_types.get_sizeof(DataTypes.PTR),
                                    string_tbl)
        offset += self.data_types.get_sizeof(DataTypes.PTR)

        if not fmt_str:
            logger.error("------ Error getting format string at 0x%x", fmt_str_ptr)
            return None

        args = self.process_one_fmt_str(fmt_str, logdata[offset:offset_end_of_args], string_tbl)

        fmt_str = formalize_fmt_string(fmt_str)
        log_msg = fmt_str % args

        if level == 0:
            # Level 0 messages are printed as-is, without prefix,
            # color or added newline
            log_prefix = ""
            print(f"{log_msg}", end='')
        else:
            log_prefix = f"[{timestamp:>10}] <{level_str}> {source_id_str}: "
            print(f"{color}%s%s{Fore.RESET}" % (log_prefix, log_msg))

        if data_len > 0:
            # Has hexdump data
            self.print_hexdump(extra_data, len(log_prefix), color)

        # Point to next message
        return next_msg_offset

    def parse_log_data(self, logdata, debug=False):
        """Parse binary log data and print the encoded log messages.

        Args:
            logdata: bytes with the binary log data.
            debug: accepted but not used by this parser.

        Returns:
            True when all of logdata has been processed, False if an
            unknown message type or an undecodable message was found.
        """
        offset = 0

        while offset < len(logdata):
            # Get message type
            msg_type = struct.unpack_from(self.fmt_msg_type, logdata, offset)[0]
            offset += struct.calcsize(self.fmt_msg_type)

            if msg_type == MSG_TYPE_DROPPED:
                # unpack_from() returns a tuple, only its value is wanted
                num_dropped = struct.unpack_from(self.fmt_dropped_cnt, logdata, offset)[0]
                offset += struct.calcsize(self.fmt_dropped_cnt)

                print(f"--- {num_dropped} messages dropped ---")

            elif msg_type == MSG_TYPE_NORMAL:
                ret = self.parse_one_normal_msg(logdata, offset)
                if ret is None:
                    return False

                offset = ret

            else:
                logger.error("------ Unknown message type: %s", msg_type)
                return False

        return True
| |
# Module-level side effect: colorama is initialized as soon as this
# module is imported.
colorama.init()