| # Copyright 2024 The Pigweed Authors | 
 | # | 
 | # Licensed under the Apache License, Version 2.0 (the "License"); you may not | 
 | # use this file except in compliance with the License. You may obtain a copy of | 
 | # the License at | 
 | # | 
 | #     https://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, software | 
 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | 
 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | 
 | # License for the specific language governing permissions and limitations under | 
 | # the License. | 
 | """Gonk log stream handler.""" | 
 |  | 
 | from datetime import datetime | 
 | import logging | 
 | import time | 
 | from typing import Optional | 
 |  | 
 | from google.protobuf.message import DecodeError | 
 | from pw_hdlc.decode import FrameDecoder | 
 | from pw_log.log_decoder import LogStreamDecoder | 
 | from pw_log.proto import log_pb2 | 
 | from pw_tokenizer import detokenize | 
 | import pw_log.log_decoder | 
 |  | 
 | from gonk_adc.adc_measurement_pb2 import FramedProto | 
 | from gonk_tools.binary_handler import ( | 
 |     BIN_LOG_SYNC_START_BYTES, | 
 |     BytearrayLoop, | 
 | ) | 
 |  | 
 | _LOG = logging.getLogger('host') | 
 | _DEVICE_LOG = logging.getLogger('gonk') | 
 |  | 
 |  | 
 | def emit_python_log(log: pw_log.log_decoder.Log) -> None: | 
 |     log.metadata_fields['module'] = log.module_name | 
 |     log.metadata_fields['source_name'] = log.source_name | 
 |     log.metadata_fields['timestamp'] = log.timestamp | 
 |     log.metadata_fields['msg'] = log.message | 
 |     log.metadata_fields['file'] = log.file_and_line | 
 |  | 
 |     _DEVICE_LOG.log( | 
 |         log.level, | 
 |         '%s -- %s', | 
 |         log.message, | 
 |         log.file_and_line, | 
 |         extra=dict(extra_metadata_fields=log.metadata_fields), | 
 |     ) | 
 |  | 
 |  | 
 | class GonkLogStream: | 
 |     """Handle incoming serial data from Gonk.""" | 
 |  | 
 |     # pylint: disable=too-many-instance-attributes | 
 |     def __init__( | 
 |         self, | 
 |         detokenizer: Optional[detokenize.Detokenizer] = None, | 
 |         adc_time_format: str = '%Y%m%d %H:%M:%S.%f', | 
 |     ) -> None: | 
 |         self.detokenizer = detokenizer | 
 |         self.log_decoder: Optional[LogStreamDecoder] = None | 
 |         if self.detokenizer: | 
 |             self.log_decoder = LogStreamDecoder( | 
 |                 decoded_log_handler=emit_python_log, detokenizer=detokenizer) | 
 |         self.input_buffer = BytearrayLoop() | 
 |         self.log_buffer = BytearrayLoop() | 
 |         self.frame_decoder = FrameDecoder() | 
 |         self.adc_time_format = adc_time_format | 
 |  | 
 |         self.remaining_proto_bytes: Optional[int] = None | 
 |  | 
 |         self.proto_update_count: int = 0 | 
 |         self.proto_update_time = time.monotonic() | 
 |  | 
 |     def _maybe_log_update_rate(self) -> None: | 
 |         self.proto_update_count += 1 | 
 |         current_time = time.monotonic() | 
 |         if current_time > self.proto_update_time + 1: | 
 |             _LOG.info('ADC updates/second: %d', self.proto_update_count) | 
 |             self.proto_update_count = 0 | 
 |             self.proto_update_time = current_time | 
 |  | 
 |     def write(self, data: bytes) -> None: | 
 |         """Write data and separate protobuf data from logs.""" | 
 |         self.input_buffer.write(data) | 
 |  | 
 |         logdata = self.input_buffer.read_until(BIN_LOG_SYNC_START_BYTES) | 
 |         if logdata: | 
 |             self.log_buffer.write(logdata) | 
 |  | 
 |         self._handle_log_data() | 
 |         self._handle_proto_packets() | 
 |  | 
 |     def _next_proto_packet_size(self) -> Optional[int]: | 
 |         packet_size: Optional[int] = None | 
 |  | 
 |         required_bytes = 8 | 
 |         # Expected bytes: | 
 |         # 1: fixed32 tag | 
 |         # 4: sync start value bytes | 
 |         # 1: varint tag | 
 |         # 2: varint value (one to two bytes) | 
 |         head_bytes = self.input_buffer.peek(required_bytes) | 
 |         if len( | 
 |                 head_bytes | 
 |         ) < required_bytes or BIN_LOG_SYNC_START_BYTES not in head_bytes: | 
 |             return packet_size | 
 |  | 
 |         _sync_start_bytes = head_bytes[:5] | 
 |         _varint_tag = head_bytes[5:6] | 
 |         # Decode 1-2 varint bytes. | 
 |         varint_byte1 = head_bytes[6:7] | 
 |         varint_byte2 = head_bytes[7:8] | 
 |         i = int.from_bytes(varint_byte1) | 
 |         packet_size = i & 0x7f | 
 |         if (i & 0x80) == 0x80: | 
 |             # Decode the extra varint byte | 
 |             packet_size |= (int.from_bytes(varint_byte2) & 0x7f) << 7 | 
 |         else: | 
 |             # Extra varint byte value not used. | 
 |             required_bytes -= 1 | 
 |  | 
 |         return required_bytes + packet_size | 
 |  | 
 |     def _handle_log_data(self) -> None: | 
 |         log_data = self.log_buffer.read() | 
 |         for frame in self.frame_decoder.process(log_data): | 
 |             if not frame.ok(): | 
 |                 _LOG.warning( | 
 |                     'Failed to decode frame: %s; discarded %d bytes', | 
 |                     frame, | 
 |                     len(frame.raw_encoded), | 
 |                 ) | 
 |                 # Save this data back to the log_buffer | 
 |                 self.log_buffer.write(frame.raw_encoded) | 
 |                 continue | 
 |  | 
 |             if not self.log_decoder: | 
 |                 _DEVICE_LOG.info('%s', frame) | 
 |                 continue | 
 |  | 
 |             log_entry = log_pb2.LogEntry() | 
 |             try: | 
 |                 log_entry.ParseFromString(frame.data) | 
 |             except DecodeError: | 
 |                 # Try to detokenize the frame data and log the failure. | 
 |                 detokenized_text = '' | 
 |                 if self.detokenizer: | 
 |                     detokenized_text = str( | 
 |                         self.detokenizer.detokenize(frame.data)) | 
 |                 if detokenized_text: | 
 |                     _LOG.warning('Failed to parse log proto "%s" %s', | 
 |                                  detokenized_text, frame) | 
 |                 else: | 
 |                     _LOG.warning('Failed to parse log proto %s', frame) | 
 |                 continue | 
 |  | 
 |             log = self.log_decoder.parse_log_entry_proto(log_entry) | 
 |             emit_python_log(log) | 
 |  | 
 |     def _handle_proto_packets(self) -> None: | 
 |         # If no proto has been found, check for a new one. | 
 |         if self.remaining_proto_bytes is None: | 
 |             self.remaining_proto_bytes = self._next_proto_packet_size() | 
 |  | 
 |         # If a proto was found and bytes are pending. | 
 |         if (self.remaining_proto_bytes is not None | 
 |                 and self.remaining_proto_bytes > 0): | 
 |  | 
 |             # If the remaining bytes are available in the buffer. | 
 |             if self.remaining_proto_bytes == len( | 
 |                     self.input_buffer.peek(self.remaining_proto_bytes)): | 
 |                 # Pop the proto | 
 |                 proto_bytes = bytes( | 
 |                     self.input_buffer.read(self.remaining_proto_bytes)) | 
 |                 # All bytes consumed | 
 |                 self.remaining_proto_bytes = None | 
 |                 self._parse_and_log_adc_proto(proto_bytes) | 
 |                 self._maybe_log_update_rate() | 
 |  | 
 |     def _parse_and_log_adc_proto(self, proto_bytes: bytes) -> None: | 
 |         """Parse an ADC proto message and log.""" | 
 |         framed_proto = FramedProto() | 
 |  | 
 |         try: | 
 |             framed_proto.ParseFromString(proto_bytes) | 
 |         except DecodeError: | 
 |             _LOG.error('ADC FramedProto.DecodeError: %s', proto_bytes.hex()) | 
 |             return | 
 |  | 
 |         host_time = datetime.now().strftime(self.adc_time_format) | 
 |         packet_size = len(proto_bytes) | 
 |         delta_micros = framed_proto.payload.timestamp | 
 |  | 
 |         vbus_values = [] | 
 |         vshunt_values = [] | 
 |         for adc_measure in framed_proto.payload.adc_measurements: | 
 |             vbus_values.append(adc_measure.vbus_value) | 
 |             vshunt_values.append(adc_measure.vshunt_value) | 
 |  | 
 |         _DEVICE_LOG.info( | 
 |             'host_time: %s size: %s delta_microseconds: %s ' | 
 |             'vbus: %s vshunt: %s', | 
 |             host_time, | 
 |             str(packet_size), | 
 |             str(delta_micros), | 
 |             ','.join(str(value) for value in vshunt_values), | 
 |             ','.join(str(value) for value in vbus_values), | 
 |             extra=dict( | 
 |                 extra_metadata_fields={ | 
 |                     'host_time': host_time, | 
 |                     'packet_size': packet_size, | 
 |                     'delta_micros': delta_micros, | 
 |                     'vbus_values': vbus_values, | 
 |                     'vshunt_values': vshunt_values, | 
 |                 }), | 
 |         ) |