# blob: 3c5b487a633d55c22eef77d0da0e5ce197101f5c [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Decoder class for decoding bytes using HDLC protocol"""
import enum
import logging
from typing import Iterator, Optional
import zlib
from pw_hdlc import protocol
# Module-level logger, registered under the 'pw_hdlc' name.
_LOG = logging.getLogger('pw_hdlc')

# Placeholder stored in Frame.address until a real address has been decoded
# from the frame.
NO_ADDRESS = -1

# Smallest decoded frame (in bytes) that is accepted for validation; anything
# shorter is reported as a framing error.
_MIN_FRAME_SIZE = 6 # 1 B address + 1 B control + 4 B CRC-32
class FrameStatus(enum.Enum):
    """Outcome of decoding a single HDLC frame.

    OK marks a frame that decoded successfully; every other member names the
    reason decoding failed. Each member's value is a human-readable
    description suitable for log messages.
    """
    # The frame passed every check.
    OK = 'OK'
    # The CRC-32 stored at the end of the frame does not match its contents.
    FCS_MISMATCH = 'frame check sequence failure'
    # The byte stream was malformed: stray data outside a frame, a bad escape
    # sequence, or a frame shorter than the minimum size.
    FRAMING_ERROR = 'invalid flag or escape characters'
    # The frame's address field is invalid.
    BAD_ADDRESS = 'address field too long'
class Frame:
    """Represents an HDLC frame.

    Attributes:
      raw_encoded: The frame's bytes as received, without flag characters.
      raw_decoded: The frame's bytes after escape sequences were resolved.
      status: FrameStatus describing whether the frame is usable.
      address: The decoded address, or NO_ADDRESS if the frame is not OK.
      control: The single control byte, or b'' if the frame is not OK.
      data: The information field, or b'' if the frame is not OK.
    """

    def __init__(self,
                 raw_encoded: bytes,
                 raw_decoded: bytes,
                 status: FrameStatus = FrameStatus.OK) -> None:
        """Parses fields from an HDLC frame.

        Arguments:
          raw_encoded: The complete HDLC-encoded frame, excluding HDLC flag
              characters.
          raw_decoded: The complete decoded frame (address, control,
              information, FCS).
          status: Whether parsing the frame succeeded. Fields are only
              extracted when this is FrameStatus.OK; extraction itself may
              downgrade the status to FrameStatus.BAD_ADDRESS.
        """
        self.raw_encoded = raw_encoded
        self.raw_decoded = raw_decoded
        self.status = status

        # Defaults that remain in place for any frame that is not OK.
        self.address: int = NO_ADDRESS
        self.control: bytes = b''
        self.data: bytes = b''

        if status is not FrameStatus.OK:
            return

        address, address_length = protocol.decode_address(raw_decoded)

        # After the address there must still be room for the control byte and
        # the FCS. Without this check, a multi-byte address in a minimum-size
        # frame would cause FCS bytes to be reported as the control field.
        control_size = 1
        fcs_size = 4
        trailer_size = control_size + fcs_size

        if (address_length == 0
                or len(raw_decoded) < address_length + trailer_size):
            self.status = FrameStatus.BAD_ADDRESS
            return

        control_end = address_length + control_size
        self.address = address
        self.control = raw_decoded[address_length:control_end]
        self.data = raw_decoded[control_end:-fcs_size]

    def ok(self) -> bool:
        """True if this represents a valid frame.

        If false, then parsing failed. The status is set to indicate what type
        of error occurred; address, control, and data keep their empty
        defaults. The bytes that were received remain available through
        raw_encoded and raw_decoded.
        """
        return self.status is FrameStatus.OK

    def __repr__(self) -> str:
        """Shows the parsed fields for a valid frame, else only the status."""
        if self.ok():
            body = (f'address={self.address}, control={self.control!r}, '
                    f'data={self.data!r}')
        else:
            body = str(self.status)

        return f'{type(self).__name__}({body})'
class _State(enum.Enum):
    """States of the FrameDecoder byte-processing state machine."""
    # Outside of a frame; waiting for a flag byte to start one.
    INTERFRAME = 0
    # Inside a frame; bytes are being accumulated.
    FRAME = 1
    # Inside a frame, immediately after an escape byte.
    FRAME_ESCAPE = 2
def _check_frame(frame_data: bytes) -> FrameStatus:
    """Validates the length and checksum of a decoded frame.

    The last four bytes of frame_data are read as a little-endian CRC-32 and
    compared with the CRC-32 computed over all preceding bytes.

    Arguments:
      frame_data: The decoded (unescaped) frame contents, including the
          trailing checksum.

    Returns:
      FrameStatus.FRAMING_ERROR if the frame is shorter than _MIN_FRAME_SIZE,
      FrameStatus.FCS_MISMATCH if the checksums differ, and FrameStatus.OK
      otherwise.
    """
    if len(frame_data) >= _MIN_FRAME_SIZE:
        contents, stored_fcs = frame_data[:-4], frame_data[-4:]
        expected_crc = int.from_bytes(stored_fcs, 'little')

        if zlib.crc32(contents) == expected_crc:
            return FrameStatus.OK

        return FrameStatus.FCS_MISMATCH

    return FrameStatus.FRAMING_ERROR
class FrameDecoder:
    """Incrementally decodes HDLC frames from a stream of bytes.

    The decoder keeps its state between calls, so a frame may be split across
    several calls to process() or process_valid_frames().
    """

    def __init__(self):
        self._state = _State.INTERFRAME
        # Bytes of the current frame exactly as received (flags excluded).
        self._raw_data = bytearray()
        # Bytes of the current frame with escape sequences resolved.
        self._decoded_data = bytearray()

    def process(self, data: bytes) -> Iterator[Frame]:
        """Feeds data to the decoder, yielding every frame that completes.

        Both valid and corrupt frames are yielded; use Frame.ok() to tell them
        apart.

        Yields:
          Frames, which may be valid (frame.ok()) or corrupt (!frame.ok())
        """
        # _process_byte returns None for bytes that do not finish a frame;
        # filter(None, ...) drops those while staying lazy.
        yield from filter(None, map(self._process_byte, data))

    def process_valid_frames(self, data: bytes) -> Iterator[Frame]:
        """Feeds data to the decoder, yielding only the valid frames.

        Corrupt frames are logged (a warning with the failure reason and byte
        count, plus a debug message with the discarded bytes) and skipped.
        """
        for candidate in self.process(data):
            if not candidate.ok():
                _LOG.warning('Failed to decode frame: %s; discarded %d bytes',
                             candidate.status.value,
                             len(candidate.raw_encoded))
                _LOG.debug('Discarded data: %s', candidate.raw_encoded)
                continue

            yield candidate

    def _finish_frame(self, status: FrameStatus) -> Frame:
        """Builds a Frame from the buffered bytes, then empties the buffers."""
        finished = Frame(raw_encoded=bytes(self._raw_data),
                         raw_decoded=bytes(self._decoded_data),
                         status=status)

        for buffer in (self._raw_data, self._decoded_data):
            buffer.clear()

        return finished

    def _process_byte(self, byte: int) -> Optional[Frame]:
        """Advances the state machine by one byte.

        Returns:
          The Frame completed by this byte, or None if no frame ended here.
        """
        is_flag = byte == protocol.FLAG

        # The raw buffer holds everything received except flag characters.
        if not is_flag:
            self._raw_data.append(byte)

        current = self._state

        if current is _State.INTERFRAME:
            if not is_flag:
                return None

            # Anything received before the opening flag is reported as a
            # corrupt frame.
            stray: Optional[Frame] = None
            if self._raw_data:
                stray = self._finish_frame(FrameStatus.FRAMING_ERROR)

            self._state = _State.FRAME
            return stray

        if current is _State.FRAME:
            if is_flag:
                # The decoder stays in FRAME: the flag that ends one frame
                # also begins the next. Back-to-back flags produce no frame.
                if self._raw_data:
                    return self._finish_frame(
                        _check_frame(self._decoded_data))
                return None

            if byte == protocol.ESCAPE:
                self._state = _State.FRAME_ESCAPE
            else:
                self._decoded_data.append(byte)
            return None

        if current is _State.FRAME_ESCAPE:
            if is_flag:
                # A flag directly after an escape byte aborts the frame.
                aborted = self._finish_frame(FrameStatus.FRAMING_ERROR)
                self._state = _State.FRAME
                return aborted

            if byte in protocol.VALID_ESCAPED_BYTES:
                self._state = _State.FRAME
                self._decoded_data.append(protocol.escape(byte))
            else:
                # Invalid escape sequence: drop out of the frame. The
                # buffered bytes are reported when the next flag arrives.
                self._state = _State.INTERFRAME
            return None

        raise AssertionError(f'Invalid decoder state: {self._state}')