#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Contains the Python decoder tests and generates C++ decoder tests."""
from collections import defaultdict
from typing import Any, Callable, Dict, List, NamedTuple, Tuple, Union
import unittest
from pw_hdlc_lite.decoder import FrameDecoder, FrameStatus, NO_ADDRESS
from pw_hdlc_lite.protocol import frame_check_sequence as fcs
def _encode(address: int, control: int, data: bytes) -> bytes:
    """Builds one delimited frame from an address, a control byte and data.

    The address and control values are placed ahead of the data, the frame
    check sequence of those bytes is appended, the result is byte-escaped, and
    a 0x7e flag byte is added on each side.
    """
    payload = bytearray((address, control))
    payload.extend(data)
    payload.extend(fcs(payload))

    # Escape 0x7d before 0x7e: escaping 0x7e inserts new 0x7d bytes, and those
    # must not be escaped a second time.
    escaped = payload.replace(b'\x7d', b'\x7d\x5d').replace(
        b'\x7e', b'\x7d\x5e')
    return b'\x7e' + bytes(escaped) + b'\x7e'
class Expected(NamedTuple):
    """Expected decoder output for one frame in a test case.

    Instances compare equal to any object that exposes matching ``address``,
    ``control``, ``data`` and ``status`` attributes, so they can be checked
    directly against the frames produced by the decoder.
    """
    # Address expected on the decoded frame.
    address: int
    # Control field expected on the decoded frame, as bytes.
    control: bytes
    # Payload expected on the decoded frame.
    data: bytes
    # Decode status expected for the frame; defaults to FrameStatus.OK.
    status: FrameStatus = FrameStatus.OK

    def __eq__(self, other) -> bool:
        """Define == so an Expected and a Frame can be compared.

        Returns NotImplemented when ``other`` lacks any of the compared
        attributes, so Python falls back to its default comparison instead of
        raising AttributeError.
        """
        try:
            return (self.address == other.address
                    and self.control == other.control
                    and self.data == other.data
                    and self.status is other.status)
        except AttributeError:
            return NotImplemented

    def __ne__(self, other) -> bool:
        """Define != as the inverse of ==.

        Without this, != would use the comparison inherited from tuple and
        could disagree with the attribute-based __eq__ above.
        """
        result = self.__eq__(other)
        return result if result is NotImplemented else not result
# Check sequence computed over address 0x0A, control b'C', and data b'msg\x5e'.
# NOTE(review): 0x5e is the byte _encode pairs with an escaped flag
# (0x7e -> 0x7d 0x5e), so this is presumably the check sequence that would
# match if the decoder wrongly treated the 0x7d 0x7e sequence below as an
# escaped data byte -- confirm against the decoder.
_PARTIAL = fcs(b'\x0ACmsg\x5e')

# Input containing an escape byte (0x7d) immediately followed by a flag (0x7e),
# then the bytes of _PARTIAL and a closing flag. Two INCOMPLETE frames are
# expected: one with address 0x0A and control b'C', and one whose address and
# control are the first two bytes of _PARTIAL.
_ESCAPED_FLAG_TEST_CASE = (
    b'\x7e\x0ACmsg\x7d\x7e' + _PARTIAL + b'\x7e',
    [
        Expected(0xA, b'C', b'', FrameStatus.INCOMPLETE),
        Expected(_PARTIAL[0], _PARTIAL[1:2], b'', FrameStatus.INCOMPLETE),
    ],
)
# A single test case: the raw bytes fed to the decoder and the frames expected.
TestCase = Tuple[bytes, List[Expected]]
# A flat sequence mixing group-name strings with the test cases in each group.
TestCases = Tuple[Union[str, TestCase], ...]

# Each string names a group; the (data, expected frames) tuples that follow it
# belong to that group until the next string appears (see _sort_test_cases).
TEST_CASES: TestCases = (
    'Empty payload',
    (_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
    (_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
    (_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
    'Simple one-byte payload',
    (_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
    (_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
    'Simple multi-byte payload',
    (_encode(0, 0, b'Hello, world!'), [Expected(0, b'\0', b'Hello, world!')]),
    (_encode(123, 0, b'\0\0\1\0\0'), [Expected(123, b'\0', b'\0\0\1\0\0')]),
    'Escaped one-byte payload',
    (_encode(1, 2, b'\x7e'), [Expected(1, b'\2', b'\x7e')]),
    (_encode(1, 2, b'\x7d'), [Expected(1, b'\2', b'\x7d')]),
    (_encode(1, 2, b'\x7e') + _encode(1, 2, b'\x7d'),
     [Expected(1, b'\2', b'\x7e'),
      Expected(1, b'\2', b'\x7d')]),
    'Escaped address',
    (_encode(0x7e, 0, b'A'), [Expected(0x7e, b'\0', b'A')]),
    (_encode(0x7d, 0, b'B'), [Expected(0x7d, b'\0', b'B')]),
    'Escaped control',
    (_encode(0, 0x7e, b'C'), [Expected(0, b'\x7e', b'C')]),
    (_encode(0, 0x7d, b'D'), [Expected(0, b'\x7d', b'D')]),
    'Escaped address and control',
    (_encode(0x7e, 0x7d, b'E'), [Expected(0x7e, b'\x7d', b'E')]),
    (_encode(0x7d, 0x7e, b'F'), [Expected(0x7d, b'\x7e', b'F')]),
    (_encode(0x7e, 0x7e, b'\x7e'), [Expected(0x7e, b'\x7e', b'\x7e')]),
    'Multiple frames separated by single flag',
    (_encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
     [Expected(0, b'\0', b'A'),
      Expected(1, b'\2', b'123')]),
    (_encode(0xff, 0, b'Yo')[:-1] * 3 + b'\x7e',
     [Expected(0xff, b'\0', b'Yo')] * 3),
    'Ignore empty frames',
    (b'\x7e\x7e', []),
    (b'\x7e' * 10, []),
    (b'\x7e\x7e' + _encode(1, 2, b'3') + b'\x7e' * 5,
     [Expected(1, b'\2', b'3')]),
    (b'\x7e' * 10 + _encode(1, 2, b':O') + b'\x7e' * 3 + _encode(3, 4, b':P'),
     [Expected(1, b'\2', b':O'),
      Expected(3, b'\4', b':P')]),
    'Cannot escape flag',
    (b'\x7e\xAA\x7d\x7e\xab\x00Hello' + fcs(b'\xab\0Hello') + b'\x7e', [
        Expected(0xAA, b'', b'', FrameStatus.INCOMPLETE),
        Expected(0xab, b'\0', b'Hello'),
    ]),
    _ESCAPED_FLAG_TEST_CASE,
    'Frame too short',
    (b'\x7e1\x7e', [Expected(ord('1'), b'', b'', FrameStatus.INCOMPLETE)]),
    (b'\x7e12\x7e', [Expected(ord('1'), b'2', b'', FrameStatus.INCOMPLETE)]),
    (b'\x7e12345\x7e', [Expected(ord('1'), b'2', b'',
                                 FrameStatus.INCOMPLETE)]),
    'Incorrect frame check sequence',
    (b'\x7e123456\x7e',
     [Expected(ord('1'), b'2', b'', FrameStatus.FCS_MISMATCH)]),
    (b'\x7e\1\2msg\xff\xff\xff\xff\x7e',
     [Expected(0x1, b'\2', b'msg', FrameStatus.FCS_MISMATCH)]),
    (_encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'), [
        Expected(0xA, b'\x0B', b'??', FrameStatus.FCS_MISMATCH),
        Expected(1, b'\2', b'def'),
    ]),
    'Invalid escape in address',
    (b'\x7e\x7d\x7d\0' + fcs(b'\x5d\0') + b'\x7e',
     [Expected(0,
               fcs(b'\x5d\0')[0:1], b'', FrameStatus.INVALID_ESCAPE)]),
    'Invalid escape in control',
    (b'\x7e\0\x7d\x7d' + fcs(b'\0\x5d') + b'\x7e',
     [Expected(0,
               fcs(b'\0\x5d')[0:1], b'', FrameStatus.INVALID_ESCAPE)]),
    'Invalid escape in data',
    (b'\x7e\0\1\x7d\x7d' + fcs(b'\0\1\x5d') + b'\x7e',
     [Expected(0, b'\1', b'', FrameStatus.INVALID_ESCAPE)]),
    'Frame ends with escape',
    (b'\x7e\x7d\x7e', [Expected(NO_ADDRESS, b'', b'',
                                FrameStatus.INCOMPLETE)]),
    (b'\x7e\1\x7d\x7e', [Expected(1, b'', b'', FrameStatus.INCOMPLETE)]),
    (b'\x7e\1\2abc\x7d\x7e', [Expected(1, b'\2', b'',
                                       FrameStatus.INCOMPLETE)]),
    (b'\x7e\1\2abcd\x7d\x7e',
     [Expected(1, b'\2', b'', FrameStatus.INCOMPLETE)]),
    (b'\x7e\1\2abcd1234\x7d\x7e',
     [Expected(1, b'\2', b'abcd', FrameStatus.INCOMPLETE)]),
    'Data before first flag',
    (b'\0\1' + fcs(b'\0\1'), []),
    (b'\0\1' + fcs(b'\0\1') + b'\x7e',
     [Expected(0, b'\1', b'', FrameStatus.INCOMPLETE)]),
    'No frames emitted until flag',
    (_encode(1, 2, b'3')[:-1], []),
    (b'\x7e' + _encode(1, 2, b'3')[1:-1] * 2, []),
) # yapf: disable
# Formatting for the above tuple is very slow, so disable yapf.
def _sort_test_cases(test_cases: TestCases) -> Dict[str, List[TestCase]]:
    """Groups test cases under the description string that precedes them.

    Strings in ``test_cases`` start a new group; every non-string entry is
    appended to the group named by the most recent string. Entries that come
    before any string are collected under the empty-string group.
    """
    grouped: Dict[str, List[TestCase]] = defaultdict(list)
    current_group = ''

    for entry in test_cases:
        if not isinstance(entry, str):
            grouped[current_group].append(entry)
            continue

        # A string switches the group that subsequent cases are filed under.
        current_group = entry

    return grouped
def _define_py_test(group: str,
                    data: bytes,
                    expected_frames: List[Expected],
                    count: Union[int, None] = None) -> Callable[[Any], None]:
    """Creates a unittest-style test method for a single test case.

    The returned function decodes ``data`` twice -- once with a single
    ``process`` call and once one byte at a time, each with a fresh
    FrameDecoder -- and asserts that both runs yield ``expected_frames``.

    Args:
      group: description of the test group; used in failure messages and to
        derive the test name
      data: bytes to feed to the decoder
      expected_frames: frames the decoder is expected to produce
      count: if not None, appended to the test name as ``_<count>``

    Returns:
      A function named ``test_`` followed by the lower-cased group with every
      non-alphanumeric character replaced by an underscore.
    """
    def test(self) -> None:
        # Decode in one call
        self.assertEqual(expected_frames,
                         list(FrameDecoder().process(data)),
                         msg=f'{group}: {data!r}')

        # Decode byte-by-byte
        decoder = FrameDecoder()
        decoded_frames: List[Any] = []
        for byte in data:
            # Iterating bytes yields ints; wrap each one back into bytes.
            decoded_frames.extend(decoder.process(bytes([byte])))

        self.assertEqual(expected_frames,
                         decoded_frames,
                         msg=f'{group} (byte-by-byte): {data!r}')

    name = 'test_' + ''.join(c if c.isalnum() else '_' for c in group.lower())
    test.__name__ = name if count is None else name + f'_{count}'
    return test
def _define_py_tests(test_cases: TestCases = TEST_CASES):
    """Builds a dict mapping each generated test name to its test function.

    One test function is created per test case. Cases in a group with more
    than one entry are numbered starting at 1; a lone case gets no number.
    Asserts that no two generated tests share a name.
    """
    generated: Dict[str, Callable[[Any], None]] = {}

    for group_name, group_cases in _sort_test_cases(test_cases).items():
        # Only groups holding several cases need a numeric suffix.
        numbered = len(group_cases) > 1

        for index, (data, expected_frames) in enumerate(group_cases, 1):
            suffix = index if numbered else None
            test = _define_py_test(group_name, data, expected_frames, suffix)

            assert test.__name__ not in generated, f'Duplicate! {test.__name__}'
            generated[test.__name__] = test

    return generated
# Class that tests all cases in TEST_CASES. It is created with type() so the
# dict of generated test functions becomes the methods of a unittest.TestCase
# subclass.
DecoderTest = type('DecoderTest', (unittest.TestCase, ), _define_py_tests())

# Run the tests when this file is executed directly.
if __name__ == '__main__':
    unittest.main()