blob: 072d5e47f5e71a7effbf8fb9407c0683925a68e3 [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Wrapers for pyserial classes to log read and write data."""
from contextvars import ContextVar
import logging
import textwrap
import serial # type: ignore
from pw_console.widgets.event_count_history import EventCountHistory
_LOG = logging.getLogger('pw_console.serial_debug_logger')
def _log_hex_strings(data: bytes, prefix=''):
"""Create alinged hex number and character view log messages."""
# Make a list of 2 character hex number strings.
hex_numbers = textwrap.wrap(data.hex(), 2)
hex_chars = [
('<' + str(b.to_bytes(1, byteorder='big')) + '>')
.replace("<b'\\x", '', 1) # Remove b'\x from the beginning
.replace("<b'", '', 1) # Remove b' from the beginning
.replace("'>", '', 1) # Remove ' from the end
.rjust(2)
for b in data
] # yapf: disable
# Replace non-printable bytes with dots.
for i, num in enumerate(hex_numbers):
if num == hex_chars[i]:
hex_chars[i] = '..'
hex_numbers_msg = ' '.join(hex_numbers)
hex_chars_msg = ' '.join(hex_chars)
_LOG.debug('%s%s',
prefix,
hex_numbers_msg,
extra=dict(extra_metadata_fields={
'msg': hex_numbers_msg,
'view': 'hex',
}))
_LOG.debug('%s%s',
prefix,
hex_chars_msg,
extra=dict(extra_metadata_fields={
'msg': hex_chars_msg,
'view': 'chars',
}))
BANDWIDTH_HISTORY_CONTEXTVAR = (ContextVar('pw_console_bandwidth_history',
default={
'total':
EventCountHistory(interval=3),
'read':
EventCountHistory(interval=3),
'write':
EventCountHistory(interval=3),
}))
class SerialWithLogging(serial.Serial): # pylint: disable=too-many-ancestors
"""pyserial with read and write wrappers for logging."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pw_bps_history = BANDWIDTH_HISTORY_CONTEXTVAR.get()
def read(self, *args, **kwargs):
data = super().read(*args, **kwargs)
self.pw_bps_history['read'].log(len(data))
self.pw_bps_history['total'].log(len(data))
if len(data) > 0:
prefix = 'Read %2d B: ' % len(data)
_LOG.debug('%s%s',
prefix,
data,
extra=dict(
extra_metadata_fields={
'mode': 'Read',
'bytes': len(data),
'view': 'bytes',
'msg': str(data),
}))
_log_hex_strings(data, prefix=prefix)
# Print individual lines
for line in data.decode(encoding='utf-8',
errors='ignore').splitlines():
_LOG.debug('%s',
line,
extra=dict(extra_metadata_fields={
'msg': line,
'view': 'lines',
}))
return data
def write(self, data: bytes, *args, **kwargs):
self.pw_bps_history['write'].log(len(data))
self.pw_bps_history['total'].log(len(data))
if len(data) > 0:
prefix = 'Write %2d B: ' % len(data)
_LOG.debug('%s%s',
prefix,
data,
extra=dict(
extra_metadata_fields={
'mode': 'Write',
'bytes': len(data),
'view': 'bytes',
'msg': str(data)
}))
_log_hex_strings(data, prefix=prefix)
# Print individual lines
for line in data.decode(encoding='utf-8',
errors='ignore').splitlines():
_LOG.debug('%s',
line,
extra=dict(extra_metadata_fields={
'msg': line,
'view': 'lines',
}))
super().write(data, *args, **kwargs)