pw_hdlc_lite: Update Python decoder
- Update Python decoder to work with address and control bytes and
CRC-32.
- Rename constants.py to protocol.py.
Change-Id: Id2b72b89cb2e42d449976a17809c49b82ee3a768
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/17842
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
Reviewed-by: Ewout van Bekkum <ewout@google.com>
diff --git a/pw_hdlc_lite/py/decoder_test.py b/pw_hdlc_lite/py/decoder_test.py
old mode 100644
new mode 100755
index 9d3038a..df801eb
--- a/pw_hdlc_lite/py/decoder_test.py
+++ b/pw_hdlc_lite/py/decoder_test.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python3
+#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
@@ -12,79 +12,197 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
-"""Tests the hdlc_lite decoder module."""
+"""Contains the Python decoder tests and generates C++ decoder tests."""
+from collections import defaultdict
+from typing import Any, Callable, Dict, List, NamedTuple, Tuple, Union
import unittest
-from pw_hdlc_lite import decoder
+from pw_hdlc_lite.decoder import FrameDecoder, FrameStatus, NO_ADDRESS
+from pw_hdlc_lite.protocol import frame_check_sequence as fcs
-class TestDecoder(unittest.TestCase):
- """Tests decoding bytes with different arguments."""
- def test_decode_1byte_payload(self):
- decode = decoder.Decoder()
- test_array = b'\x7EA\x15\xB9\x7E'
- decoded_packets = list(decode.add_bytes(test_array))
- self.assertEqual(decoded_packets, [b'A'])
+def _encode(address: int, control: int, data: bytes) -> bytes:
+ frame = bytearray([address, control]) + data
+ frame += fcs(frame)
+ frame = frame.replace(b'\x7d', b'\x7d\x5d')
+ frame = frame.replace(b'\x7e', b'\x7d\x5e')
+ return b''.join([b'\x7e', frame, b'\x7e'])
- def test_decode_empty_payload(self):
- decode = decoder.Decoder()
- test_array = b'\x7E\xFF\xFF\x7E'
- decoded_packets = list(decode.add_bytes(test_array))
- self.assertEqual(decoded_packets, [b''])
- def test_decode_9byte_payload(self):
- decode = decoder.Decoder()
- test_array = b'\x7E123456789\xB1\x29\x7E'
- decoded_packets = list(decode.add_bytes(test_array))
- self.assertEqual(decoded_packets, [b'123456789'])
+class Expected(NamedTuple):
+ address: int
+ control: bytes
+ data: bytes
+ status: FrameStatus = FrameStatus.OK
- def test_decode_unescaping_payload_escapeflag(self):
- decode = decoder.Decoder()
- test_array = b'\x7E\x7D\x5D\xCA\x4E\x7E'
- decoded_packets = list(decode.add_bytes(test_array))
- self.assertEqual(decoded_packets, [b'\x7D'])
+ def __eq__(self, other) -> bool:
+ """Define == so an Expected and a Frame can be compared."""
+ return (self.address == other.address and self.control == other.control
+ and self.data == other.data and self.status is other.status)
- def test_decode_unescaping_payload_framedelimiter(self):
- decode = decoder.Decoder()
- test_array = b'\x7E\x7D\x5E\xA9\x7D\x5E\x7E'
- decoded_packets = list(decode.add_bytes(test_array))
- self.assertEqual(decoded_packets, [b'\x7E'])
- def test_decode_unescaping_payload_mix(self):
- decode = decoder.Decoder()
- test_array = b'\x7E\x7D\x5E\x7Babc\x7D\x5D\x7D\x5E\x49\xE5\x7E'
- decoded_packets = list(decode.add_bytes(test_array))
- self.assertEqual(decoded_packets, [b'~{abc}~'])
+_PARTIAL = fcs(b'\x0ACmsg\x5e')
+_ESCAPED_FLAG_TEST_CASE = (
+ b'\x7e\x0ACmsg\x7d\x7e' + _PARTIAL + b'\x7e',
+ [
+ Expected(0xA, b'C', b'', FrameStatus.INCOMPLETE),
+ Expected(_PARTIAL[0], _PARTIAL[1:2], b'', FrameStatus.INCOMPLETE),
+ ],
+)
- def test_decode_in_parts(self):
- decode = decoder.Decoder()
- test_array = b'\x7EA\x15\xB9\x7E\x7EA\x15\xB9\x7E'
- decoded_packets = list(decode.add_bytes(test_array[:3]))
- self.assertEqual(decoded_packets, [])
- decoded_packets = list(decode.add_bytes(test_array[3:8]))
- self.assertEqual(decoded_packets, [b'A'])
- decoded_packets = list(decode.add_bytes(test_array[8:]))
- self.assertEqual(decoded_packets, [b'A'])
+TestCase = Tuple[bytes, List[Expected]]
+TestCases = Tuple[Union[str, TestCase], ...]
- decoded_packets = list(decode.add_bytes(test_array))
- self.assertEqual(len(decoded_packets), 2)
- self.assertEqual(decoded_packets, [b'A', b'A'])
- def test_decode_incorrectcrc(self):
- decode = decoder.Decoder()
- test_array = b'\x7EA\x15\xB8\x7E'
- with self.assertRaises(decoder.CrcMismatchError):
- next(decode.add_bytes(test_array))
+TEST_CASES: TestCases = (
+ 'Empty payload',
+ (_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
+ (_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
+ (_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
+ 'Simple one-byte payload',
+ (_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
+ (_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
+ 'Simple multi-byte payload',
+ (_encode(0, 0, b'Hello, world!'), [Expected(0, b'\0', b'Hello, world!')]),
+ (_encode(123, 0, b'\0\0\1\0\0'), [Expected(123, b'\0', b'\0\0\1\0\0')]),
+ 'Escaped one-byte payload',
+ (_encode(1, 2, b'\x7e'), [Expected(1, b'\2', b'\x7e')]),
+ (_encode(1, 2, b'\x7d'), [Expected(1, b'\2', b'\x7d')]),
+ (_encode(1, 2, b'\x7e') + _encode(1, 2, b'\x7d'),
+ [Expected(1, b'\2', b'\x7e'),
+ Expected(1, b'\2', b'\x7d')]),
+ 'Escaped address',
+ (_encode(0x7e, 0, b'A'), [Expected(0x7e, b'\0', b'A')]),
+ (_encode(0x7d, 0, b'B'), [Expected(0x7d, b'\0', b'B')]),
+ 'Escaped control',
+ (_encode(0, 0x7e, b'C'), [Expected(0, b'\x7e', b'C')]),
+ (_encode(0, 0x7d, b'D'), [Expected(0, b'\x7d', b'D')]),
+ 'Escaped address and control',
+ (_encode(0x7e, 0x7d, b'E'), [Expected(0x7e, b'\x7d', b'E')]),
+ (_encode(0x7d, 0x7e, b'F'), [Expected(0x7d, b'\x7e', b'F')]),
+ (_encode(0x7e, 0x7e, b'\x7e'), [Expected(0x7e, b'\x7e', b'\x7e')]),
+ 'Multiple frames separated by single flag',
+ (_encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
+ [Expected(0, b'\0', b'A'),
+ Expected(1, b'\2', b'123')]),
+ (_encode(0xff, 0, b'Yo')[:-1] * 3 + b'\x7e',
+ [Expected(0xff, b'\0', b'Yo')] * 3),
+ 'Ignore empty frames',
+ (b'\x7e\x7e', []),
+ (b'\x7e' * 10, []),
+ (b'\x7e\x7e' + _encode(1, 2, b'3') + b'\x7e' * 5,
+ [Expected(1, b'\2', b'3')]),
+ (b'\x7e' * 10 + _encode(1, 2, b':O') + b'\x7e' * 3 + _encode(3, 4, b':P'),
+ [Expected(1, b'\2', b':O'),
+ Expected(3, b'\4', b':P')]),
+ 'Cannot escape flag',
+ (b'\x7e\xAA\x7d\x7e\xab\x00Hello' + fcs(b'\xab\0Hello') + b'\x7e', [
+ Expected(0xAA, b'', b'', FrameStatus.INCOMPLETE),
+ Expected(0xab, b'\0', b'Hello'),
+ ]),
+ _ESCAPED_FLAG_TEST_CASE,
+ 'Frame too short',
+ (b'\x7e1\x7e', [Expected(ord('1'), b'', b'', FrameStatus.INCOMPLETE)]),
+ (b'\x7e12\x7e', [Expected(ord('1'), b'2', b'', FrameStatus.INCOMPLETE)]),
+ (b'\x7e12345\x7e', [Expected(ord('1'), b'2', b'',
+ FrameStatus.INCOMPLETE)]),
+ 'Incorrect frame check sequence',
+ (b'\x7e123456\x7e',
+ [Expected(ord('1'), b'2', b'', FrameStatus.FCS_MISMATCH)]),
+ (b'\x7e\1\2msg\xff\xff\xff\xff\x7e',
+ [Expected(0x1, b'\2', b'msg', FrameStatus.FCS_MISMATCH)]),
+ (_encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'), [
+ Expected(0xA, b'\x0B', b'??', FrameStatus.FCS_MISMATCH),
+ Expected(1, b'\2', b'def'),
+ ]),
+ 'Invalid escape in address',
+ (b'\x7e\x7d\x7d\0' + fcs(b'\x5d\0') + b'\x7e',
+ [Expected(0,
+ fcs(b'\x5d\0')[0:1], b'', FrameStatus.INVALID_ESCAPE)]),
+ 'Invalid escape in control',
+ (b'\x7e\0\x7d\x7d' + fcs(b'\0\x5d') + b'\x7e',
+ [Expected(0,
+ fcs(b'\0\x5d')[0:1], b'', FrameStatus.INVALID_ESCAPE)]),
+ 'Invalid escape in data',
+ (b'\x7e\0\1\x7d\x7d' + fcs(b'\0\1\x5d') + b'\x7e',
+ [Expected(0, b'\1', b'', FrameStatus.INVALID_ESCAPE)]),
+ 'Frame ends with escape',
+ (b'\x7e\x7d\x7e', [Expected(NO_ADDRESS, b'', b'',
+ FrameStatus.INCOMPLETE)]),
+ (b'\x7e\1\x7d\x7e', [Expected(1, b'', b'', FrameStatus.INCOMPLETE)]),
+ (b'\x7e\1\2abc\x7d\x7e', [Expected(1, b'\2', b'',
+ FrameStatus.INCOMPLETE)]),
+ (b'\x7e\1\2abcd\x7d\x7e',
+ [Expected(1, b'\2', b'', FrameStatus.INCOMPLETE)]),
+ (b'\x7e\1\2abcd1234\x7d\x7e',
+ [Expected(1, b'\2', b'abcd', FrameStatus.INCOMPLETE)]),
+ 'Data before first flag',
+ (b'\0\1' + fcs(b'\0\1'), []),
+ (b'\0\1' + fcs(b'\0\1') + b'\x7e',
+ [Expected(0, b'\1', b'', FrameStatus.INCOMPLETE)]),
+ 'No frames emitted until flag',
+ (_encode(1, 2, b'3')[:-1], []),
+ (b'\x7e' + _encode(1, 2, b'3')[1:-1] * 2, []),
+) # yapf: disable
+# Formatting for the above tuple is very slow, so disable yapf.
- def test_decode_incorrectcrc_mix(self):
- decode = decoder.Decoder()
- test_array = b'\x7EA\x15\xB9\x7E\x7EA\x15\xB8\x7E'
- decoded_packets = decode.add_bytes(test_array)
- self.assertEqual(next(decoded_packets), b'A')
- with self.assertRaises(decoder.CrcMismatchError):
- next(decoded_packets)
+def _sort_test_cases(test_cases: TestCases) -> Dict[str, List[TestCase]]:
+ cases: Dict[str, List[TestCase]] = defaultdict(list)
+ message = ''
+
+ for case in test_cases:
+ if isinstance(case, str):
+ message = case
+ else:
+ cases[message].append(case)
+
+ return cases
+
+
+def _define_py_test(group: str,
+ data: bytes,
+ expected_frames: List[Expected],
+ count: int = None) -> Callable[[Any], None]:
+ def test(self) -> None:
+ # Decode in one call
+ self.assertEqual(expected_frames,
+ list(FrameDecoder().process(data)),
+ msg=f'{group}: {data!r}')
+
+ # Decode byte-by-byte
+ decoder = FrameDecoder()
+ decoded_frames = []
+ for i in range(len(data)):
+ decoded_frames += decoder.process(data[i:i + 1])
+
+ self.assertEqual(expected_frames,
+ decoded_frames,
+ msg=f'{group} (byte-by-byte): {data!r}')
+
+ name = 'test_' + ''.join(c if c.isalnum() else '_' for c in group.lower())
+ test.__name__ = name if count is None else name + f'_{count}'
+ return test
+
+
+def _define_py_tests(test_cases: TestCases = TEST_CASES):
+ """Generates a Python test function for each test case."""
+ tests: Dict[str, Callable[[Any], None]] = {}
+
+ for group, test_list in _sort_test_cases(test_cases).items():
+ for i, (data, expected_frames) in enumerate(test_list, 1):
+ count = i if len(test_list) > 1 else None
+ test = _define_py_test(group, data, expected_frames, count)
+
+ assert test.__name__ not in tests, f'Duplicate! {test.__name__}'
+ tests[test.__name__] = test
+
+ return tests
+
+
+# Class that tests all cases in TEST_CASES.
+DecoderTest = type('DecoderTest', (unittest.TestCase, ), _define_py_tests())
if __name__ == '__main__':
unittest.main()
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/client_console_example.py b/pw_hdlc_lite/py/pw_hdlc_lite/client_console_example.py
index 73dc00d..1537e70 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/client_console_example.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/client_console_example.py
@@ -84,17 +84,16 @@
def read_and_process_data(rpc_client, ser):
"""Reads in the data, decodes the bytes and then processes the rpc."""
- decode = decoder.Decoder()
+ decode = decoder.FrameDecoder()
while True:
byte = ser.read()
- try:
- for packet in decode.add_bytes(byte):
- if not rpc_client.process_packet(packet):
- _LOG.error('Packet not handled by rpc client: %s', packet)
- except decoder.CrcMismatchError:
- _LOG.exception('CRC verification failed')
- return
+ for frame in decode.process(byte):
+ if frame.ok():
+ if not rpc_client.process_packet(frame):
+ _LOG.error('Packet not handled by rpc client: %s', frame)
+ else:
+ _LOG.error('Failed to parse frame: %s', frame.status.value)
def main():
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/constants.py b/pw_hdlc_lite/py/pw_hdlc_lite/constants.py
deleted file mode 100644
index 418785b..0000000
--- a/pw_hdlc_lite/py/pw_hdlc_lite/constants.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# Copyright 2020 The Pigweed Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-"""Module for constants"""
-
-HDLC_UNESCAPING_CONSTANT = 0x20
-HDLC_ESCAPE = 0x7D
-HDLC_FRAME_DELIMITER = 0x7E
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/decoder.py b/pw_hdlc_lite/py/pw_hdlc_lite/decoder.py
index e42947f..83f3f3a 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/decoder.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/decoder.py
@@ -13,48 +13,190 @@
# the License.
"""Decoder class for decoding bytes using HDLC-Lite protocol"""
-import binascii
+import enum
+import logging
+import zlib
+from typing import Iterator, NamedTuple, Optional, Tuple
-from pw_hdlc_lite import constants
+from pw_hdlc_lite import protocol
+
+_LOG = logging.getLogger('pw_hdlc_lite')
-class CrcMismatchError(Exception):
- pass
+class FrameStatus(enum.Enum):
+ """Indicates that an error occurred."""
+ OK = 'OK'
+ FCS_MISMATCH = 'frame check sequence failure'
+ INCOMPLETE = 'incomplete frame'
+ INVALID_ESCAPE = 'invalid escape character'
-class Decoder:
- """Decodes the received _data-frames using the HDLC-Lite protocol."""
+_MIN_FRAME_SIZE = 6 # 1 B address + 1 B control + 4 B CRC-32
+
+NO_ADDRESS = -1
+
+
+class Frame(NamedTuple):
+ """Represents an HDLC frame."""
+
+ # All bytes in the frame (address, control, information, FCS)
+ raw: bytes
+
+ # Whether parsing the frame succeeded.
+ status: FrameStatus = FrameStatus.OK
+
+ @property
+ def address(self) -> int:
+ """The frame's address field (assumes only one byte for now)."""
+ return self.raw[0] if self.raw else NO_ADDRESS
+
+ @property
+ def control(self) -> bytes:
+ """The control byte (assumes only one byte for now)."""
+ return self.raw[1:2] if len(self.raw) >= 2 else b''
+
+ @property
+ def data(self) -> bytes:
+ """The information field in the frame."""
+ return self.raw[2:-4] if len(self.raw) >= _MIN_FRAME_SIZE else b''
+
+ def ok(self) -> bool:
+ """True if this represents a valid frame.
+
+ If false, then parsing failed. The status is set to indicate what type
+ of error occurred, and the data field contains all bytes parsed from the
+ frame (including bytes parsed as address or control bytes).
+ """
+ return self.status is FrameStatus.OK
+
+
+class _BaseFrameState:
+ """Base class for all frame parsing states."""
+ def __init__(self, data: bytearray):
+ self._data = data # All data seen in the current frame
+ self._escape_next = False
+
+ def handle_flag(self) -> Tuple['_BaseFrameState', Optional[Frame]]:
+ """Handles an HDLC flag character (0x7e).
+
+ The HDLC flag is always interpreted as the start of a new frame.
+
+ Returns:
+ (next state, optional frame or error)
+ """
+ # If there is data or an escape character, the frame is incomplete.
+ if self._escape_next or self._data:
+ return _AddressState(), Frame(bytes(self._data),
+ FrameStatus.INCOMPLETE)
+
+ return _AddressState(), None
+
+ def handle_escape(self) -> '_BaseFrameState':
+ """Handles an HDLC escape character (0x7d); returns the next state."""
+ if self._escape_next:
+ # If two escapes occur in a row, the frame is invalid.
+ return _InterframeState(self._data, FrameStatus.INVALID_ESCAPE)
+
+ self._escape_next = True
+ return self
+
+ def handle_byte(self, byte: int) -> '_BaseFrameState':
+ """Handles a byte, which may have been escaped; returns next state."""
+ self._data.append(protocol.escape(byte) if self._escape_next else byte)
+ self._escape_next = False
+ return self
+
+
+class _InterframeState(_BaseFrameState):
+ """Not currently in a frame; any data is discarded."""
+ def __init__(self, data: bytearray, error: FrameStatus):
+ super().__init__(data)
+ self._error = error
+
+ def handle_flag(self) -> Tuple[_BaseFrameState, Optional[Frame]]:
+ # If this state was entered due to an error, report that error before
+ # starting a new frame.
+ if self._error is not FrameStatus.OK:
+ return _AddressState(), Frame(bytes(self._data), self._error)
+
+ return super().handle_flag()
+
+
+class _AddressState(_BaseFrameState):
+ """First field in a frame: the address."""
+ def __init__(self):
+ super().__init__(bytearray())
+
+ def handle_byte(self, byte: int) -> _BaseFrameState:
+ super().handle_byte(byte)
+ # Only handle single-byte addresses for now.
+ return _ControlState(self._data)
+
+
+class _ControlState(_BaseFrameState):
+ """Second field in a frame: control."""
+ def handle_byte(self, byte: int) -> _BaseFrameState:
+ super().handle_byte(byte)
+ # Only handle a single control byte for now.
+ return _DataState(self._data)
+
+
+class _DataState(_BaseFrameState):
+ """The information field in a frame."""
+ def handle_flag(self) -> Tuple[_BaseFrameState, Frame]:
+ return _AddressState(), Frame(bytes(self._data), self._check_frame())
+
+ def _check_frame(self) -> FrameStatus:
+ # If the last character was an escape, assume bytes are missing.
+ if self._escape_next or len(self._data) < _MIN_FRAME_SIZE:
+ return FrameStatus.INCOMPLETE
+
+ frame_crc = int.from_bytes(self._data[-4:], 'little')
+ if zlib.crc32(self._data[:-4]) != frame_crc:
+ return FrameStatus.FCS_MISMATCH
+
+ return FrameStatus.OK
+
+
+class FrameDecoder:
+ """Decodes one or more HDLC frames from a stream of data."""
def __init__(self):
self._data = bytearray()
self._unescape_next_byte_flag = False
+ self._state = _InterframeState(bytearray(), FrameStatus.OK)
- def add_bytes(self, byte_array: bytes):
- """Unescapes the bytes and yields the CRC-verified packets.
+ def process(self, data: bytes) -> Iterator[Frame]:
+ """Decodes and yields HDLC frames, including corrupt frames.
- If the CRC-verification fails, the function will raise a
- CrcMismatchError exception.
+ The ok() method on Frame indicates whether it is valid or represents a
+ frame parsing error.
+
+ Yields:
+ Frames, which may be valid (frame.ok()) or corrupt (!frame.ok())
"""
- for byte in byte_array:
- if byte == constants.HDLC_FRAME_DELIMITER:
- if self._data:
- if self._check_crc():
- yield self._data[:-2]
- else:
- raise CrcMismatchError()
- self._data.clear()
- elif byte == constants.HDLC_ESCAPE:
- self._unescape_next_byte_flag = True
+ for byte in data:
+ frame = self._process_byte(byte)
+ if frame:
+ yield frame
+
+ def process_valid_frames(self, data: bytes) -> Iterator[Frame]:
+ """Decodes and yields valid HDLC frames, logging any errors."""
+ for frame in self.process(data):
+ if frame.ok():
+ yield frame
else:
- self._add_unescaped_byte(byte)
+ _LOG.warning('Failed to decode frame: %s; discarded %d bytes',
+ frame.status.value, len(frame.data))
+ _LOG.debug('Discarded data: %s', frame.data)
- def _add_unescaped_byte(self, byte):
- """Unescapes the bytes based on the _unescape_next_byte_flag flag."""
- if self._unescape_next_byte_flag:
- self._data.append(byte ^ constants.HDLC_UNESCAPING_CONSTANT)
- self._unescape_next_byte_flag = False
+ def _process_byte(self, byte: int) -> Optional[Frame]:
+ if byte == protocol.FLAG:
+ self._state, frame = self._state.handle_flag()
+ return frame
+
+ if byte == protocol.ESCAPE:
+ self._state = self._state.handle_escape()
else:
- self._data.append(byte)
+ self._state = self._state.handle_byte(byte)
- def _check_crc(self):
- return binascii.crc_hqx(self._data[:-2], 0xFFFF).to_bytes(
- 2, byteorder='little') == self._data[-2:]
+ return None
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/encoder.py b/pw_hdlc_lite/py/pw_hdlc_lite/encoder.py
index 4921233..64122f8 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/encoder.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/encoder.py
@@ -15,10 +15,10 @@
import binascii
-from pw_hdlc_lite import constants
+from pw_hdlc_lite import protocol
-_HDLC_ESCAPE = bytes([constants.HDLC_ESCAPE])
-_HDLC_FRAME_DELIMITER = bytes([constants.HDLC_FRAME_DELIMITER])
+_HDLC_ESCAPE = bytes([protocol.ESCAPE])
+_HDLC_FRAME_DELIMITER = bytes([protocol.FLAG])
def encode_and_write_payload(payload, write):
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/protocol.py b/pw_hdlc_lite/py/pw_hdlc_lite/protocol.py
new file mode 100644
index 0000000..4f9098a
--- /dev/null
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/protocol.py
@@ -0,0 +1,31 @@
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Module for low-level HDLC protocol features."""
+
+import zlib
+
+# Special flag character for delimiting HDLC frames.
+FLAG = 0x7E
+
+# Special character for escaping other special characters in a frame.
+ESCAPE = 0x7D
+
+
+def escape(byte: int) -> int:
+ """Escapes or unescapes a byte, which should have been preceeded by 0x7d."""
+ return byte ^ 0x20
+
+
+def frame_check_sequence(data: bytes) -> bytes:
+ return zlib.crc32(data).to_bytes(4, 'little')