pw_hdlc_lite: Added a Python Decoder module

Change-Id: I82a6fef9f629ced56c16ba2f502a21ecca719da7
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/14585
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Shane Gomindes <shaneajg@google.com>
diff --git a/pw_hdlc_lite/py/decoder_test.py b/pw_hdlc_lite/py/decoder_test.py
new file mode 100644
index 0000000..9d3038a
--- /dev/null
+++ b/pw_hdlc_lite/py/decoder_test.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python3
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests the hdlc_lite decoder module."""
+
+import unittest
+
+from pw_hdlc_lite import decoder
+
+
+class TestDecoder(unittest.TestCase):
+    """Tests decoding bytes with different arguments."""
+    def test_decode_1byte_payload(self):
+        decode = decoder.Decoder()
+        test_array = b'\x7EA\x15\xB9\x7E'
+        decoded_packets = list(decode.add_bytes(test_array))
+        self.assertEqual(decoded_packets, [b'A'])
+
+    def test_decode_empty_payload(self):
+        decode = decoder.Decoder()
+        test_array = b'\x7E\xFF\xFF\x7E'
+        decoded_packets = list(decode.add_bytes(test_array))
+        self.assertEqual(decoded_packets, [b''])
+
+    def test_decode_9byte_payload(self):
+        decode = decoder.Decoder()
+        test_array = b'\x7E123456789\xB1\x29\x7E'
+        decoded_packets = list(decode.add_bytes(test_array))
+        self.assertEqual(decoded_packets, [b'123456789'])
+
+    def test_decode_unescaping_payload_escapeflag(self):
+        decode = decoder.Decoder()
+        test_array = b'\x7E\x7D\x5D\xCA\x4E\x7E'
+        decoded_packets = list(decode.add_bytes(test_array))
+        self.assertEqual(decoded_packets, [b'\x7D'])
+
+    def test_decode_unescaping_payload_framedelimiter(self):
+        decode = decoder.Decoder()
+        test_array = b'\x7E\x7D\x5E\xA9\x7D\x5E\x7E'
+        decoded_packets = list(decode.add_bytes(test_array))
+        self.assertEqual(decoded_packets, [b'\x7E'])
+
+    def test_decode_unescaping_payload_mix(self):
+        decode = decoder.Decoder()
+        test_array = b'\x7E\x7D\x5E\x7Babc\x7D\x5D\x7D\x5E\x49\xE5\x7E'
+        decoded_packets = list(decode.add_bytes(test_array))
+        self.assertEqual(decoded_packets, [b'~{abc}~'])
+
+    def test_decode_in_parts(self):
+        decode = decoder.Decoder()
+        test_array = b'\x7EA\x15\xB9\x7E\x7EA\x15\xB9\x7E'
+        decoded_packets = list(decode.add_bytes(test_array[:3]))
+        self.assertEqual(decoded_packets, [])
+        decoded_packets = list(decode.add_bytes(test_array[3:8]))
+        self.assertEqual(decoded_packets, [b'A'])
+        decoded_packets = list(decode.add_bytes(test_array[8:]))
+        self.assertEqual(decoded_packets, [b'A'])
+
+        decoded_packets = list(decode.add_bytes(test_array))
+        self.assertEqual(len(decoded_packets), 2)
+        self.assertEqual(decoded_packets, [b'A', b'A'])
+
+    def test_decode_incorrectcrc(self):
+        decode = decoder.Decoder()
+        test_array = b'\x7EA\x15\xB8\x7E'
+        with self.assertRaises(decoder.CrcMismatchError):
+            next(decode.add_bytes(test_array))
+
+    def test_decode_incorrectcrc_mix(self):
+        decode = decoder.Decoder()
+        test_array = b'\x7EA\x15\xB9\x7E\x7EA\x15\xB8\x7E'
+        decoded_packets = decode.add_bytes(test_array)
+        self.assertEqual(next(decoded_packets), b'A')
+        with self.assertRaises(decoder.CrcMismatchError):
+            next(decoded_packets)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/decoder.py b/pw_hdlc_lite/py/pw_hdlc_lite/decoder.py
new file mode 100644
index 0000000..88a0e38
--- /dev/null
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/decoder.py
@@ -0,0 +1,62 @@
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Decoder class for decoding bytes using HDLC-Lite protocol"""
+
+import binascii
+
+HDLC_UNESCAPING_CONSTANT = 0x20
+HDLC_ESCAPE = 0x7D
+HDLC_FRAME_DELIMITER = 0x7E
+
+
+class CrcMismatchError(Exception):
+    pass
+
+
+class Decoder:
+    """Decodes the received _data-frames using the HDLC-Lite protocol."""
+    def __init__(self):
+        self._data = bytearray()
+        self._unescape_next_byte_flag = False
+
+    def add_bytes(self, byte_array: bytes):
+        """Unescapes the bytes and yields the CRC-verified packets.
+
+        If the CRC-verification fails, the function will raise a
+        CrcMismatchError exception.
+        """
+        for byte in byte_array:
+            if byte == HDLC_FRAME_DELIMITER:
+                if self._data:
+                    if self._check_crc():
+                        yield self._data[:-2]
+                    else:
+                        raise CrcMismatchError()
+                    self._data.clear()
+            elif byte == HDLC_ESCAPE:
+                self._unescape_next_byte_flag = True
+            else:
+                self._add_unescaped_byte(byte)
+
+    def _add_unescaped_byte(self, byte):
+        """Unescapes the bytes based on the _unescape_next_byte_flag flag."""
+        if self._unescape_next_byte_flag:
+            self._data.append(byte ^ HDLC_UNESCAPING_CONSTANT)
+            self._unescape_next_byte_flag = False
+        else:
+            self._data.append(byte)
+
+    def _check_crc(self):
+        return binascii.crc_hqx(self._data[:-2], 0xFFFF).to_bytes(
+            2, byteorder='little') == self._data[-2:]
diff --git a/pw_hdlc_lite/py/setup.py b/pw_hdlc_lite/py/setup.py
new file mode 100644
index 0000000..9ec1c48
--- /dev/null
+++ b/pw_hdlc_lite/py/setup.py
@@ -0,0 +1,25 @@
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""pw_hdlc_lite"""
+
+import setuptools
+
+setuptools.setup(
+    name='pw_hdlc_lite',
+    version='0.0.1',
+    author='Pigweed Authors',
+    author_email='pigweed-developers@googlegroups.com',
+    description='Tools for Encoding/Decoding data using the HDLC-Lite protocol',
+    packages=setuptools.find_packages(),
+)