blob: 1779ad0dcb4fb8f51020fe3c3f114e7f13d1ceeb [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Wrapers for pyserial classes to log read and write data."""
import collections
import logging
from dataclasses import dataclass
import time
from typing import Optional
_LOG = logging.getLogger('pw_console')
@dataclass
class EventCountHistory:
"""Track counts of events over time.
Example usage: ::
events = EventCountHistory(
base_count_units='Bytes',
display_unit_title='KiB/s',
display_unit_factor=0.001,
interval=1.0,
show_sparkline=True)
# Log 1 event now.
events.log(1)
time.sleep(1)
# Log 100 events at this time.
events.log(100)
time.sleep(1)
events.log(200)
time.sleep(1)
events.log(400)
print(events)
▂▄█ 0.400 [KiB/s]
"""
base_count_units: str = 'Bytes'
display_unit_title: str = 'KiB/s'
display_unit_factor: float = 0.001
interval: float = 1.0 # Number of seconds per sum of events.
history_limit: int = 20
scale_characters = ' ▁▂▃▄▅▆▇█'
history: collections.deque = collections.deque()
show_sparkline: bool = False
_this_count: int = 0
_last_count: int = 0
_last_update_time: float = time.time()
def log(self, count: int) -> None:
self._this_count += count
this_time = time.time()
if this_time - self._last_update_time >= self.interval:
self._last_update_time = this_time
self._last_count = self._this_count
self._this_count = 0
self.history.append(self._last_count)
if len(self.history) > self.history_limit:
self.history.popleft()
def last_count(self) -> float:
return self._last_count * self.display_unit_factor
def last_count_raw(self) -> int:
return self._last_count
def last_count_with_units(self) -> str:
return '{:.3f} [{}]'.format(
self._last_count * self.display_unit_factor,
self.display_unit_title)
def __repr__(self) -> str:
sparkline = ''
if self.show_sparkline:
sparkline = self.sparkline()
return ' '.join([sparkline, self.last_count_with_units()])
def __pt_formatted_text__(self):
return [('', self.__repr__())]
def sparkline(self,
min_value: int = 0,
max_value: Optional[int] = None) -> str:
msg = ''.rjust(self.history_limit)
if len(self.history) == 0:
return msg
minimum = min_value
maximum = max_value if max_value else max(self.history)
max_minus_min = maximum - min_value
if max_minus_min == 0:
return msg
msg = ''
for i in self.history:
# (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
index = int((((1.0 * i) - minimum) / max_minus_min) *
len(self.scale_characters))
if index >= len(self.scale_characters):
index = len(self.scale_characters) - 1
msg += self.scale_characters[index]
return msg.rjust(self.history_limit)