blob: ead60b7aae2d9bbf0f93613a944b3b3340fc17ba [file] [log] [blame]
# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Binary log data functions."""
import io
import threading
from typing import Union
BIN_LOG_SYNC_START_STR = '0d99bbaa7d'
BIN_LOG_SYNC_START_BYTES = bytes.fromhex(BIN_LOG_SYNC_START_STR)
class BytearrayLoop:
"""A bytearray reader writer.
Reads occur from the front of the buffer and remove data. Writes append to
the end of the buffer.
"""
def __init__(self, data: bytes = b''):
self.buffer = bytearray(data)
self.lock = threading.Lock()
def read(self, size: int = -1) -> bytearray:
"""Read bytes from the start of the buffer."""
data = b''
with self.lock:
if size < 0:
size = len(self.buffer)
data = self.buffer[:size]
# Delete the data we just fetched
self.buffer[:size] = b''
return data
def write(self, data: bytes) -> None:
"""Write bytes to the end of the buffer."""
with self.lock:
self.buffer.extend(data)
def peek(self, size: int = 0) -> bytearray:
"""Peek into bytes from the start."""
return self.buffer[:size]
def __len__(self) -> int:
return len(self.buffer)
def getbuffer(self) -> bytearray:
"""Return the whole buffer."""
return self.buffer
def read_until(self, sub_sequece: bytes) -> bytearray:
"""Pop all data from the buffer until a subsequence is found."""
# Return nothing if the buffer is smaller than the provided subsequence.
if len(sub_sequece) > len(self.buffer):
return bytearray()
position = self.buffer.find(sub_sequece)
# If no subsequece is found, read all data.
if position < 0:
return self.read()
return self.read(position)
_RawIo = Union[io.RawIOBase, BytearrayLoop]
def decode_one_varint(stream: _RawIo) -> int:
"""Read a single varint from a byte stream."""
# Track the amount shifted.
shift = 0
# Resulting varint.
result = 0
while True:
# Get one byte
b = stream.read(1)
if not b:
break
# Read as an integer. Note: byte order is little endian but we are only
# reading one byte at a time.
i = int.from_bytes(b, byteorder='little')
# Shift first 7 bits from this byte into the result.
result |= (i & 0x7f) << shift
shift += 7
# Stop if the highest bit is not set.
if not i & 0x80:
break
return result