blob: 83f3f3a66d5ae3120508f232fb7c36769ddfb7fd [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Decoder class for decoding bytes using HDLC-Lite protocol"""
import enum
import logging
import zlib
from typing import Iterator, NamedTuple, Optional, Tuple
from pw_hdlc_lite import protocol
_LOG = logging.getLogger('pw_hdlc_lite')
class FrameStatus(enum.Enum):
"""Indicates that an error occurred."""
OK = 'OK'
FCS_MISMATCH = 'frame check sequence failure'
INCOMPLETE = 'incomplete frame'
INVALID_ESCAPE = 'invalid escape character'
_MIN_FRAME_SIZE = 6 # 1 B address + 1 B control + 4 B CRC-32
NO_ADDRESS = -1
class Frame(NamedTuple):
"""Represents an HDLC frame."""
# All bytes in the frame (address, control, information, FCS)
raw: bytes
# Whether parsing the frame succeeded.
status: FrameStatus = FrameStatus.OK
@property
def address(self) -> int:
"""The frame's address field (assumes only one byte for now)."""
return self.raw[0] if self.raw else NO_ADDRESS
@property
def control(self) -> bytes:
"""The control byte (assumes only one byte for now)."""
return self.raw[1:2] if len(self.raw) >= 2 else b''
@property
def data(self) -> bytes:
"""The information field in the frame."""
return self.raw[2:-4] if len(self.raw) >= _MIN_FRAME_SIZE else b''
def ok(self) -> bool:
"""True if this represents a valid frame.
If false, then parsing failed. The status is set to indicate what type
of error occurred, and the data field contains all bytes parsed from the
frame (including bytes parsed as address or control bytes).
"""
return self.status is FrameStatus.OK
class _BaseFrameState:
"""Base class for all frame parsing states."""
def __init__(self, data: bytearray):
self._data = data # All data seen in the current frame
self._escape_next = False
def handle_flag(self) -> Tuple['_BaseFrameState', Optional[Frame]]:
"""Handles an HDLC flag character (0x7e).
The HDLC flag is always interpreted as the start of a new frame.
Returns:
(next state, optional frame or error)
"""
# If there is data or an escape character, the frame is incomplete.
if self._escape_next or self._data:
return _AddressState(), Frame(bytes(self._data),
FrameStatus.INCOMPLETE)
return _AddressState(), None
def handle_escape(self) -> '_BaseFrameState':
"""Handles an HDLC escape character (0x7d); returns the next state."""
if self._escape_next:
# If two escapes occur in a row, the frame is invalid.
return _InterframeState(self._data, FrameStatus.INVALID_ESCAPE)
self._escape_next = True
return self
def handle_byte(self, byte: int) -> '_BaseFrameState':
"""Handles a byte, which may have been escaped; returns next state."""
self._data.append(protocol.escape(byte) if self._escape_next else byte)
self._escape_next = False
return self
class _InterframeState(_BaseFrameState):
"""Not currently in a frame; any data is discarded."""
def __init__(self, data: bytearray, error: FrameStatus):
super().__init__(data)
self._error = error
def handle_flag(self) -> Tuple[_BaseFrameState, Optional[Frame]]:
# If this state was entered due to an error, report that error before
# starting a new frame.
if self._error is not FrameStatus.OK:
return _AddressState(), Frame(bytes(self._data), self._error)
return super().handle_flag()
class _AddressState(_BaseFrameState):
"""First field in a frame: the address."""
def __init__(self):
super().__init__(bytearray())
def handle_byte(self, byte: int) -> _BaseFrameState:
super().handle_byte(byte)
# Only handle single-byte addresses for now.
return _ControlState(self._data)
class _ControlState(_BaseFrameState):
"""Second field in a frame: control."""
def handle_byte(self, byte: int) -> _BaseFrameState:
super().handle_byte(byte)
# Only handle a single control byte for now.
return _DataState(self._data)
class _DataState(_BaseFrameState):
"""The information field in a frame."""
def handle_flag(self) -> Tuple[_BaseFrameState, Frame]:
return _AddressState(), Frame(bytes(self._data), self._check_frame())
def _check_frame(self) -> FrameStatus:
# If the last character was an escape, assume bytes are missing.
if self._escape_next or len(self._data) < _MIN_FRAME_SIZE:
return FrameStatus.INCOMPLETE
frame_crc = int.from_bytes(self._data[-4:], 'little')
if zlib.crc32(self._data[:-4]) != frame_crc:
return FrameStatus.FCS_MISMATCH
return FrameStatus.OK
class FrameDecoder:
"""Decodes one or more HDLC frames from a stream of data."""
def __init__(self):
self._data = bytearray()
self._unescape_next_byte_flag = False
self._state = _InterframeState(bytearray(), FrameStatus.OK)
def process(self, data: bytes) -> Iterator[Frame]:
"""Decodes and yields HDLC frames, including corrupt frames.
The ok() method on Frame indicates whether it is valid or represents a
frame parsing error.
Yields:
Frames, which may be valid (frame.ok()) or corrupt (!frame.ok())
"""
for byte in data:
frame = self._process_byte(byte)
if frame:
yield frame
def process_valid_frames(self, data: bytes) -> Iterator[Frame]:
"""Decodes and yields valid HDLC frames, logging any errors."""
for frame in self.process(data):
if frame.ok():
yield frame
else:
_LOG.warning('Failed to decode frame: %s; discarded %d bytes',
frame.status.value, len(frame.data))
_LOG.debug('Discarded data: %s', frame.data)
def _process_byte(self, byte: int) -> Optional[Frame]:
if byte == protocol.FLAG:
self._state, frame = self._state.handle_flag()
return frame
if byte == protocol.ESCAPE:
self._state = self._state.handle_escape()
else:
self._state = self._state.handle_byte(byte)
return None