# blob: db85c92bacf8e9c9212af847c4636d30b401816f [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Decoder class for decoding bytes using HDLC protocol"""
import enum
import logging
import threading
import time
from typing import Iterable, Optional, Callable, Any
import zlib
from pw_hdlc import protocol
# Module-level logger, registered under the name 'pw_hdlc'.
_LOG = logging.getLogger('pw_hdlc')
# Placeholder stored in Frame.address until an address has been decoded.
NO_ADDRESS = -1
# Decoded frames shorter than this are rejected by _check_frame.
_MIN_FRAME_SIZE = 6 # 1 B address + 1 B control + 4 B CRC-32
# The HDLC flag value as a one-byte bytes object, used for startswith() checks.
_FLAG_BYTE = bytes([protocol.FLAG])
class FrameStatus(enum.Enum):
    """Outcome of decoding a single HDLC frame.

    OK marks a frame that decoded successfully; every other member names a
    specific kind of failure. Each member's value is a short human-readable
    description that is suitable for log messages.
    """

    # The frame decoded without error.
    OK = 'OK'

    # The CRC-32 carried at the end of the frame does not match its contents.
    FCS_MISMATCH = 'frame check sequence failure'

    # The frame was malformed: misplaced flag or escape bytes, or too few
    # bytes to hold the mandatory fields.
    FRAMING_ERROR = 'invalid flag or escape characters'

    # The address field could not be decoded.
    BAD_ADDRESS = 'address field too long'
class Frame:
    """A single HDLC frame together with the result of decoding it.

    Attributes set by the constructor:
      raw_encoded: the encoded bytes exactly as passed in.
      raw_decoded: the decoded bytes exactly as passed in.
      status: a FrameStatus describing whether decoding succeeded.
      address: the decoded address, or NO_ADDRESS if it was not parsed.
      control: the single control byte, or b'' if it was not parsed.
      data: the information field, or b'' if it was not parsed.
    """

    def __init__(self,
                 raw_encoded: bytes,
                 raw_decoded: bytes,
                 status: FrameStatus = FrameStatus.OK):
        """Stores the raw bytes and, for OK frames, splits out the fields.

        Arguments:
          raw_encoded: The complete HDLC-encoded frame, including any HDLC
              flag bytes. In the case of back to back frames, the
              beginning flag byte may be omitted.
          raw_decoded: The complete decoded frame (address, control,
              information, FCS).
          status: Whether parsing the frame succeeded. When this is OK but
              the address cannot be decoded, the stored status is changed to
              BAD_ADDRESS.
        """
        self.raw_encoded = raw_encoded
        self.raw_decoded = raw_decoded
        self.status = status

        # Defaults; these are kept whenever the fields are not parsed.
        self.address: int = NO_ADDRESS
        self.control: bytes = b''
        self.data: bytes = b''

        # Frames already known to be corrupt are not parsed any further.
        if status != FrameStatus.OK:
            return

        address, address_length = protocol.decode_address(raw_decoded)
        if address_length == 0:
            # A zero length from decode_address means no address was decoded.
            self.status = FrameStatus.BAD_ADDRESS
            return

        # Layout: address | 1 control byte | information | 4-byte FCS.
        control_end = address_length + 1
        self.address = address
        self.control = raw_decoded[address_length:control_end]
        self.data = raw_decoded[control_end:-4]

    def ok(self) -> bool:
        """True if this represents a valid frame.

        If false, decoding failed and the status attribute indicates what
        type of error occurred. In that case address, control and data keep
        their defaults (NO_ADDRESS, b'' and b''); the bytes that were received
        remain available in raw_encoded and raw_decoded.
        """
        return self.status is FrameStatus.OK

    def __repr__(self) -> str:
        # Valid frames show their parsed fields; invalid frames show the raw
        # encoded bytes and the failure status instead.
        if not self.ok():
            body = (f'raw_encoded={self.raw_encoded!r}, '
                    f'status={str(self.status)}')
        else:
            body = (f'address={self.address}, control={self.control!r}, '
                    f'data={self.data!r}')
        return f'{type(self).__name__}({body})'
class _State(enum.Enum):
    """States of the byte-at-a-time FrameDecoder state machine."""

    # Outside a frame; waiting for a flag byte.
    INTERFRAME = 0

    # Inside a frame; bytes are accumulated as decoded frame data.
    FRAME = 1

    # Inside a frame, immediately after an escape byte.
    FRAME_ESCAPE = 2
def _check_frame(frame_data: bytes) -> FrameStatus:
    """Validates the length and frame check sequence of a decoded frame.

    Args:
      frame_data: The decoded frame contents, ending with a 4-byte
          little-endian CRC-32 of everything that precedes it.

    Returns:
      FRAMING_ERROR if the frame is shorter than _MIN_FRAME_SIZE,
      FCS_MISMATCH if the trailing CRC-32 does not match the preceding bytes,
      and OK otherwise.
    """
    if len(frame_data) < _MIN_FRAME_SIZE:
        return FrameStatus.FRAMING_ERROR

    # The last four bytes hold the expected checksum of the rest of the frame.
    payload, fcs = frame_data[:-4], frame_data[-4:]
    if int.from_bytes(fcs, 'little') == zlib.crc32(payload):
        return FrameStatus.OK

    return FrameStatus.FCS_MISMATCH
class FrameDecoder:
    """Incrementally decodes HDLC frames from a stream of bytes.

    The decoder keeps two buffers between calls: the bytes exactly as they
    were received and the same bytes with HDLC escaping removed. Both are
    handed to a Frame and cleared each time a frame is completed.
    """

    def __init__(self) -> None:
        self._decoded_data = bytearray()  # Frame contents, escapes removed.
        self._raw_data = bytearray()  # Bytes exactly as received.
        self._state = _State.INTERFRAME

    def process(self, data: bytes) -> Iterable[Frame]:
        """Decodes and yields HDLC frames, including corrupt frames.

        The ok() method on Frame indicates whether it is valid or represents
        a frame parsing error.

        Yields:
          Frames, which may be valid (frame.ok()) or corrupt (!frame.ok())
        """
        for byte in data:
            completed = self.process_byte(byte)
            if completed is not None:
                yield completed

    def process_valid_frames(self, data: bytes) -> Iterable[Frame]:
        """Decodes and yields valid HDLC frames, logging any errors."""
        for frame in self.process(data):
            if frame.ok():
                yield frame
                continue

            # Corrupt frames are reported in the log and then dropped.
            _LOG.warning('Failed to decode frame: %s; discarded %d bytes',
                         frame.status.value, len(frame.raw_encoded))
            _LOG.debug('Discarded data: %s', frame.raw_encoded)

    def _finish_frame(self, status: FrameStatus) -> Frame:
        """Builds a Frame from the buffered bytes and resets both buffers."""
        # HDLC frames always start and end with a flag character, though the
        # character may be shared with other frames. Ensure the raw encoding
        # of OK frames always includes the start and end flags for
        # consistency.
        if (status is FrameStatus.OK
                and not self._raw_data.startswith(_FLAG_BYTE)):
            self._raw_data.insert(0, protocol.FLAG)

        # Snapshot the buffers before clearing them for the next frame.
        finished = Frame(bytes(self._raw_data), bytes(self._decoded_data),
                         status)
        self._raw_data.clear()
        self._decoded_data.clear()
        return finished

    def process_byte(self, byte: int) -> Optional[Frame]:
        """Processes a single byte and returns a frame if one was completed.

        Returns:
          The Frame (valid or corrupt) that this byte completed, or None if
          the byte did not complete a frame.

        Raises:
          AssertionError: the decoder is in an unrecognized state.
        """
        self._raw_data.append(byte)
        state = self._state

        if state is _State.INTERFRAME:
            if byte != protocol.FLAG:
                return None  # Still waiting for the start of a frame.

            # Bytes seen before this flag were not part of any frame; report
            # them, together with the flag, as a framing error.
            completed = None
            if len(self._raw_data) != 1:
                completed = self._finish_frame(FrameStatus.FRAMING_ERROR)
            self._state = _State.FRAME
            return completed

        if state is _State.FRAME:
            if byte == protocol.FLAG:
                # On back to back frames, we may see a repeated FLAG byte.
                completed = None
                if len(self._raw_data) > 1:
                    completed = self._finish_frame(
                        _check_frame(self._decoded_data))
                self._state = _State.FRAME
                return completed

            if byte == protocol.ESCAPE:
                self._state = _State.FRAME_ESCAPE
            else:
                self._decoded_data.append(byte)
            return None

        if state is _State.FRAME_ESCAPE:
            if byte == protocol.FLAG:
                # A flag directly after an escape byte aborts the frame.
                completed = self._finish_frame(FrameStatus.FRAMING_ERROR)
                self._state = _State.FRAME
                return completed

            if byte in protocol.VALID_ESCAPED_BYTES:
                self._state = _State.FRAME
                self._decoded_data.append(protocol.escape(byte))
            else:
                # Invalid escape sequence: stop treating the following bytes
                # as frame data until the next flag byte arrives.
                self._state = _State.INTERFRAME
            return None

        raise AssertionError(f'Invalid decoder state: {self._state}')
class FrameAndNonFrameDecoder:
    """Processes both HDLC frames and non-frame data in a stream."""

    def __init__(self,
                 non_frame_data_handler: Callable[[bytes], Any],
                 *,
                 mtu: Optional[int] = None,
                 timeout_s: Optional[float] = None,
                 handle_shared_flags: bool = True) -> None:
        """Yields valid HDLC frames and passes non-frame data to callback.

        Args:
          non_frame_data_handler: Called with each non-empty chunk of bytes
              that is flushed as non-frame data. When timeout_s is set, it
              may also be called from the background timeout thread.
          mtu: Maximum bytes to receive before flushing raw data. If a valid
              HDLC frame contains more than MTU bytes, the valid frame will
              be emitted, but part of the frame will be included in the raw
              data.
          timeout_s: How long to wait before automatically flushing raw data.
              If a timeout occurs partway through a valid frame, the frame
              will be emitted, but part of the frame will be included in the
              raw data.
          handle_shared_flags: Whether to permit HDLC frames to share a
              single flag byte between frames. If False, partial HDLC frames
              may be emitted as raw data when HDLC frames share a flag byte,
              but raw data won't have to wait for a timeout or full MTU to
              be flushed.
        """
        self._non_frame_data_handler = non_frame_data_handler
        self._mtu = mtu
        self._shared_flags = handle_shared_flags
        self._timeout_s = timeout_s

        # Bytes received so far that have not been attributed to a valid
        # frame or handed to the non-frame data handler.
        self._raw_data = bytearray()
        self._hdlc_decoder = FrameDecoder()
        # Idle time is measured with the monotonic clock so that wall-clock
        # adjustments can neither trigger nor suppress a timeout flush.
        self._last_data_time = time.monotonic()
        self._lock = threading.Lock()

        if self._timeout_s is not None:
            # Daemon thread: it loops forever and must not block interpreter
            # shutdown.
            threading.Thread(target=self._timeout_thread, daemon=True).start()

    def flush_non_frame_data(self) -> None:
        """Flushes any data in the buffer as non-frame data.

        If a valid HDLC frame was flushed partway, the data for the first
        part of the frame will be included both in the raw data and in the
        frame.
        """
        with self._lock:
            self._flush_non_frame()

    def _flush_non_frame(self, to_index: Optional[int] = None) -> None:
        """Passes buffered bytes up to to_index to the handler and drops them.

        Must be called with self._lock held.

        Args:
          to_index: Slice end index into the buffer; None flushes everything,
              -1 keeps the final byte buffered.
        """
        chunk = bytes(self._raw_data[:to_index])
        # Skip the callback when there is nothing to report, e.g. when the
        # buffer holds only a trailing flag byte that is being retained.
        if chunk:
            self._non_frame_data_handler(chunk)
            del self._raw_data[:to_index]

    def _timeout_thread(self) -> None:
        """Periodically flushes buffered data once the stream has gone idle."""
        assert self._timeout_s is not None

        while True:
            time.sleep(self._timeout_s)

            with self._lock:
                idle_s = time.monotonic() - self._last_data_time
                if idle_s > self._timeout_s:
                    self._flush_non_frame()

    def process(self, data: bytes) -> Iterable[Frame]:
        """Processes a stream of mixed HDLC and unstructured data.

        Yields OK frames and calls non_frame_data_handler with non-HDLC data.

        NOTE: the internal lock is held while this generator is suspended at
        a yield, so it stays held until the generator is exhausted or closed.
        The lock is not reentrant; do not call flush_non_frame_data() or
        start another process() on the same decoder while iterating.
        """
        with self._lock:
            for byte in data:
                yield from self._process_byte(byte)

            # Flush the data if it is larger than the MTU, or flag bytes are
            # not being shared and no initial flag was seen.
            over_mtu = (self._mtu is not None
                        and len(self._raw_data) > self._mtu)
            not_in_frame = (not self._shared_flags
                            and not self._raw_data.startswith(_FLAG_BYTE))
            if over_mtu or not_in_frame:
                self._flush_non_frame()

            self._last_data_time = time.monotonic()

    def _process_byte(self, byte: int) -> Iterable[Frame]:
        """Feeds one byte to the HDLC decoder and routes the result.

        Yields the frame if this byte completed a valid one. When a corrupt
        frame is completed, its bytes are flushed as non-frame data instead.
        Must be called with self._lock held.
        """
        self._raw_data.append(byte)
        frame = self._hdlc_decoder.process_byte(byte)

        if frame is None:
            return

        if frame.ok():
            # Drop the valid frame from the data. Only drop matching bytes in
            # case the frame was flushed prematurely.
            for suffix_byte in reversed(frame.raw_encoded):
                if not self._raw_data or self._raw_data[-1] != suffix_byte:
                    break
                self._raw_data.pop()

            self._flush_non_frame()  # Flush the raw data before the frame.

            if self._mtu is not None and len(frame.raw_encoded) > self._mtu:
                # Argument order must match the format string: frame size
                # first, then the configured MTU.
                _LOG.warning(
                    'Found a valid %d B HDLC frame, but the MTU is set to %d! '
                    'The MTU setting may be incorrect.',
                    len(frame.raw_encoded), self._mtu)

            yield frame
        else:
            # Don't flush a final flag byte yet because it might be the start
            # of an HDLC frame.
            to_index = -1 if self._raw_data[-1] == protocol.FLAG else None
            self._flush_non_frame(to_index)