#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Contains the Python decoder tests and generates C++ decoder tests."""

import queue
from typing import Iterator, List, NamedTuple, Optional, Tuple, Union
import unittest

from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest
from pw_build.generated_tests import parse_test_generation_args
from pw_hdlc.decode import (Frame, FrameDecoder, FrameAndNonFrameDecoder,
                            FrameStatus, NO_ADDRESS)
from pw_hdlc.protocol import frame_check_sequence as fcs
from pw_hdlc.protocol import encode_address


def _encode(address: int, control: int, data: bytes) -> bytes:
    frame = encode_address(address) + bytes([control]) + data
    frame += fcs(frame)
    frame = frame.replace(b'}', b'}\x5d')
    frame = frame.replace(b'~', b'}\x5e')
    return b''.join([b'~', frame, b'~'])


class Expected(NamedTuple):
    address: int
    control: bytes
    data: bytes
    status: FrameStatus = FrameStatus.OK

    @classmethod
    def error(cls, status: FrameStatus):
        assert status is not FrameStatus.OK
        return cls(NO_ADDRESS, b'', b'', status)

    def __eq__(self, other) -> bool:
        """Define == so an Expected and a Frame can be compared."""
        return (self.address == other.address and self.control == other.control
                and self.data == other.data and self.status is other.status)


class ExpectedRaw(NamedTuple):
    raw_encoded: bytes
    status: FrameStatus

    def __eq__(self, other) -> bool:
        """Define == so an ExpectedRaw and a Frame can be compared."""
        return (self.raw_encoded == other.raw_encoded
                and self.status is other.status)


class TestCase(NamedTuple):
    data: bytes
    frames: List[Union[Expected, ExpectedRaw]]
    raw_data: bytes


def case(data: bytes, frames: list, raw: Optional[bytes] = None) -> TestCase:
    """Creates a TestCase, filling in the default value for the raw bytes."""
    if raw is not None:
        return TestCase(data, frames, raw)
    if not frames or all(f.status is not FrameStatus.OK for f in frames):
        return TestCase(data, frames, data)
    if all(f.status is FrameStatus.OK for f in frames):
        return TestCase(data, frames, b'')
    raise AssertionError(
        f'Must specify expected non-frame data for this test case ({data=})!')


_PARTIAL = fcs(b'\x0ACmsg\x5e')
_ESCAPED_FLAG_TEST_CASE = case(
    b'~\x0ACmsg}~' + _PARTIAL + b'~',
    [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ],
)

# Test cases are a tuple with the following elements:
#
#   - raw data stream
#   - expected valid & invalid frames
#   - [optional] expected raw, non-HDLC data; defaults to the full raw data
#     stream if no valid frames are expected, or b'' if only valid frames are
#     expected
#
# These tests are executed twice: once for the standard HDLC decoder, and a
# second time for the FrameAndNonFrameDecoder. The FrameAndNonFrameDecoder tests
# flush the non-frame data to simulate a timeout or MTU overflow, so the
# expected raw data includes all bytes not in an HDLC frame.
TEST_CASES: Tuple[GroupOrTest[TestCase], ...] = (
    'Empty payload',
    case(_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
    case(_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
    case(_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
    'Simple one-byte payload',
    case(_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
    case(_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
    'Simple multi-byte payload',
    case(_encode(0, 0, b'Hello, world!'),
         [Expected(0, b'\0', b'Hello, world!')]),
    case(_encode(123, 0, b'\0\0\1\0\0'),
         [Expected(123, b'\0', b'\0\0\1\0\0')]),
    'Escaped one-byte payload',
    case(_encode(1, 2, b'~'), [Expected(1, b'\2', b'~')]),
    case(_encode(1, 2, b'}'), [Expected(1, b'\2', b'}')]),
    case(
        _encode(1, 2, b'~') + _encode(1, 2, b'}'),
        [Expected(1, b'\2', b'~'),
         Expected(1, b'\2', b'}')]),
    'Escaped address',
    case(_encode(0x7e, 0, b'A'), [Expected(0x7e, b'\0', b'A')]),
    case(_encode(0x7d, 0, b'B'), [Expected(0x7d, b'\0', b'B')]),
    'Escaped control',
    case(_encode(0, 0x7e, b'C'), [Expected(0, b'~', b'C')]),
    case(_encode(0, 0x7d, b'D'), [Expected(0, b'}', b'D')]),
    'Escaped address and control',
    case(_encode(0x7e, 0x7d, b'E'), [Expected(0x7e, b'}', b'E')]),
    case(_encode(0x7d, 0x7e, b'F'), [Expected(0x7d, b'~', b'F')]),
    case(_encode(0x7e, 0x7e, b'~'), [Expected(0x7e, b'~', b'~')]),
    'Multibyte address',
    case(_encode(128, 0, b'big address'),
         [Expected(128, b'\0', b'big address')]),
    case(_encode(0xffffffff, 0, b'\0\0\1\0\0'),
         [Expected(0xffffffff, b'\0', b'\0\0\1\0\0')]),
    'Multiple frames separated by single flag',
    case(
        _encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
        [Expected(0, b'\0', b'A'),
         Expected(1, b'\2', b'123')]),
    case(
        _encode(0xff, 0, b'Yo')[:-1] * 3 + b'~',
        [Expected(0xff, b'\0', b'Yo')] * 3),
    'Empty frames produce framing errors with raw data',
    case(b'~~', [ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR)], b'~~'),
    case(b'~' * 10, [
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
    ]),
    case(
        b'~~' + _encode(1, 2, b'3') + b'~' * 5,
        [
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            Expected(1, b'\2', b'3'),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
            # One flag byte remains in the decoding state machine.
        ],
        b'~~~~~~~'),
    case(b'~' * 10 + _encode(1, 2, b':O') + b'~' * 3 + _encode(3, 4, b':P'), [
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        Expected(1, b'\2', b':O'),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        Expected(3, b'\4', b':P')
    ], b'~' * 13),
    'Cannot escape flag',
    case(b'~\xAA}~\xab\x00Hello' + fcs(b'\xab\0Hello') + b'~', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected(0x55, b'\0', b'Hello'),
    ], b'~\xAA}'),
    _ESCAPED_FLAG_TEST_CASE,
    'Frame too short',
    case(b'~1~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~12~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~12345~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Multibyte address too long',
    case(_encode(2**100, 0, b'too long'),
         [Expected.error(FrameStatus.BAD_ADDRESS)]),
    'Incorrect frame check sequence',
    case(b'~123456~', [Expected.error(FrameStatus.FCS_MISMATCH)]),
    case(b'~\1\2msg\xff\xff\xff\xff~',
         [Expected.error(FrameStatus.FCS_MISMATCH)]),
    case(
        _encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'), [
            Expected.error(FrameStatus.FCS_MISMATCH),
            Expected(1, b'\2', b'def'),
        ],
        _encode(0xA, 0xB, b'???')[:-2]),
    'Invalid escape in address',
    case(b'~}}\0' + fcs(b'\x5d\0') + b'~',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Invalid escape in control',
    case(b'~\0}}' + fcs(b'\0\x5d') + b'~',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Invalid escape in data',
    case(b'~\0\1}}' + fcs(b'\0\1\x5d') + b'~',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Frame ends with escape',
    case(b'~}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abc}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abcd}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~\1\2abcd1234}~', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Inter-frame data is only escapes',
    case(b'~}~}~', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    case(b'~}}~}}~', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    'Data before first flag',
    case(b'\0\1' + fcs(b'\0\1'), []),
    case(b'\0\1' + fcs(b'\0\1') + b'~',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'No frames emitted until flag',
    case(_encode(1, 2, b'3')[:-1], []),
    case(b'~' + _encode(1, 2, b'3')[1:-1] * 2, []),
    'Only flag and escape characters can be escaped',
    case(b'~}\0' + _encode(1, 2, b'3'),
         [Expected.error(FrameStatus.FRAMING_ERROR),
          Expected(1, b'\2', b'3')], b'~}\0'),
    case(b'~1234}a' + _encode(1, 2, b'3'),
         [Expected.error(FrameStatus.FRAMING_ERROR),
          Expected(1, b'\2', b'3')], b'~1234}a'),
    'Invalid frame records raw data',
    case(b'Hello?~', [ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR)]),
    case(b'~~Hel}}lo~', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'Hel}}lo~', FrameStatus.FRAMING_ERROR),
    ]),
    case(b'Hello?~~~~~', [
        ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    case(b'~~~~Hello?~~~~~', [
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'Hello?~', FrameStatus.FCS_MISMATCH),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~~', FrameStatus.FRAMING_ERROR),
    ]),
    case(b'Hello?~~Goodbye~', [
        ExpectedRaw(b'Hello?~', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'~Goodbye~', FrameStatus.FCS_MISMATCH),
    ]),
    'Valid data followed by frame followed by invalid',
    case(
        b'Hi~ this is a log message\r\n' + _encode(0, 0, b'') +
        b'More log messages!\r\n', [
            Expected.error(FrameStatus.FRAMING_ERROR),
            Expected.error(FrameStatus.FCS_MISMATCH),
            Expected(0, b'\0', b''),
        ], b'Hi~ this is a log message\r\nMore log messages!\r\n'),
    case(b'Hi~ this is a log message\r\n',
         [Expected.error(FrameStatus.FRAMING_ERROR)]),
    case(b'~Hi~' + _encode(1, 2, b'def') + b' How are you?', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected(1, b'\2', b'def')
    ], b'~Hi~ How are you?'),
)  # yapf: disable
# Formatting for the above tuple is very slow, so disable yapf. Manually enable
# it as needed to format the test cases.

_TESTS = TestGenerator(TEST_CASES)


def _expected(frames: List[Frame]) -> Iterator[str]:
    for i, frame in enumerate(frames, 1):
        if frame.ok():
            yield f'      Frame::Parse(kDecodedFrame{i:02}).value(),'
        elif frame.status is FrameStatus.BAD_ADDRESS:
            yield f'      Frame::Parse(kDecodedFrame{i:02}).status(),'
        else:
            yield f'      Status::DataLoss(),  // Frame {i}'


_CPP_HEADER = """\
#include "pw_hdlc/decoder.h"

#include <array>
#include <cstddef>
#include <variant>

#include "gtest/gtest.h"
#include "pw_bytes/array.h"

namespace pw::hdlc {
namespace {
"""

_CPP_FOOTER = """\
}  // namespace
}  // namespace pw::hdlc"""

_TS_HEADER = """\
import 'jasmine';

import {Buffer} from 'buffer';

import {Decoder, FrameStatus} from './decoder'
import * as protocol from './protocol'
import * as util from './util'

class Expected {
  address: number
  control: Uint8Array
  data: Uint8Array
  status: FrameStatus

  constructor(
      address: number,
      control: Uint8Array,
      data: Uint8Array,
      status: FrameStatus) {
    this.address = address;
    this.control = control;
    this.data = data;
    this.status = status;
  }
}

class ExpectedRaw {
  raw: Uint8Array
  status: FrameStatus

  constructor(raw: Uint8Array, status: FrameStatus) {
    this.status = status;
    this.raw = raw;
  }
}

describe('Decoder', () => {
  let decoder: Decoder;
  let textEncoder: TextEncoder;

  beforeEach(() => {
    decoder = new Decoder();
    textEncoder = new TextEncoder();
  });

"""
_TS_FOOTER = """\
});
"""


def _py_only_frame(frame: Frame) -> bool:
    """Returns true for frames only returned by the Python library"""
    return (frame.status is FrameStatus.FRAMING_ERROR
            and frame.raw_encoded == b'~~')


def _cpp_test(ctx: Context) -> Iterator[str]:
    """Generates a C++ test for the provided test data."""
    data, _, _ = ctx.test_case
    frames = [
        f for f in list(FrameDecoder().process(data)) if not _py_only_frame(f)
    ]
    data_bytes = ''.join(rf'\x{byte:02x}' for byte in data)

    yield f'TEST(Decoder, {ctx.cc_name()}) {{'
    yield f'  static constexpr auto kData = bytes::String("{data_bytes}");\n'

    for i, frame in enumerate(frames, 1):
        if frame.ok() or frame.status is FrameStatus.BAD_ADDRESS:
            frame_bytes = ''.join(rf'\x{byte:02x}'
                                  for byte in frame.raw_decoded)
            yield (f'  static constexpr auto kDecodedFrame{i:02} = '
                   f'bytes::String("{frame_bytes}");')
        else:
            yield f'  // Frame {i}: {frame.status.value}'

    yield ''

    expected = '\n'.join(_expected(frames)) or '      // No frames'
    decoder_size = max(len(data), 8)  # Make sure large enough for a frame

    yield f"""\
  DecoderBuffer<{decoder_size}> decoder;

  static std::array<std::variant<Frame, Status>, {len(frames)}> kExpected = {{
{expected}
  }};

  size_t decoded_frames = 0;

  decoder.Process(kData, [&](const Result<Frame>& result) {{
    ASSERT_LT(decoded_frames++, kExpected.size());
    auto& expected = kExpected[decoded_frames - 1];

    if (std::holds_alternative<Status>(expected)) {{
      EXPECT_EQ(Status::DataLoss(), result.status());
    }} else {{
      ASSERT_EQ(OkStatus(), result.status());

      const Frame& decoded_frame = result.value();
      const Frame& expected_frame = std::get<Frame>(expected);
      EXPECT_EQ(expected_frame.address(), decoded_frame.address());
      EXPECT_EQ(expected_frame.control(), decoded_frame.control());
      ASSERT_EQ(expected_frame.data().size(), decoded_frame.data().size());
      EXPECT_EQ(std::memcmp(expected_frame.data().data(),
                            decoded_frame.data().data(),
                            expected_frame.data().size()),
                0);
    }}
  }});

  EXPECT_EQ(decoded_frames, kExpected.size());
}}"""


def _define_py_decoder_test(ctx: Context) -> PyTest:
    data, expected_frames, _ = ctx.test_case

    def test(self) -> None:
        self.maxDiff = None
        # Decode in one call
        self.assertEqual(expected_frames,
                         list(FrameDecoder().process(data)),
                         msg=f'{ctx.group}: {data!r}')
        # Decode byte-by-byte
        decoder = FrameDecoder()
        decoded_frames: List[Frame] = []
        for i in range(len(data)):
            decoded_frames += decoder.process(data[i:i + 1])

        self.assertEqual(expected_frames,
                         decoded_frames,
                         msg=f'{ctx.group} (byte-by-byte): {data!r}')

    return test


def _define_raw_decoder_py_test(ctx: Context) -> PyTest:
    raw_data, expected_frames, expected_non_frame_data = ctx.test_case

    # The non-frame data decoder only yields valid frames.
    expected_frames = [
        f for f in expected_frames if f.status is FrameStatus.OK
    ]

    def test(self) -> None:
        self.maxDiff = None

        non_frame_data = bytearray()

        # Decode in one call
        decoder = FrameAndNonFrameDecoder(
            non_frame_data_handler=non_frame_data.extend)

        self.assertEqual(expected_frames,
                         list(decoder.process(raw_data)),
                         msg=f'{ctx.group}: {raw_data!r}')

        decoder.flush_non_frame_data()
        self.assertEqual(expected_non_frame_data, bytes(non_frame_data))

        # Decode byte-by-byte
        non_frame_data.clear()
        decoder = FrameAndNonFrameDecoder(
            non_frame_data_handler=non_frame_data.extend)
        decoded_frames: List[Frame] = []
        for i in range(len(raw_data)):
            decoded_frames += decoder.process(raw_data[i:i + 1])

        self.assertEqual(expected_frames,
                         decoded_frames,
                         msg=f'{ctx.group} (byte-by-byte): {raw_data!r}')
        decoder.flush_non_frame_data()
        self.assertEqual(expected_non_frame_data, bytes(non_frame_data))

    return test


def _ts_byte_array(data: bytes) -> str:
    return '[' + ', '.join(rf'0x{byte:02x}' for byte in data) + ']'


def _ts_test(ctx: Context) -> Iterator[str]:
    """Generates a TS test for the provided test data."""
    data, _, _ = ctx.test_case
    frames = [
        f for f in list(FrameDecoder().process(data)) if not _py_only_frame(f)
    ]
    data_bytes = _ts_byte_array(data)

    yield f'  it(\'{ctx.ts_name()}\', () => {{'
    yield f'    const data = new Uint8Array({data_bytes});'

    yield '    const expectedFrames = ['
    for frame in frames:
        control_bytes = _ts_byte_array(frame.control)
        frame_bytes = _ts_byte_array(frame.data)

        if frame is Expected:
            yield (f'      new Expected({frame.address}, '
                   f'new Uint8Array({control_bytes}), '
                   f'new Uint8Array({frame_bytes}), {frame.status}),')
        else:
            raw = _ts_byte_array(frame.raw_encoded)
            yield (
                f'      new ExpectedRaw(new Uint8Array({raw}), {frame.status}),'
            )

    yield '    ].values();\n'

    yield """\
    const result = decoder.process(data);

    while (true) {
      const expectedFrame = expectedFrames.next();
      const actualFrame = result.next();
      if (expectedFrame.done && actualFrame.done) {
        break;
      }
      expect(expectedFrame.done).toBeFalse();
      expect(actualFrame.done).toBeFalse();

      const expected = expectedFrame.value;
      const actual = actualFrame.value;
      if (expected instanceof Expected) {
        expect(actual.address).toEqual(expected.address);
        expect(actual.control).toEqual(expected.control);
        expect(actual.data).toEqual(expected.data);
        expect(actual.status).toEqual(expected.status);
      } else {
        // Expected Raw
        expect(actual.rawEncoded).toEqual(expected.raw);
        expect(actual.status).toEqual(expected.status);
      }
    }
  });
"""


# Class that tests all cases in TEST_CASES.
DecoderTest = _TESTS.python_tests('DecoderTest', _define_py_decoder_test)
NonFrameDecoderTest = _TESTS.python_tests('NonFrameDecoderTest',
                                          _define_raw_decoder_py_test)


class AdditionalNonFrameDecoderTests(unittest.TestCase):
    """Additional tests for the non-frame decoder."""
    def test_shared_flags_waits_for_tilde_to_emit_data(self) -> None:
        non_frame_data = bytearray()
        decoder = FrameAndNonFrameDecoder(non_frame_data.extend)

        self.assertEqual([Expected(0, b'\0', b'')],
                         list(decoder.process(_encode(0, 0, b''))))
        self.assertEqual(non_frame_data, b'')

        self.assertEqual([], list(decoder.process(b'uh oh, no tilde!')))
        self.assertEqual(non_frame_data, b'')

        self.assertEqual([], list(decoder.process(b'~')))
        self.assertEqual(non_frame_data, b'uh oh, no tilde!')

    def test_no_shared_flags_immediately_emits_data(self) -> None:
        non_frame_data = bytearray()
        decoder = FrameAndNonFrameDecoder(non_frame_data.extend,
                                          handle_shared_flags=False)

        self.assertEqual([Expected(0, b'\0', b'')],
                         list(decoder.process(_encode(0, 0, b''))))
        self.assertEqual(non_frame_data, b'')

        self.assertEqual([], list(decoder.process(b'uh oh, no tilde!')))
        self.assertEqual(non_frame_data, b'uh oh, no tilde!')

    def test_emits_data_if_mtu_is_exceeded(self) -> None:
        frame_start = b'~this looks like a real frame'

        non_frame_data = bytearray()
        decoder = FrameAndNonFrameDecoder(non_frame_data.extend,
                                          mtu=len(frame_start))

        self.assertEqual([], list(decoder.process(frame_start)))
        self.assertEqual(non_frame_data, b'')

        self.assertEqual([], list(decoder.process(b'!')))
        self.assertEqual(non_frame_data, frame_start + b'!')

    def test_emits_data_if_timeout_expires(self) -> None:
        frame_start = b'~this looks like a real frame'

        non_frame_data: 'queue.Queue[bytes]' = queue.Queue()
        decoder = FrameAndNonFrameDecoder(non_frame_data.put, timeout_s=0.001)

        self.assertEqual([], list(decoder.process(frame_start)))
        self.assertEqual(non_frame_data.get(timeout=2), frame_start)

    def test_emits_raw_data_and_valid_frame_if_flushed_partway(self) -> None:
        payload = b'Do you wanna ride in my blimp?'
        frame = _encode(1, 2, payload)

        non_frame_data = bytearray()
        decoder = FrameAndNonFrameDecoder(non_frame_data.extend)

        self.assertEqual([], list(decoder.process(frame[:5])))
        decoder.flush_non_frame_data()

        self.assertEqual([Expected(1, b'\2', payload)],
                         list(decoder.process(frame[5:])))


if __name__ == '__main__':
    args = parse_test_generation_args()
    if args.generate_cc_test:
        _TESTS.cc_tests(args.generate_cc_test, _cpp_test, _CPP_HEADER,
                        _CPP_FOOTER)
    elif args.generate_ts_test:
        _TESTS.ts_tests(args.generate_ts_test, _ts_test, _TS_HEADER,
                        _TS_FOOTER)
    else:
        unittest.main()
