blob: e9e439004963ea82ce1ccef54493a062c21539e3 [file] [log] [blame]
# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Library to analyze timestamps."""
from datetime import datetime, timedelta, timezone
import logging
import pw_tokenizer
from pw_chrono_protos import chrono_pb2
from pw_tokenizer import proto as proto_detokenizer
_LOG = logging.getLogger(__name__)
def process_snapshot(
serialized_snapshot: bytes,
detokenizer: pw_tokenizer.Detokenizer | None = None,
):
snapshot = chrono_pb2.SnapshotTimestamps()
snapshot.ParseFromString(serialized_snapshot)
output: list[str] = []
for timestamp in snapshot.timestamps or []:
proto_detokenizer.detokenize_fields(
detokenizer, timestamp.clock_parameters
)
try:
time_info = TimePointInfo(timestamp)
output.append(f' {time_info.name_str()}: {time_info}')
except ValueError:
_LOG.warning('Failed to decode timestamp:\n%s', str(timestamp))
if not output:
return ''
plural = '' if len(output) == 1 else 's'
output.insert(0, f'Snapshot capture timestamp{plural}')
output.append('')
return '\n'.join(output)
class TimePointInfo:
"""Decodes pw.chrono.TimePoint protos into various representations."""
def __init__(self, timepoint: chrono_pb2.TimePoint):
self._timepoint = timepoint
parameters = self._timepoint.clock_parameters
if (
parameters.tick_period_seconds_denominator == 0
or parameters.tick_period_seconds_numerator == 0
):
raise ValueError('Invalid timepoint')
self._timepoint = timepoint
def ticks_per_sec(self) -> float:
parameters = self._timepoint.clock_parameters
return (
parameters.tick_period_seconds_denominator
/ parameters.tick_period_seconds_numerator
)
def period_suffix(self) -> str:
return {
1.0: 's',
1000.0: 'ms',
1_000_000.0: 'us',
1_000_000_000.0: 'ns',
}.get(self.ticks_per_sec(), '')
def duration(self) -> timedelta:
return timedelta(
seconds=self._timepoint.timestamp / self.ticks_per_sec()
)
def as_unix_time(self, tz: timezone = timezone.utc) -> datetime:
return datetime.fromtimestamp(self.duration().total_seconds(), tz)
def tick_count_str(self) -> str:
return f'{self._timepoint.timestamp} {self.period_suffix()}'.rstrip()
def __str__(self) -> str:
epoch_type = self._timepoint.clock_parameters.epoch_type
if epoch_type == chrono_pb2.EpochType.Enum.TIME_SINCE_BOOT:
return f'{self.duration()} ({self.tick_count_str()})'
if epoch_type == chrono_pb2.EpochType.Enum.UTC_WALL_CLOCK:
return f'{self.as_unix_time()} ({self.tick_count_str()})'
return self.tick_count_str()
def name_str(self) -> str:
parameters = self._timepoint.clock_parameters
try:
epoch_str = chrono_pb2.EpochType.Enum.Name(parameters.epoch_type)
except ValueError:
epoch_str = str(parameters.epoch_type)
if parameters.name:
return f'{parameters.name.decode()} (epoch {epoch_str})'
if parameters.epoch_type == chrono_pb2.EpochType.Enum.TIME_SINCE_BOOT:
return 'Time since boot'
if parameters.epoch_type == chrono_pb2.EpochType.Enum.UTC_WALL_CLOCK:
return 'UTC time'
return f'Timestamp (epoch {epoch_str})'