blob: 7490f61034f5c018c6bfeadd17ed2ef6827ed2fb [file] [log] [blame]
# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Gonk log stream handler."""
from datetime import datetime
import logging
import time
from typing import Callable, Sequence
from google.protobuf.message import DecodeError
from pw_hdlc.decode import FrameDecoder
from pw_log.log_decoder import LogStreamDecoder
from pw_log.proto import log_pb2
from pw_tokenizer import detokenize
import pw_log.log_decoder
from gonk_adc.adc_measurement_pb2 import FramedProto
from gonk_tools.adc import (
ADC_COUNT,
calc_power,
get_bus_voltages,
get_shunt_currents,
)
from gonk_tools.binary_handler import (
BIN_LOG_SYNC_START_BYTES,
BytearrayLoop,
)
# Logger for messages produced by this host-side tooling.
_LOG = logging.getLogger('host')
# Logger for log messages decoded from the Gonk device itself.
_DEVICE_LOG = logging.getLogger('gonk')
# Logger dedicated to CSV-formatted ADC measurement output.
_CSV_LOG = logging.getLogger('gonk.csv')
def emit_python_log(log: pw_log.log_decoder.Log) -> None:
    """Copy key fields into the log's metadata and emit it on _DEVICE_LOG.

    The metadata fields travel on the log record via the ``extra``
    mechanism so downstream formatters/handlers can read them.
    """
    metadata = log.metadata_fields
    metadata.update(
        module=log.module_name,
        source_name=log.source_name,
        timestamp=log.timestamp,
        msg=log.message,
        file=log.file_and_line,
    )
    _DEVICE_LOG.log(
        log.level,
        '%s -- %s',
        log.message,
        log.file_and_line,
        extra={'extra_metadata_fields': metadata},
    )
class GonkLogStream:
    """Handle incoming serial data from Gonk.

    The device interleaves two kinds of data on one serial stream:

    - HDLC-framed ``pw_log`` protobuf log entries.
    - Framed ADC measurement protobufs whose packets start with
      ``BIN_LOG_SYNC_START_BYTES``.

    ``write()`` splits the stream on the sync marker, routing log bytes
    to an HDLC frame decoder and measurement bytes to a protobuf parser.
    """

    # pylint: disable=too-many-instance-attributes
    def __init__(
        self,
        detokenizer: detokenize.Detokenizer | None = None,
        adc_time_format: str = '%Y%m%d %H:%M:%S.%f',
        log_message_handlers: Sequence[Callable[[pw_log.log_decoder.Log], None]]
        | None = None,
    ) -> None:
        """Create a stream handler.

        Args:
            detokenizer: Optional detokenizer for tokenized device logs.
                When provided, a ``LogStreamDecoder`` is created and log
                protos are fully decoded; otherwise raw frames are logged
                as-is.
            adc_time_format: ``strftime`` format used for host timestamps
                attached to ADC measurement log/CSV output.
            log_message_handlers: Extra callbacks invoked with every
                decoded device log message.
        """
        self.detokenizer = detokenizer
        self.log_decoder: LogStreamDecoder | None = None
        if self.detokenizer:
            self.log_decoder = LogStreamDecoder(
                decoded_log_handler=emit_python_log, detokenizer=detokenizer
            )
        # Raw bytes straight off the serial connection.
        self.input_buffer = BytearrayLoop()
        # Bytes identified as HDLC log data, pending frame decoding.
        self.log_buffer = BytearrayLoop()
        self.frame_decoder = FrameDecoder()
        self.adc_time_format = adc_time_format
        # Bytes still needed to complete the current ADC proto packet;
        # None when no packet header has been located yet.
        self.remaining_proto_bytes: int | None = None
        # Counters backing the once-per-second update-rate log line.
        self.proto_update_count: int = 0
        self.proto_update_time = time.monotonic()
        # Annotated as list (not Sequence) because it is mutated with
        # append()/extend() below.
        self.log_message_handlers: list[
            Callable[[pw_log.log_decoder.Log], None]
        ] = []
        self.log_message_handlers.append(self._log_csv_gpio_assert)
        # CSV placeholder spanning the volts, current and power columns.
        self.empty_measurements_str = ',' * (ADC_COUNT * 3 - 1)
        if log_message_handlers:
            self.log_message_handlers.extend(log_message_handlers)

    def _maybe_log_update_rate(self) -> None:
        """Count one ADC update; log the rate roughly once per second."""
        self.proto_update_count += 1
        current_time = time.monotonic()
        if current_time > self.proto_update_time + 1:
            _LOG.info('ADC updates/second: %d', self.proto_update_count)
            self.proto_update_count = 0
            self.proto_update_time = current_time

    def write(self, data: bytes) -> None:
        """Write data and separate protobuf data from logs."""
        self.input_buffer.write(data)
        # Everything before the next ADC sync marker is HDLC log data.
        logdata = self.input_buffer.read_until(BIN_LOG_SYNC_START_BYTES)
        if logdata:
            self.log_buffer.write(logdata)
            self._handle_log_data()
        self._handle_proto_packets()

    def _next_proto_packet_size(self) -> int | None:
        """Peek at the input buffer for the next ADC proto packet size.

        Returns:
            The total packet size in bytes (header plus payload), or
            None if a complete header is not yet buffered.
        """
        packet_size: int | None = None
        required_bytes = 8
        # Expected bytes:
        # 1: fixed32 tag
        # 4: sync start value bytes
        # 1: varint tag
        # 2: varint value (one to two bytes)
        head_bytes = self.input_buffer.peek(required_bytes)
        if (
            len(head_bytes) < required_bytes
            or BIN_LOG_SYNC_START_BYTES not in head_bytes
        ):
            return packet_size
        _sync_start_bytes = head_bytes[:5]
        _varint_tag = head_bytes[5:6]
        # Decode 1-2 varint bytes.
        varint_byte1 = head_bytes[6:7]
        varint_byte2 = head_bytes[7:8]
        # NOTE(review): int.from_bytes() with no byteorder argument
        # requires Python 3.11+ (the order is irrelevant for one byte).
        i = int.from_bytes(varint_byte1)
        # Low 7 bits carry the size; the high bit flags a continuation.
        packet_size = i & 0x7F
        if (i & 0x80) == 0x80:
            # Decode the extra varint byte
            packet_size |= (int.from_bytes(varint_byte2) & 0x7F) << 7
        else:
            # Extra varint byte value not used.
            required_bytes -= 1
        return required_bytes + packet_size

    def _handle_log_data(self) -> None:
        """Decode buffered HDLC frames and emit device log messages."""
        log_data = self.log_buffer.read()
        for frame in self.frame_decoder.process(log_data):
            if not frame.ok():
                _LOG.warning(
                    'Failed to decode frame: %s; discarded %d bytes',
                    frame,
                    len(frame.raw_encoded),
                )
                # Save this data back to the log_buffer
                self.log_buffer.write(frame.raw_encoded)
                continue
            if not self.log_decoder:
                # No detokenizer supplied; log the raw frame instead.
                _DEVICE_LOG.info('%s', frame)
                continue
            log_entry = log_pb2.LogEntry()
            try:
                log_entry.ParseFromString(frame.data)
            except DecodeError:
                # Try to detokenize the frame data and log the failure.
                detokenized_text = ''
                if self.detokenizer:
                    detokenized_text = str(
                        self.detokenizer.detokenize(frame.data)
                    )
                if detokenized_text:
                    _LOG.warning(
                        'Failed to parse log proto "%s" %s',
                        detokenized_text,
                        frame,
                    )
                else:
                    _LOG.warning('Failed to parse log proto %s', frame)
                continue
            log = self.log_decoder.parse_log_entry_proto(log_entry)
            emit_python_log(log)
            # Fan the decoded message out to all registered handlers.
            for log_message_handler in self.log_message_handlers:
                log_message_handler(log)

    def _handle_proto_packets(self) -> None:
        """Consume one complete ADC proto packet from the buffer, if ready."""
        # If no proto has been found, check for a new one.
        if self.remaining_proto_bytes is None:
            self.remaining_proto_bytes = self._next_proto_packet_size()
        # If a proto was found and bytes are pending.
        if (
            self.remaining_proto_bytes is not None
            and self.remaining_proto_bytes > 0
        ):
            # If the remaining bytes are available in the buffer.
            if self.remaining_proto_bytes == len(
                self.input_buffer.peek(self.remaining_proto_bytes)
            ):
                # Pop the proto
                proto_bytes = bytes(
                    self.input_buffer.read(self.remaining_proto_bytes)
                )
                # All bytes consumed
                self.remaining_proto_bytes = None
                self._parse_and_log_adc_proto(proto_bytes)
                self._maybe_log_update_rate()

    def _get_host_time(self) -> str:
        """Return the current host time formatted with adc_time_format."""
        return datetime.now().strftime(self.adc_time_format)

    def _parse_and_log_adc_proto(self, proto_bytes: bytes) -> None:
        """Parse an ADC proto message and log."""
        framed_proto = FramedProto()
        try:
            framed_proto.ParseFromString(proto_bytes)
        except DecodeError:
            _LOG.error('ADC FramedProto.DecodeError: %s', proto_bytes.hex())
            return
        host_time = self._get_host_time()
        packet_size = len(proto_bytes)
        delta_micros = framed_proto.payload.timestamp
        # Collect raw ADC readings, then convert to engineering units.
        vbus_values = []
        vshunt_values = []
        for adc_measure in framed_proto.payload.adc_measurements:
            vbus_values.append(adc_measure.vbus_value)
            vshunt_values.append(adc_measure.vshunt_value)
        delta_micros_str = str(delta_micros)
        bus_voltages = get_bus_voltages(vbus_values)
        ishunt = get_shunt_currents(vshunt_values)
        power_values = calc_power(bus_voltages, ishunt)
        vbus_list_str = ', '.join(str(value) for value in bus_voltages)
        vshunt_list_str = ', '.join(str(value) for value in ishunt)
        power_list_str = ', '.join(str(value) for value in power_values)
        _DEVICE_LOG.info(
            'host_time: %s size: %s delta_microseconds: %s '
            'vbus: %s vshunt: %s power: %s',
            host_time,
            str(packet_size),
            delta_micros_str,
            vbus_list_str,
            vshunt_list_str,
            power_list_str,
            extra=dict(
                extra_metadata_fields={
                    'host_time': host_time,
                    'packet_size': packet_size,
                    'delta_micros': delta_micros,
                    'vbus_values': vbus_values,
                    'vshunt_values': vshunt_values,
                    'power_values': power_values,
                }
            ),
        )
        # NOTE: If this format is changed the log_csv_header function
        # must be updated in adc.py
        _CSV_LOG.info(
            '%s, %s, %s, %s, %s, %s',
            host_time,
            delta_micros_str,
            # Volts
            vbus_list_str,
            # Current
            vshunt_list_str,
            # Power
            power_list_str,
            # Empty Comment
            ' ',
        )

    def _log_csv_gpio_assert(self, log: pw_log.log_decoder.Log) -> None:
        """Write a CSV row marking a 'Header pin assert' device log message.

        The measurement columns are left empty; the log message (with
        commas stripped to keep the CSV well-formed) goes in the comment
        column.
        """
        if 'Header pin assert' not in log.message:
            return
        _CSV_LOG.info(
            ', '.join(
                [
                    self._get_host_time(),
                    '0',
                    # Empty Volts, Current, and Power
                    self.empty_measurements_str,
                    # Comment is the log message
                    log.message.replace(',', ' '),
                ]
            )
        )