| # Copyright 2024 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Binary log data functions.""" |
| |
| import io |
| import threading |
| from typing import Union |
| |
| BIN_LOG_SYNC_START_STR = '0d99bbaa7d' |
| BIN_LOG_SYNC_START_BYTES = bytes.fromhex(BIN_LOG_SYNC_START_STR) |
| |
| |
| class BytearrayLoop: |
| """A bytearray reader writer. |
| |
| Reads occur from the front of the buffer and remove data. Writes append to |
| the end of the buffer. |
| """ |
| |
| def __init__(self, data: bytes = b''): |
| self.buffer = bytearray(data) |
| self.lock = threading.Lock() |
| |
| def read(self, size: int = -1) -> bytearray: |
| """Read bytes from the start of the buffer.""" |
| data = b'' |
| with self.lock: |
| if size < 0: |
| size = len(self.buffer) |
| data = self.buffer[:size] |
| # Delete the data we just fetched |
| self.buffer[:size] = b'' |
| return data |
| |
| def write(self, data: bytes) -> None: |
| """Write bytes to the end of the buffer.""" |
| with self.lock: |
| self.buffer.extend(data) |
| |
| def peek(self, size: int = 0) -> bytearray: |
| """Peek into bytes from the start.""" |
| return self.buffer[:size] |
| |
| def __len__(self) -> int: |
| return len(self.buffer) |
| |
| def getbuffer(self) -> bytearray: |
| """Return the whole buffer.""" |
| return self.buffer |
| |
| def read_until(self, sub_sequece: bytes) -> bytearray: |
| """Pop all data from the buffer until a subsequence is found.""" |
| # Return nothing if the buffer is smaller than the provided subsequence. |
| if len(sub_sequece) > len(self.buffer): |
| return bytearray() |
| position = self.buffer.find(sub_sequece) |
| # If no subsequece is found, read all data. |
| if position < 0: |
| return self.read() |
| return self.read(position) |
| |
| |
| _RawIo = Union[io.RawIOBase, BytearrayLoop] |
| |
| |
| def decode_one_varint(stream: _RawIo) -> int: |
| """Read a single varint from a byte stream.""" |
| # Track the amount shifted. |
| shift = 0 |
| # Resulting varint. |
| result = 0 |
| while True: |
| # Get one byte |
| b = stream.read(1) |
| if not b: |
| break |
| # Read as an integer. Note: byte order is little endian but we are only |
| # reading one byte at a time. |
| i = int.from_bytes(b, byteorder='little') |
| # Shift first 7 bits from this byte into the result. |
| result |= (i & 0x7F) << shift |
| shift += 7 |
| # Stop if the highest bit is not set. |
| if not i & 0x80: |
| break |
| |
| return result |