pw_hdlc_lite: Update encoder function
- Add address and control bytes.
- Use CRC-32 instead of CRC-16-CCITT.
Change-Id: Idc3ca5c3d219e14dba928ac0a4cb250507bf3911
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/17843
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_hdlc_lite/py/encoder_test.py b/pw_hdlc_lite/py/encoder_test.py
old mode 100644
new mode 100755
index 2150378..7374f2d
--- a/pw_hdlc_lite/py/encoder_test.py
+++ b/pw_hdlc_lite/py/encoder_test.py
@@ -16,60 +16,41 @@
import unittest
-from pw_hdlc_lite import encoder
+from pw_hdlc_lite.encoder import encode_information_frame
+from pw_hdlc_lite import protocol
+from pw_hdlc_lite.protocol import frame_check_sequence as _fcs
+
+FLAG = bytes([protocol.FLAG])
-class TestEncoderFunctions(unittest.TestCase):
+def _with_fcs(data: bytes) -> bytes:
+ return data + _fcs(data)
+
+
+class TestEncodeInformationFrame(unittest.TestCase):
"""Tests Encoding bytes with different arguments using a custom serial."""
- def test_encode_1byte_payload(self):
- data = bytearray()
- encoder.encode_and_write_payload(b'A', data.extend)
+ def test_empty(self):
+ self.assertEqual(encode_information_frame(0, b''),
+ FLAG + _with_fcs(b'\0\0') + FLAG)
+ self.assertEqual(encode_information_frame(0x1a, b''),
+ FLAG + _with_fcs(b'\x1a\0') + FLAG)
- expected_bytes = b'\x7EA\x15\xB9\x7E'
- self.assertEqual(data, expected_bytes)
+ def test_1byte(self):
+ self.assertEqual(encode_information_frame(0, b'A'),
+ FLAG + _with_fcs(b'\0\0A') + FLAG)
- def test_encode_empty_payload(self):
- data = bytearray()
- encoder.encode_and_write_payload(b'', data.extend)
+ def test_multibyte(self):
+ self.assertEqual(encode_information_frame(0, b'123456789'),
+ FLAG + _with_fcs(b'\x00\x00123456789') + FLAG)
- expected_bytes = b'\x7E\xFF\xFF\x7E'
- self.assertEqual(data, expected_bytes)
-
- def test_encode_9byte_payload(self):
- data = bytearray()
- encoder.encode_and_write_payload(b'123456789', data.extend)
-
- expected_bytes = b'\x7E123456789\xB1\x29\x7E'
- self.assertEqual(data, expected_bytes)
-
- def test_encode_unescaping_payload_escapeflag(self):
- data = bytearray()
- encoder.encode_and_write_payload(b'\x7D', data.extend)
-
- expected_bytes = b'\x7E\x7D\x5D\xCA\x4E\x7E'
- self.assertEqual(data, expected_bytes)
-
- def test_encode_unescaping_payload_framedelimiter(self):
- data = bytearray()
- encoder.encode_and_write_payload(b'\x7E', data.extend)
-
- expected_bytes = b'\x7E\x7D\x5E\xA9\x7D\x5E\x7E'
- self.assertEqual(data, expected_bytes)
-
- def test_encode_unescaping_payload_mix(self):
- data = bytearray()
- encoder.encode_and_write_payload(b'~{abc}~', data.extend)
-
- expected_bytes = b'\x7E\x7D\x5E\x7Babc\x7D\x5D\x7D\x5E\x49\xE5\x7E'
- self.assertEqual(data, expected_bytes)
-
- def test_encode_multiple_payloads(self):
- data = bytearray()
- encoder.encode_and_write_payload(b'A', data.extend)
- encoder.encode_and_write_payload(b'A', data.extend)
-
- expected_bytes = b'\x7EA\x15\xB9\x7E\x7EA\x15\xB9\x7E'
- self.assertEqual(data, expected_bytes)
+ def test_escape(self):
+ self.assertEqual(
+ encode_information_frame(0x7e, b'\x7d'),
+ FLAG + b'\x7d\x5e\x00\x7d\x5d' + _fcs(b'\x7e\x00\x7d') + FLAG)
+ self.assertEqual(
+ encode_information_frame(0x7d, b'A\x7e\x7dBC'),
+ FLAG + b'\x7d\x5d\x00A\x7d\x5e\x7d\x5dBC' +
+ _fcs(b'\x7d\x00A\x7e\x7dBC') + FLAG)
if __name__ == '__main__':
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/client_console_example.py b/pw_hdlc_lite/py/pw_hdlc_lite/client_console_example.py
index 1537e70..ccb97f9 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/client_console_example.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/client_console_example.py
@@ -29,7 +29,7 @@
import serial
from pw_hdlc_lite import decoder
-from pw_hdlc_lite import encoder
+from pw_hdlc_lite.encoder import encode_information_frame
from pw_protobuf_compiler import python_protos
from pw_rpc import callback_client, client, descriptors
@@ -67,8 +67,8 @@
ser.write(bytes([byte]))
# Creating a channel object
- hdlc_channel_output = lambda data: encoder.encode_and_write_payload(
- data, delayed_write)
+ hdlc_channel_output = lambda data: ser.write(
+ encode_information_frame(data, delayed_write))
channel = descriptors.Channel(1, hdlc_channel_output)
# Creating a list of modules that provide the .proto service methods
diff --git a/pw_hdlc_lite/py/pw_hdlc_lite/encoder.py b/pw_hdlc_lite/py/pw_hdlc_lite/encoder.py
index 64122f8..5cf4d68 100644
--- a/pw_hdlc_lite/py/pw_hdlc_lite/encoder.py
+++ b/pw_hdlc_lite/py/pw_hdlc_lite/encoder.py
@@ -13,24 +13,17 @@
# the License.
"""Encoder functions for encoding bytes using HDLC-Lite protocol"""
-import binascii
-
from pw_hdlc_lite import protocol
-_HDLC_ESCAPE = bytes([protocol.ESCAPE])
-_HDLC_FRAME_DELIMITER = bytes([protocol.FLAG])
+_ESCAPE_BYTE = bytes([protocol.ESCAPE])
+_FLAG_BYTE = bytes([protocol.FLAG])
+_CONTROL = 0 # Currently, hard-coded to 0; no sequence numbers are used
-def encode_and_write_payload(payload, write):
- """Escapes the payload and writes the data-frame to the serial device."""
-
- write(_HDLC_FRAME_DELIMITER)
-
- # The crc_hqx function computes the 2-byte CCITT-CRC16 value
- crc = binascii.crc_hqx(payload, 0xFFFF).to_bytes(2, byteorder='little')
- payload += crc
- payload = payload.replace(_HDLC_ESCAPE, b'\x7D\x5D')
- payload = payload.replace(_HDLC_FRAME_DELIMITER, b'\x7D\x5E')
- write(payload)
-
- write(_HDLC_FRAME_DELIMITER)
+def encode_information_frame(address: int, data: bytes) -> bytes:
+ """Encodes an HDLC I-frame."""
+ frame = bytearray([address, _CONTROL]) + data
+ frame += protocol.frame_check_sequence(frame)
+ frame = frame.replace(_ESCAPE_BYTE, b'\x7d\x5d')
+ frame = frame.replace(_FLAG_BYTE, b'\x7d\x5e')
+ return b''.join([_FLAG_BYTE, frame, _FLAG_BYTE])