|  | # Copyright 2024 The Pigweed Authors | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); you may not | 
|  | # use this file except in compliance with the License. You may obtain a copy of | 
|  | # the License at | 
|  | # | 
|  | #     https://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | 
|  | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | 
|  | # License for the specific language governing permissions and limitations under | 
|  | # the License. | 
|  | """Binary log data functions.""" | 
|  |  | 
|  | import io | 
|  | import threading | 
|  | from typing import Union | 
|  |  | 
# Hex-string spelling of a 5-byte marker sequence.
# NOTE(review): the name suggests this marks the start of a binary log
# entry in a byte stream -- confirm against the code that consumes it.
BIN_LOG_SYNC_START_STR = '0d99bbaa7d'
# The same marker decoded from its hex spelling into raw bytes.
BIN_LOG_SYNC_START_BYTES = bytes.fromhex(BIN_LOG_SYNC_START_STR)
|  |  | 
|  |  | 
class BytearrayLoop:
    """A bytearray reader writer.

    Reads occur from the front of the buffer and remove data. Writes append to
    the end of the buffer.

    ``read``, ``write`` and ``read_until`` each run as a single critical
    section under ``self.lock``. ``peek``, ``__len__`` and ``getbuffer`` access
    the buffer without taking the lock.
    """

    def __init__(self, data: bytes = b''):
        """Create a buffer pre-filled with a copy of `data`."""
        self.buffer = bytearray(data)
        # Serializes the operations that add or remove bytes.
        self.lock = threading.Lock()

    def _pop_front(self, size: int) -> bytearray:
        """Remove and return up to `size` leading bytes (all if negative).

        The caller must already hold ``self.lock``. The lock is not reentrant,
        so this helper must never try to acquire it itself.
        """
        if size < 0:
            size = len(self.buffer)
        data = self.buffer[:size]
        # Delete the data we just fetched.
        del self.buffer[:size]
        return data

    def read(self, size: int = -1) -> bytearray:
        """Read bytes from the start of the buffer.

        Args:
          size: Maximum number of bytes to remove and return. A negative value
            (the default) drains the whole buffer.

        Returns:
          The removed bytes; shorter than `size` (possibly empty) when the
          buffer holds fewer bytes.
        """
        with self.lock:
            return self._pop_front(size)

    def write(self, data: bytes) -> None:
        """Write bytes to the end of the buffer."""
        with self.lock:
            self.buffer.extend(data)

    def peek(self, size: int = 0) -> bytearray:
        """Peek into bytes from the start.

        Returns a copy of the first `size` bytes without removing them. With
        the default `size` of 0 the result is empty.
        """
        return self.buffer[:size]

    def __len__(self) -> int:
        """Return the number of bytes currently buffered."""
        return len(self.buffer)

    def getbuffer(self) -> bytearray:
        """Return the whole buffer.

        This is the underlying bytearray itself, not a copy.
        """
        return self.buffer

    def read_until(self, sub_sequece: bytes) -> bytearray:
        """Pop all data from the buffer until a subsequence is found.

        The search and the removal happen under one acquisition of the lock,
        so a concurrent ``read`` cannot shift the buffer between locating the
        subsequence and popping the bytes in front of it.

        Args:
          sub_sequece: The byte sequence to search for.

        Returns:
          * An empty bytearray, consuming nothing, if the buffer is shorter
            than `sub_sequece`.
          * The bytes preceding the first occurrence of `sub_sequece` if it is
            found; the subsequence itself stays at the front of the buffer.
          * The entire buffer contents if `sub_sequece` is not found. Note
            that this also drains any partial match at the end of the buffer.
        """
        with self.lock:
            # Return nothing if the buffer is smaller than the provided
            # subsequence.
            if len(sub_sequece) > len(self.buffer):
                return bytearray()
            position = self.buffer.find(sub_sequece)
            # If no subsequence is found, read all data.
            if position < 0:
                return self._pop_front(-1)
            return self._pop_front(position)
|  |  | 
|  |  | 
# Type alias for the byte sources accepted by the stream-decoding helpers in
# this module: a standard raw I/O object or the in-memory BytearrayLoop.
_RawIo = Union[io.RawIOBase, BytearrayLoop]
|  |  | 
|  |  | 
def decode_one_varint(stream: _RawIo) -> int:
    """Read a single varint from a byte stream.

    Bytes are consumed one at a time via ``stream.read(1)``. The low seven bits
    of each byte are appended to the result, least-significant group first,
    and decoding stops after the first byte whose high bit is clear.

    Args:
      stream: Source of bytes; only its ``read`` method is used.

    Returns:
      The decoded integer. If the stream yields no data before a terminating
      byte is seen, the bits accumulated so far are returned (0 when nothing
      at all could be read).
    """
    value = 0
    # Bit position at which the next 7-bit group is inserted.
    bit_offset = 0

    chunk = stream.read(1)
    while chunk:
        # A single byte has no byte order of its own; 'little' is nominal.
        byte = int.from_bytes(chunk, byteorder='little')
        value |= (byte & 0x7F) << bit_offset
        # A clear continuation bit marks the final byte of the varint.
        if not (byte & 0x80):
            break
        bit_offset += 7
        chunk = stream.read(1)

    return value