| #!/usr/bin/env python |
| # Copyright 2020 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Contains the Python decoder tests and generates C++ decoder tests.""" |
| |
| import queue |
| from typing import Iterator, List, NamedTuple, Tuple, Union |
| import unittest |
| |
| from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest |
| from pw_build.generated_tests import parse_test_generation_args |
| from pw_hdlc.decode import (Frame, FrameDecoder, FrameAndNonFrameDecoder, |
| FrameStatus, NO_ADDRESS) |
| from pw_hdlc.protocol import frame_check_sequence as fcs |
| from pw_hdlc.protocol import encode_address |
| |
| |
def _encode(address: int, control: int, data: bytes) -> bytes:
    """Builds an escaped, flag-delimited frame for use as decoder input.

    The frame body is the encoded address, the single control byte and the
    payload, followed by the frame check sequence computed over those bytes.
    """
    body = encode_address(address) + bytes([control]) + data
    body += fcs(body)

    # Escape the escape byte first so the escapes inserted for flag bytes are
    # not themselves escaped a second time.
    escaped = body.replace(b'}', b'}\x5d').replace(b'~', b'}\x5e')
    return b'~' + escaped + b'~'
| |
| |
class Expected(NamedTuple):
    """Expected decoded fields of a frame, comparable against a Frame.

    Comparison is attribute based (address, control, data, status) rather
    than tuple based, so an Expected can be compared with any object that
    exposes those four attributes.
    """
    address: int
    control: bytes
    data: bytes
    status: FrameStatus = FrameStatus.OK

    @classmethod
    def error(cls, status: FrameStatus) -> 'Expected':
        """Creates the expectation for a frame that failed to decode.

        Args:
          status: the failure status; must not be FrameStatus.OK

        Returns:
          An Expected with NO_ADDRESS and empty control and data fields.
        """
        assert status is not FrameStatus.OK
        return cls(NO_ADDRESS, b'', b'', status)

    def __eq__(self, other) -> bool:
        """Define == so an Expected and a Frame can be compared."""
        return (self.address == other.address and self.control == other.control
                and self.data == other.data and self.status is other.status)

    def __ne__(self, other) -> bool:
        """Define != as the inverse of ==.

        Without this, tuple.__ne__ is inherited and disagrees with the custom
        __eq__ above. unittest uses != to locate the first differing element
        when reporting a failed sequence comparison.
        """
        return not self.__eq__(other)
| |
| |
class ExpectedRaw(NamedTuple):
    """Expected raw encoded bytes and status of a frame.

    Comparison is attribute based (raw_encoded, status) rather than tuple
    based, so an ExpectedRaw can be compared with any object that exposes
    those two attributes.
    """
    raw_encoded: bytes
    status: FrameStatus

    def __eq__(self, other) -> bool:
        """Define == so an ExpectedRaw and a Frame can be compared."""
        return (self.raw_encoded == other.raw_encoded
                and self.status is other.status)

    def __ne__(self, other) -> bool:
        """Define != as the inverse of ==.

        Without this, tuple.__ne__ is inherited and disagrees with the custom
        __eq__ above. unittest uses != to locate the first differing element
        when reporting a failed sequence comparison.
        """
        return not self.__eq__(other)
| |
| |
class TestCase(NamedTuple):
    """One decoder test: an input stream and the results expected from it."""
    # Byte stream that is fed to the decoder under test.
    data: bytes
    # Frames the standard FrameDecoder is expected to produce, in order.
    frames: List[Union[Expected, ExpectedRaw]]
    # Bytes the FrameAndNonFrameDecoder tests expect to receive as non-frame
    # data once the decoder has been flushed.
    raw_data: bytes
| |
| |
def case(data: bytes,
         frames: list,
         raw: Union[bytes, None] = None) -> TestCase:
    """Creates a TestCase, filling in the default value for the raw bytes.

    Args:
      data: byte stream to feed to the decoder
      frames: frames expected from decoding data
      raw: expected non-frame data; if None, it defaults to data when no
          frame is expected to be valid and to b'' when every expected frame
          is valid

    Returns:
      The TestCase with its raw_data field populated.

    Raises:
      AssertionError: raw was omitted but frames mixes valid and invalid
          frames, so no default can be chosen.
    """
    if raw is not None:
        return TestCase(data, frames, raw)

    # Evaluate each frame's status once; both defaults are derived from it.
    ok = [frame.status is FrameStatus.OK for frame in frames]

    if not any(ok):  # Also covers an empty list of expected frames.
        return TestCase(data, frames, data)
    if all(ok):
        return TestCase(data, frames, b'')
    raise AssertionError(
        f'Must specify expected non-frame data for this test case ({data=})!')
| |
| |
# Frame check sequence used by the test case below. It is computed over the
# frame bytes with 0x5e in the position where the stream contains the `}~`
# (escape followed by flag) pair.
_PARTIAL = fcs(b'\x0ACmsg\x5e')
# Stream in which an escape byte is immediately followed by a flag byte. Both
# resulting frames are expected to be framing errors. This case is listed in
# the 'Cannot escape flag' group of TEST_CASES.
_ESCAPED_FLAG_TEST_CASE = case(
    b'~\x0ACmsg}~' + _PARTIAL + b'~',
    [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ],
)
| |
| # Test cases are a tuple with the following elements: |
| # |
| # - raw data stream |
| # - expected valid & invalid frames |
| # - [optional] expected raw, non-HDLC data; defaults to the full raw data |
| # stream if no valid frames are expected, or b'' if only valid frames are |
| # expected |
| # |
| # These tests are executed twice: once for the standard HDLC decoder, and a |
| # second time for the FrameAndNonFrameDecoder. The FrameAndNonFrameDecoder tests |
| # flush the non-frame data to simulate a timeout or MTU overflow, so the |
| # expected raw data includes all bytes not in an HDLC frame. |
# String entries label the test cases that follow them (presumably as the
# group name reported through ctx.group -- confirm against TestGenerator).
# Each case(...) entry is (stream, expected frames[, expected non-frame data]).
TEST_CASES: Tuple[GroupOrTest[TestCase], ...] = (
    'Empty payload',
    case(_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
    case(_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
    case(_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
    'Simple one-byte payload',
    case(_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
    case(_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
    'Simple multi-byte payload',
    case(_encode(0, 0, b'Hello, world!'),
         [Expected(0, b'\0', b'Hello, world!')]),
    case(_encode(123, 0, b'\0\0\1\0\0'),
         [Expected(123, b'\0', b'\0\0\1\0\0')]),
    'Escaped one-byte payload',
    case(_encode(1, 2, b'~'), [Expected(1, b'\2', b'~')]),
    case(_encode(1, 2, b'}'), [Expected(1, b'\2', b'}')]),
    case(
        _encode(1, 2, b'~') + _encode(1, 2, b'}'),
        [Expected(1, b'\2', b'~'),
         Expected(1, b'\2', b'}')]),
    'Escaped address',
    case(_encode(0x7e, 0, b'A'), [Expected(0x7e, b'\0', b'A')]),
    case(_encode(0x7d, 0, b'B'), [Expected(0x7d, b'\0', b'B')]),
    'Escaped control',
    case(_encode(0, 0x7e, b'C'), [Expected(0, b'~', b'C')]),
    case(_encode(0, 0x7d, b'D'), [Expected(0, b'}', b'D')]),
    'Escaped address and control',
    case(_encode(0x7e, 0x7d, b'E'), [Expected(0x7e, b'}', b'E')]),
    case(_encode(0x7d, 0x7e, b'F'), [Expected(0x7d, b'~', b'F')]),
    case(_encode(0x7e, 0x7e, b'~'), [Expected(0x7e, b'~', b'~')]),
    'Multibyte address',
    case(_encode(128, 0, b'big address'),
         [Expected(128, b'\0', b'big address')]),
    case(_encode(0xffffffff, 0, b'\0\0\1\0\0'),
         [Expected(0xffffffff, b'\0', b'\0\0\1\0\0')]),
    'Multiple frames separated by single flag',
    # Dropping the trailing flag with [:-1] makes consecutive frames share a
    # single flag byte.
    case(
        _encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
        [Expected(0, b'\0', b'A'),
         Expected(1, b'\2', b'123')]),
    case(
        _encode(0xff, 0, b'Yo')[:-1] * 3 + b'~',
        [Expected(0xff, b'\0', b'Yo')] * 3),
    'Empty frames produce framing errors with raw data',
    case(b'~~', [ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR)], b'~~'),
    case(b'~' * 10, [
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
    ]),
    case(
        b'~~' + _encode(1, 2, b'3') + b'~' * 5,
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            Expected(1, b'\2', b'3'),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            # One flag byte remains in the decoding state machine.
        ],
        b'~~~~~~~'),
    case(b'~' * 10 + _encode(1, 2, b':O') + b'~' * 3 + _encode(3, 4, b':P'), [
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        Expected(1, b'\2', b':O'),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        Expected(3, b'\4', b':P')
    ], b'~' * 13),
    'Cannot escape flag',
    case(b'~\xAA}~\xab\x00Hello' + fcs(b'\xab\0Hello') + b'~', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected(0x55, b'\0', b'Hello'),
    ], b'~\xAA}'),
    _ESCAPED_FLAG_TEST_CASE,
    'Frame too short',
    case(b'~1~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~12~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~12345~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Multibyte address too long',
    case(_encode(2**100, 0, b'too long'),
         [Expected.error(FrameStatus.BAD_ADDRESS)]),
    'Incorrect frame check sequence',
    case(b'~123456~', [Expected.error(FrameStatus.FCS_MISMATCH)]),
    case(b'~\1\2msg\xff\xff\xff\xff~',
         [Expected.error(FrameStatus.FCS_MISMATCH)]),
    # [:-2] removes the closing flag and the final byte before it, so the
    # first frame no longer carries its complete check sequence.
    case(
        _encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'), [
            Expected.error(FrameStatus.FCS_MISMATCH),
            Expected(1, b'\2', b'def'),
        ],
        _encode(0xA, 0xB, b'???')[:-2]),
    'Invalid escape in address',
    case(b'~}}\0' + fcs(b'\x5d\0') + b'~',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Invalid escape in control',
    case(b'~\0}}' + fcs(b'\0\x5d') + b'~',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Invalid escape in data',
    case(b'~\0\1}}' + fcs(b'\0\1\x5d') + b'~',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Frame ends with escape',
    case(b'~}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abc}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abcd}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abcd1234}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Inter-frame data is only escapes',
    case(b'~}~}~', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    case(b'~}}~}}~', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    'Data before first flag',
    case(b'\0\1' + fcs(b'\0\1'), []),
    case(b'\0\1' + fcs(b'\0\1') + b'~',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'No frames emitted until flag',
    case(_encode(1, 2, b'3')[:-1], []),
    case(b'~' + _encode(1, 2, b'3')[1:-1] * 2, []),
    'Only flag and escape characters can be escaped',
    case(b'~}\0' + _encode(1, 2, b'3'),
         [Expected.error(FrameStatus.FRAMING_ERROR),
          Expected(1, b'\2', b'3')], b'~}\0'),
    case(b'~1234}a' + _encode(1, 2, b'3'),
         [Expected.error(FrameStatus.FRAMING_ERROR),
          Expected(1, b'\2', b'3')], b'~1234}a'),
    'Invalid frame records raw data',
    case(b'Hello?~', [ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR)]),
    case(b'~~Hel}}lo~', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'Hel}}lo~', FrameStatus.FRAMING_ERROR),
    ]),
    case(b'Hello?~~~~~', [
        ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    case(b'~~~~Hello?~~~~~', [
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'Hello?~', FrameStatus.FCS_MISMATCH),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
    ]),
    case(b'Hello?~~Goodbye~', [
        ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~Goodbye~', FrameStatus.FCS_MISMATCH),
    ]),
    'Valid data followed by frame followed by invalid',
    case(
        b'Hi~ this is a log message\r\n' + _encode(0, 0, b'') +
        b'More log messages!\r\n', [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FCS_MISMATCH),
            Expected(0, b'\0', b''),
        ], b'Hi~ this is a log message\r\nMore log messages!\r\n'),
    case(b'Hi~ this is a log message\r\n',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~Hi~' + _encode(1, 2, b'def') + b' How are you?', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected(1, b'\2', b'def')
    ], b'~Hi~ How are you?'),
)  # yapf: disable
# Formatting for the above tuple is very slow, so disable yapf. Manually enable
# it as needed to format the test cases.

# Generator that produces the Python, C++ and TypeScript tests from TEST_CASES.
_TESTS = TestGenerator(TEST_CASES)
| |
| |
def _expected(frames: List[Frame]) -> Iterator[str]:
    """Yields one C++ initializer line per frame for the kExpected array.

    Valid frames refer to the parsed value of their kDecodedFrameNN constant,
    BAD_ADDRESS frames refer to the status of parsing that constant, and all
    other frames are emitted as Status::DataLoss().
    """
    for index, frame in enumerate(frames, 1):
        if frame.ok():
            line = f' Frame::Parse(kDecodedFrame{index:02}).value(),'
        elif frame.status is FrameStatus.BAD_ADDRESS:
            line = f' Frame::Parse(kDecodedFrame{index:02}).status(),'
        else:
            line = f' Status::DataLoss(), // Frame {index}'
        yield line
| |
| |
| _CPP_HEADER = """\ |
| #include "pw_hdlc/decoder.h" |
| |
| #include <array> |
| #include <cstddef> |
| #include <variant> |
| |
| #include "gtest/gtest.h" |
| #include "pw_bytes/array.h" |
| |
| namespace pw::hdlc { |
| namespace { |
| """ |
| |
| _CPP_FOOTER = """\ |
| } // namespace |
| } // namespace pw::hdlc""" |
| |
# Text emitted before the generated TypeScript tests: imports, the Expected
# and ExpectedRaw helper classes, and the opening of the describe() block
# with its per-test decoder setup.
_TS_HEADER = """\
import 'jasmine';

import {Buffer} from 'buffer';

import {Decoder, FrameStatus} from './decoder'
import * as protocol from './protocol'
import * as util from './util'

class Expected {
address: number
control: Uint8Array
data: Uint8Array
status: FrameStatus

constructor(
address: number,
control: Uint8Array,
data: Uint8Array,
status: FrameStatus) {
this.address = address;
this.control = control;
this.data = data;
this.status = status;
}
}

class ExpectedRaw {
raw: Uint8Array
status: FrameStatus

constructor(raw: Uint8Array, status: FrameStatus) {
this.status = status;
this.raw = raw;
}
}

describe('Decoder', () => {
let decoder: Decoder;
let textEncoder: TextEncoder;

beforeEach(() => {
decoder = new Decoder();
textEncoder = new TextEncoder();
});

"""
# Text emitted after the generated TypeScript tests; closes the describe()
# block opened by _TS_HEADER.
_TS_FOOTER = """\
});
"""
| |
| |
def _py_only_frame(frame: Frame) -> bool:
    """Returns true for frames only returned by the Python library.

    These are framing errors whose raw encoded bytes are exactly b'~~'.
    """
    if frame.status is not FrameStatus.FRAMING_ERROR:
        return False
    return frame.raw_encoded == b'~~'
| |
| |
def _cpp_test(ctx: Context) -> Iterator[str]:
    """Generates a C++ test for the provided test data.

    The expected results come from running the Python FrameDecoder over the
    test case's data, skipping frames that only the Python library reports.
    """
    data, _, _ = ctx.test_case

    def cpp_escape(raw: bytes) -> str:
        # Spell every byte as a \xNN escape for use in a C++ string literal.
        return ''.join(rf'\x{value:02x}' for value in raw)

    frames = [
        frame for frame in FrameDecoder().process(data)
        if not _py_only_frame(frame)
    ]

    yield f'TEST(Decoder, {ctx.cc_name()}) {{'
    yield f' static constexpr auto kData = bytes::String("{cpp_escape(data)}");\n'

    for number, frame in enumerate(frames, 1):
        # Only valid and BAD_ADDRESS frames get a decoded-bytes constant; the
        # rest are noted in a comment.
        if not frame.ok() and frame.status is not FrameStatus.BAD_ADDRESS:
            yield f' // Frame {number}: {frame.status.value}'
            continue

        yield (f' static constexpr auto kDecodedFrame{number:02} = '
               f'bytes::String("{cpp_escape(frame.raw_decoded)}");')

    yield ''

    expected_entries = '\n'.join(_expected(frames)) or ' // No frames'
    buffer_size = max(len(data), 8)  # Make sure large enough for a frame

    yield f"""\
DecoderBuffer<{buffer_size}> decoder;

static std::array<std::variant<Frame, Status>, {len(frames)}> kExpected = {{
{expected_entries}
}};

size_t decoded_frames = 0;

decoder.Process(kData, [&](const Result<Frame>& result) {{
ASSERT_LT(decoded_frames++, kExpected.size());
auto& expected = kExpected[decoded_frames - 1];

if (std::holds_alternative<Status>(expected)) {{
EXPECT_EQ(Status::DataLoss(), result.status());
}} else {{
ASSERT_EQ(OkStatus(), result.status());

const Frame& decoded_frame = result.value();
const Frame& expected_frame = std::get<Frame>(expected);
EXPECT_EQ(expected_frame.address(), decoded_frame.address());
EXPECT_EQ(expected_frame.control(), decoded_frame.control());
ASSERT_EQ(expected_frame.data().size(), decoded_frame.data().size());
EXPECT_EQ(std::memcmp(expected_frame.data().data(),
decoded_frame.data().data(),
expected_frame.data().size()),
0);
}}
}});

EXPECT_EQ(decoded_frames, kExpected.size());
}}"""
| |
| |
def _define_py_decoder_test(ctx: Context) -> PyTest:
    """Creates a test method that checks FrameDecoder against a test case.

    The generated test decodes the stream twice: once in a single process()
    call and once feeding the decoder one byte at a time.
    """
    stream, wanted_frames, _ = ctx.test_case

    def test(self) -> None:
        self.maxDiff = None

        # Whole stream in a single call.
        all_at_once = list(FrameDecoder().process(stream))
        self.assertEqual(wanted_frames,
                         all_at_once,
                         msg=f'{ctx.group}: {stream!r}')

        # Same stream, one byte per call.
        decoder = FrameDecoder()
        collected: List[Frame] = []
        for offset in range(len(stream)):
            collected.extend(decoder.process(stream[offset:offset + 1]))

        self.assertEqual(wanted_frames,
                         collected,
                         msg=f'{ctx.group} (byte-by-byte): {stream!r}')

    return test
| |
| |
def _define_raw_decoder_py_test(ctx: Context) -> PyTest:
    """Creates a test method that checks FrameAndNonFrameDecoder.

    The generated test decodes the stream in one call and then byte by byte.
    Each time it flushes the decoder and checks both the frames produced and
    the bytes delivered to the non-frame data handler.
    """
    raw_data, all_expected, expected_non_frame_data = ctx.test_case

    # The non-frame data decoder only yields valid frames.
    valid_frames = [
        frame for frame in all_expected if frame.status is FrameStatus.OK
    ]

    def test(self) -> None:
        self.maxDiff = None

        received = bytearray()

        # Whole stream in a single call.
        decoder = FrameAndNonFrameDecoder(
            non_frame_data_handler=received.extend)
        frames_at_once = list(decoder.process(raw_data))
        self.assertEqual(valid_frames,
                         frames_at_once,
                         msg=f'{ctx.group}: {raw_data!r}')

        decoder.flush_non_frame_data()
        self.assertEqual(expected_non_frame_data, bytes(received))

        # Same stream, one byte per call, starting from an empty buffer.
        received.clear()
        decoder = FrameAndNonFrameDecoder(
            non_frame_data_handler=received.extend)
        collected: List[Frame] = []
        for offset in range(len(raw_data)):
            collected.extend(decoder.process(raw_data[offset:offset + 1]))

        self.assertEqual(valid_frames,
                         collected,
                         msg=f'{ctx.group} (byte-by-byte): {raw_data!r}')
        decoder.flush_non_frame_data()
        self.assertEqual(expected_non_frame_data, bytes(received))

    return test
| |
| |
| def _ts_byte_array(data: bytes) -> str: |
| return '[' + ', '.join(rf'0x{byte:02x}' for byte in data) + ']' |
| |
| |
def _ts_test(ctx: Context) -> Iterator[str]:
    """Generates a TS test for the provided test data.

    The expected frames come from running the Python FrameDecoder over the
    test case's data, skipping frames that only the Python library reports.
    """
    data, _, _ = ctx.test_case
    frames = [
        frame for frame in FrameDecoder().process(data)
        if not _py_only_frame(frame)
    ]
    stream_literal = _ts_byte_array(data)

    yield f" it('{ctx.ts_name()}', () => {{"
    yield f' const data = new Uint8Array({stream_literal});'

    yield ' const expectedFrames = ['
    for frame in frames:
        control_literal = _ts_byte_array(frame.control)
        payload_literal = _ts_byte_array(frame.data)

        # NOTE(review): `frame` is an object produced by FrameDecoder, so
        # this identity comparison with the Expected class is never true and
        # every frame is emitted as ExpectedRaw. Confirm whether the Expected
        # branch was meant to be reachable.
        if frame is not Expected:
            raw_literal = _ts_byte_array(frame.raw_encoded)
            yield (f' new ExpectedRaw(new Uint8Array({raw_literal}), '
                   f'{frame.status}),')
        else:
            yield (f' new Expected({frame.address}, '
                   f'new Uint8Array({control_literal}), '
                   f'new Uint8Array({payload_literal}), {frame.status}),')

    yield ' ].values();\n'

    yield """\
const result = decoder.process(data);

while (true) {
const expectedFrame = expectedFrames.next();
const actualFrame = result.next();
if (expectedFrame.done && actualFrame.done) {
break;
}
expect(expectedFrame.done).toBeFalse();
expect(actualFrame.done).toBeFalse();

const expected = expectedFrame.value;
const actual = actualFrame.value;
if (expected instanceof Expected) {
expect(actual.address).toEqual(expected.address);
expect(actual.control).toEqual(expected.control);
expect(actual.data).toEqual(expected.data);
expect(actual.status).toEqual(expected.status);
} else {
// Expected Raw
expect(actual.rawEncoded).toEqual(expected.raw);
expect(actual.status).toEqual(expected.status);
}
}
});
"""
| |
| |
# Classes that test all cases in TEST_CASES: DecoderTest is built from the
# FrameDecoder test factory and NonFrameDecoderTest from the
# FrameAndNonFrameDecoder test factory.
DecoderTest = _TESTS.python_tests('DecoderTest', _define_py_decoder_test)
NonFrameDecoderTest = _TESTS.python_tests('NonFrameDecoderTest',
                                          _define_raw_decoder_py_test)
| |
| |
class AdditionalNonFrameDecoderTests(unittest.TestCase):
    """Hand-written FrameAndNonFrameDecoder tests beyond the generated cases."""

    def test_shared_flags_waits_for_tilde_to_emit_data(self) -> None:
        """Non-frame bytes after a frame are held back until a flag arrives."""
        received = bytearray()
        decoder = FrameAndNonFrameDecoder(received.extend)

        frames = list(decoder.process(_encode(0, 0, b'')))
        self.assertEqual([Expected(0, b'\0', b'')], frames)
        self.assertEqual(received, b'')

        frames = list(decoder.process(b'uh oh, no tilde!'))
        self.assertEqual([], frames)
        self.assertEqual(received, b'')

        frames = list(decoder.process(b'~'))
        self.assertEqual([], frames)
        self.assertEqual(received, b'uh oh, no tilde!')

    def test_no_shared_flags_immediately_emits_data(self) -> None:
        """With handle_shared_flags=False, non-frame bytes arrive at once."""
        received = bytearray()
        decoder = FrameAndNonFrameDecoder(received.extend,
                                          handle_shared_flags=False)

        frames = list(decoder.process(_encode(0, 0, b'')))
        self.assertEqual([Expected(0, b'\0', b'')], frames)
        self.assertEqual(received, b'')

        frames = list(decoder.process(b'uh oh, no tilde!'))
        self.assertEqual([], frames)
        self.assertEqual(received, b'uh oh, no tilde!')

    def test_emits_data_if_mtu_is_exceeded(self) -> None:
        """Buffered bytes are emitted once one byte more than mtu is fed."""
        partial_frame = b'~this looks like a real frame'

        received = bytearray()
        decoder = FrameAndNonFrameDecoder(received.extend,
                                          mtu=len(partial_frame))

        frames = list(decoder.process(partial_frame))
        self.assertEqual([], frames)
        self.assertEqual(received, b'')

        frames = list(decoder.process(b'!'))
        self.assertEqual([], frames)
        self.assertEqual(received, partial_frame + b'!')

    def test_emits_data_if_timeout_expires(self) -> None:
        """Buffered bytes reach the handler when timeout_s is set."""
        partial_frame = b'~this looks like a real frame'

        # A queue lets the test block until the handler has been called.
        received: 'queue.Queue[bytes]' = queue.Queue()
        decoder = FrameAndNonFrameDecoder(received.put, timeout_s=0.001)

        frames = list(decoder.process(partial_frame))
        self.assertEqual([], frames)
        self.assertEqual(received.get(timeout=2), partial_frame)

    def test_emits_raw_data_and_valid_frame_if_flushed_partway(self) -> None:
        """A frame still decodes when a flush happens partway through it."""
        payload = b'Do you wanna ride in my blimp?'
        encoded = _encode(1, 2, payload)
        head, tail = encoded[:5], encoded[5:]

        received = bytearray()
        decoder = FrameAndNonFrameDecoder(received.extend)

        frames = list(decoder.process(head))
        self.assertEqual([], frames)
        decoder.flush_non_frame_data()

        frames = list(decoder.process(tail))
        self.assertEqual([Expected(1, b'\2', payload)], frames)
| |
| |
# Entry point: generate the C++ or TypeScript tests when the corresponding
# argument is set; otherwise run the Python unit tests in this module.
if __name__ == '__main__':
    args = parse_test_generation_args()
    if args.generate_cc_test:
        # The argument value is passed through as the first argument.
        _TESTS.cc_tests(args.generate_cc_test, _cpp_test, _CPP_HEADER,
                        _CPP_FOOTER)
    elif args.generate_ts_test:
        _TESTS.ts_tests(args.generate_ts_test, _ts_test, _TS_HEADER,
                        _TS_FOOTER)
    else:
        unittest.main()