| # Copyright 2020 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Decodes arguments and formats tokenized messages. |
| |
| The decode(format_string, encoded_arguments) function provides a simple way to |
| format a string with encoded arguments. The FormatString class may also be used. |
| |
| Missing, truncated, or otherwise corrupted arguments are handled and displayed |
| in the resulting string with an error message. |
| """ |
| |
| import re |
| import struct |
| from typing import Iterable, List, NamedTuple, Match, Sequence, Tuple |
| |
| |
def zigzag_decode(value: int) -> int:
    """Reverses ZigZag encoding, mapping an encoded integer to a signed one.

    Even inputs map to non-negative results (value / 2); odd inputs map to
    negative results (the bitwise complement of value / 2).
    """
    half = value >> 1
    # The low bit carries the sign: set means the result is negative.
    return ~half if value & 0x1 else half
| |
| |
class FormatSpec:
    """Represents a format specifier parsed from a printf-style string.

    Attributes:
      match: the re.Match object the specifier was built from
      specifier: the full text of the specifier (e.g. "%08lx")
      flags: flags, width, and precision portion of the specifier
      length: the C length modifier (e.g. "ll"), or '' if there is none
      type: the conversion type character; '%' for a literal "%%"
      compatible: a format string usable with Python's % operator
    """

    # Regular expression for finding format specifiers.
    FORMAT_SPEC = re.compile(r'%(?:(?P<flags>[+\- #0]*\d*(?:\.\d+)?)'
                             r'(?P<length>hh|h|ll|l|j|z|t|L)?'
                             r'(?P<type>[csdioxXufFeEaAgGnp])|%)')

    # Conversions to make format strings Python compatible.
    # _UNSUPPORTED_LENGTH is retained for compatibility; the length modifier
    # is never included in the Python-compatible specifier (see __init__).
    _UNSUPPORTED_LENGTH = frozenset(['hh', 'll', 'j', 'z', 't'])
    _REMAP_TYPE = {'a': 'f', 'A': 'F'}

    # Conversion specifiers by type; n is not supported.
    _SIGNED_INT = 'di'
    _UNSIGNED_INT = frozenset('oxXup')
    _FLOATING_POINT = frozenset('fFeEaAgG')

    # Floats are encoded as 4-byte little-endian values.
    _PACKED_FLOAT = struct.Struct('<f')

    @classmethod
    def from_string(cls, format_specifier: str):
        """Creates a FormatSpec from a str with a single format specifier.

        Raises:
          ValueError: the string is not exactly one valid format specifier
        """
        match = cls.FORMAT_SPEC.fullmatch(format_specifier)

        if not match:
            raise ValueError(
                '{!r} is not a valid single format specifier'.format(
                    format_specifier))

        return cls(match)

    def __init__(self, re_match: Match):
        """Constructs a FormatSpec from an re.Match object for FORMAT_SPEC."""
        self.match = re_match
        self.specifier: str = self.match.group()

        self.flags: str = self.match.group('flags') or ''
        self.length: str = self.match.group('length') or ''

        # If there is no type, the format spec is %%.
        self.type: str = self.match.group('type') or '%'

        # %p prints as 0xFEEDBEEF; other specs may need the type switched.
        if self.type == 'p':
            self.compatible = '0x%08X'
        else:
            # The C length modifier is deliberately left out: the argument
            # width is applied while decoding (see size_bits), not while
            # formatting.
            self.compatible = ''.join(
                ['%', self.flags,
                 self._REMAP_TYPE.get(self.type, self.type)])

    def decode(self, encoded_arg: bytes) -> 'DecodedArg':
        """Decodes the provided data according to this format specifier.

        Args:
          encoded_arg: encoded data that starts with this argument; any bytes
              after the argument are ignored

        Returns:
          a DecodedArg; its raw_data holds the bytes that were consumed
        """
        if self.type == '%':  # literal %
            # Use () as the value for % formatting.
            return DecodedArg(self, (), b'')

        if self.type == 's':  # string
            return self._decode_string(encoded_arg)

        if self.type == 'c':  # character
            return self._decode_char(encoded_arg)

        if self.type in self._SIGNED_INT:
            return self._decode_signed_integer(encoded_arg)

        if self.type in self._UNSIGNED_INT:
            return self._decode_unsigned_integer(encoded_arg)

        if self.type in self._FLOATING_POINT:
            return self._decode_float(encoded_arg)

        # Unsupported specifier (e.g. %n)
        return DecodedArg(
            self, None, b'', DecodedArg.DECODE_ERROR,
            'Unsupported conversion specifier "{}"'.format(self.type))

    def _decode_signed_integer(self, encoded: bytes) -> 'DecodedArg':
        """Decodes a signed variable-length integer.

        Each byte contributes its low 7 bits; a clear high bit marks the final
        byte. The assembled value is then ZigZag decoded.
        """
        if not encoded:
            return DecodedArg.missing(self)

        result = 0
        shift = 0

        # encoded is non-empty here, so count is always bound after the loop.
        for count, byte in enumerate(encoded, 1):
            result |= (byte & 0x7f) << shift

            if not byte & 0x80:
                return DecodedArg(self, zigzag_decode(result), encoded[:count])

            shift += 7
            if shift >= 64:
                # Stop once 64 bits' worth of 7-bit groups have been read.
                break

        return DecodedArg(self, None, encoded[:count], DecodedArg.DECODE_ERROR,
                          'Unterminated variable-length integer')

    def _decode_unsigned_integer(self, encoded: bytes) -> 'DecodedArg':
        """Decodes a variable-length integer and masks it to size_bits()."""
        arg = self._decode_signed_integer(encoded)

        # Since ZigZag encoding is used, unsigned integers must be masked off to
        # their original bit length.
        if arg.value is not None:
            arg.value &= (1 << self.size_bits()) - 1

        return arg

    def _decode_float(self, encoded: bytes) -> 'DecodedArg':
        """Decodes a 4-byte little-endian float; MISSING if under 4 bytes."""
        if len(encoded) < 4:
            return DecodedArg.missing(self)

        return DecodedArg(self,
                          self._PACKED_FLOAT.unpack_from(encoded)[0],
                          encoded[:4])

    def _decode_string(self, encoded: bytes) -> 'DecodedArg':
        """Reads a unicode string from the encoded data.

        The first byte holds the string length in its low 7 bits; its high bit
        flags a string that was truncated during encoding.
        """
        if not encoded:
            return DecodedArg.missing(self)

        size_and_status = encoded[0]
        status = DecodedArg.OK

        if size_and_status & 0x80:
            status |= DecodedArg.TRUNCATED
            size_and_status &= 0x7f

        raw_data = encoded[0:size_and_status + 1]
        data = raw_data[1:]

        # Fewer bytes are available than the length byte promised.
        if len(data) < size_and_status:
            status |= DecodedArg.DECODE_ERROR

        try:
            decoded = data.decode()
        except UnicodeDecodeError as err:
            # Fall back to the repr of the raw bytes, without the b prefix.
            return DecodedArg(self,
                              repr(bytes(data)).lstrip('b'), raw_data,
                              status | DecodedArg.DECODE_ERROR, err)

        return DecodedArg(self, decoded, raw_data, status)

    def _decode_char(self, encoded: bytes) -> 'DecodedArg':
        """Reads an integer from the data, then converts it to a string."""
        arg = self._decode_signed_integer(encoded)

        if arg.ok():
            try:
                arg.value = chr(arg.value)
            except (OverflowError, ValueError) as err:
                # The integer is not a valid code point; keep it as the value.
                arg.error = err
                arg.status |= DecodedArg.DECODE_ERROR

        return arg

    def size_bits(self) -> int:
        """Size of the argument in bits; 0 for strings."""
        if self.type == 's':
            return 0

        # TODO(hepler): 64-bit targets likely have 64-bit l, j, z, and t.
        return 64 if self.length in ['ll', 'j'] else 32

    def __str__(self) -> str:
        return self.specifier
| |
| |
class DecodedArg:
    """Represents a decoded argument that is ready to be formatted.

    Attributes:
      specifier: the FormatSpec this argument was decoded with
      value: the decoded value, or None if decoding failed
      raw_data: the exact bytes used to decode this argument
      status: bitwise OR of the status flags below
      error: an exception or message describing a decoding failure, if any
    """

    # Status flags for a decoded argument. These values should match the
    # DecodingStatus enum in pw_tokenizer/internal/decode.h.
    OK = 0  # decoding was successful
    MISSING = 1  # the argument was not present in the data
    TRUNCATED = 2  # the argument was truncated during encoding
    DECODE_ERROR = 4  # an error occurred while decoding the argument
    SKIPPED = 8  # argument was skipped due to a previous error

    @classmethod
    def missing(cls, specifier: 'FormatSpec'):
        """Creates a MISSING argument with no value and no raw data."""
        return cls(specifier, None, b'', cls.MISSING)

    def __init__(self,
                 specifier: 'FormatSpec',
                 value,
                 raw_data: bytes,
                 status: int = OK,
                 error=None):
        self.specifier = specifier  # FormatSpec (e.g. to represent "%0.2f")
        self.value = value  # the decoded value, or None if decoding failed
        # The exact bytes used to decode this arg, copied into a bytes object.
        self.raw_data = bytes(raw_data)
        self._status = status
        self.error = error

    def ok(self) -> bool:
        """The argument was decoded without errors."""
        return self.status in (self.OK, self.TRUNCATED)

    @property
    def status(self) -> int:
        return self._status

    @status.setter
    def status(self, status: int):
        # The %% specifier is always OK and should always be printed normally.
        self._status = status if self.specifier.type != '%' else self.OK

    def format(self) -> str:
        """Returns formatted version of this argument, with error handling.

        Successfully decoded arguments are formatted with the specifier;
        truncated ones get a '[...]' suffix. Otherwise a '<[...]>' message
        naming the specifier and the failure is returned.
        """
        if self.status == self.TRUNCATED:
            return self.specifier.compatible % (self.value + '[...]')

        if self.ok():
            try:
                return self.specifier.compatible % self.value
            except (OverflowError, TypeError, ValueError) as err:
                # The value does not fit the specifier; report it as an error.
                self.status |= self.DECODE_ERROR
                self.error = err

        if self.status & self.SKIPPED:
            message = '{} SKIPPED'.format(self.specifier)
        elif self.status == self.MISSING:
            message = '{} MISSING'.format(self.specifier)
        elif self.status & self.DECODE_ERROR:
            message = '{} ERROR'.format(self.specifier)
        else:
            raise AssertionError('Unhandled DecodedArg status {:x}!'.format(
                self.status))

        if self.value is None or not str(self.value):
            return '<[{}]>'.format(message)

        return '<[{} ({})]>'.format(message, self.value)

    def __str__(self) -> str:
        return self.format()

    def __repr__(self) -> str:
        # Format via str(self). Using the !r conversion on self here would
        # call __repr__ again and recurse without end.
        return 'DecodedArg({})'.format(self)
| |
| |
def parse_format_specifiers(format_string: str) -> Iterable[FormatSpec]:
    """Lazily yields a FormatSpec for each specifier found in format_string."""
    yield from map(FormatSpec, FormatSpec.FORMAT_SPEC.finditer(format_string))
| |
| |
class FormattedString(NamedTuple):
    """Result of formatting a format string with decoded arguments."""
    value: str  # the formatted string
    args: Sequence[DecodedArg]  # the arguments that were decoded
    remaining: bytes  # encoded data left over after decoding
| |
| |
class FormatString:
    """A printf-style format string that can decode and format arguments."""
    def __init__(self, format_string: str):
        """Finds the format specifiers and splits out the literal text."""
        self.format_string = format_string
        self.specifiers = tuple(parse_format_specifiers(self.format_string))

        # Literal text sits at the even indices; the odd indices are slots
        # that format() fills with the formatted arguments.
        self._segments = self._parse_string_segments()

    def _parse_string_segments(self) -> List:
        """Splits the format string around its format specifiers.

        Returns:
          list alternating literal text and None placeholders, one
          placeholder per specifier
        """
        if not self.specifiers:
            return [self.format_string]

        segments: List = []
        cursor = 0

        for spec in self.specifiers:
            start, end = spec.match.span()
            # Literal text before this specifier, then a slot for its value.
            segments.append(self.format_string[cursor:start])
            segments.append(None)
            cursor = end

        # Whatever follows the final specifier.
        segments.append(self.format_string[cursor:])
        return segments

    def decode(self, encoded: bytes) -> Tuple[Sequence[DecodedArg], bytes]:
        """Decodes arguments according to the format string.

        Args:
          encoded: bytes; the encoded arguments

        Returns:
          tuple with the decoded arguments and any unparsed data
        """
        results = []
        offset = 0
        error_seen = False

        for spec in self.specifiers:
            arg = spec.decode(encoded[offset:])

            if error_seen:
                # Once one argument fails, later ones are still decoded but
                # flagged as SKIPPED, since their validity can't be known.
                arg.status |= DecodedArg.SKIPPED
            else:
                error_seen = not arg.ok()

            results.append(arg)
            offset += len(arg.raw_data)

        return tuple(results), encoded[offset:]

    def format(self,
               encoded_args: bytes,
               show_errors: bool = False) -> FormattedString:
        """Decodes arguments and formats the string with them.

        Args:
          encoded_args: the arguments to decode and format the string with
          show_errors: if True, an error message is used in place of the %
              conversion specifier when an argument fails to decode

        Returns:
          tuple with the formatted string, decoded arguments, and remaining data
        """
        args, remaining = self.decode(encoded_args)

        if show_errors:
            rendered = [arg.format() for arg in args]
        else:
            # Arguments that failed to decode keep their original specifier.
            rendered = [
                arg.format() if arg.ok() else arg.specifier.specifier
                for arg in args
            ]

        # Drop the formatted arguments into the slots between literal pieces.
        self._segments[1::2] = rendered

        return FormattedString(''.join(self._segments), args, remaining)
| |
| |
def decode(format_string: str,
           encoded_arguments: bytes,
           show_errors: bool = False) -> str:
    """Formats a printf-style format string with encoded arguments.

    Args:
      format_string: the printf-style format string
      encoded_arguments: encoded arguments with which to format
          format_string; must exclude the 4-byte string token
      show_errors: if True, an error message is used in place of the %
          conversion specifier when an argument fails to decode

    Returns:
      the printf-style formatted string
    """
    formatter = FormatString(format_string)
    result = formatter.format(encoded_arguments, show_errors)
    return result.value