pw_hdlc: Record and log raw bytes while decoding
- Record raw encoded and decoded bytes in the Frame object.
- Log the raw encoded bytes when there are decoding errors.
Change-Id: Id0f6b2f219ae143f01a1fd50ef58303dea331341
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/30840
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Reviewed-by: Zoltan Szatmary-Ban <szatmz@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
diff --git a/pw_hdlc/py/decode_test.py b/pw_hdlc/py/decode_test.py
index e5da8d5..ff4b6b6 100755
--- a/pw_hdlc/py/decode_test.py
+++ b/pw_hdlc/py/decode_test.py
@@ -14,7 +14,7 @@
# the License.
"""Contains the Python decoder tests and generates C++ decoder tests."""
-from typing import Iterator, List, NamedTuple, Tuple
+from typing import Iterator, List, NamedTuple, Tuple, Union
import unittest
from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest
@@ -48,6 +48,18 @@
and self.data == other.data and self.status is other.status)
+class ExpectedRaw(NamedTuple):
+ raw_encoded: bytes
+ status: FrameStatus
+
+ def __eq__(self, other) -> bool:
+ """Define == so an ExpectedRaw and a Frame can be compared."""
+ return (self.raw_encoded == other.raw_encoded
+ and self.status is other.status)
+
+
+Expectation = Union[Expected, ExpectedRaw]
+
_PARTIAL = fcs(b'\x0ACmsg\x5e')
_ESCAPED_FLAG_TEST_CASE = (
b'\x7e\x0ACmsg\x7d\x7e' + _PARTIAL + b'\x7e',
@@ -57,7 +69,7 @@
],
)
-TEST_CASES: Tuple[GroupOrTest[Tuple[bytes, List[Expected]]], ...] = (
+TEST_CASES: Tuple[GroupOrTest[Tuple[bytes, List[Expectation]]], ...] = (
'Empty payload',
(_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
(_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
@@ -154,6 +166,16 @@
(b'\x7e1234\x7da' + _encode(1, 2, b'3'),
[Expected.error(FrameStatus.FRAMING_ERROR),
Expected(1, b'\2', b'3')]),
+ 'Invalid frame records raw data',
+ (b'Hello?~', [ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR)]),
+ (b'~~Hel\x7d\x7dlo~',
+ [ExpectedRaw(b'Hel\x7d\x7dlo', FrameStatus.FRAMING_ERROR)]),
+ (b'Hello?~~~~~', [ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR)]),
+ (b'~~~~Hello?~~~~~', [ExpectedRaw(b'Hello?', FrameStatus.FCS_MISMATCH)]),
+ (b'Hello?~~Goodbye~', [
+ ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR),
+ ExpectedRaw(b'Goodbye', FrameStatus.FCS_MISMATCH),
+ ]),
) # yapf: disable
# Formatting for the above tuple is very slow, so disable yapf.
@@ -198,7 +220,8 @@
for i, frame in enumerate(frames, 1):
if frame.status is FrameStatus.OK:
- frame_bytes = ''.join(rf'\x{byte:02x}' for byte in frame.raw)
+ frame_bytes = ''.join(rf'\x{byte:02x}'
+ for byte in frame.raw_decoded)
yield (f' static constexpr auto kDecodedFrame{i:02} = '
f'bytes::String("{frame_bytes}");')
else:
diff --git a/pw_hdlc/py/pw_hdlc/decode.py b/pw_hdlc/py/pw_hdlc/decode.py
index dbf0d83..d727bc3 100644
--- a/pw_hdlc/py/pw_hdlc/decode.py
+++ b/pw_hdlc/py/pw_hdlc/decode.py
@@ -13,9 +13,10 @@
# the License.
"""Decoder class for decoding bytes using HDLC protocol"""
+from dataclasses import dataclass
import enum
import logging
-from typing import Iterator, NamedTuple, Optional
+from typing import Iterator, Optional
import zlib
from pw_hdlc import protocol
@@ -33,11 +34,15 @@
FRAMING_ERROR = 'invalid flag or escape characters'
-class Frame(NamedTuple):
+@dataclass(frozen=True)
+class Frame:
"""Represents an HDLC frame."""
- # All bytes in the frame (address, control, information, FCS)
- raw: bytes
+ # The complete HDLC-encoded frame, excluding HDLC flag characters.
+ raw_encoded: bytes
+
+ # The complete decoded frame (address, control, information, FCS).
+ raw_decoded: bytes
# Whether parsing the frame succeeded.
status: FrameStatus = FrameStatus.OK
@@ -45,17 +50,17 @@
@property
def address(self) -> int:
"""The frame's address field (assumes only one byte for now)."""
- return self.raw[0] if self.ok() else NO_ADDRESS
+ return self.raw_decoded[0] if self.ok() else NO_ADDRESS
@property
def control(self) -> bytes:
"""The control byte (assumes only one byte for now)."""
- return self.raw[1:2] if self.ok() else b''
+ return self.raw_decoded[1:2] if self.ok() else b''
@property
def data(self) -> bytes:
"""The information field in the frame."""
- return self.raw[2:-4] if self.ok() else b''
+ return self.raw_decoded[2:-4] if self.ok() else b''
def ok(self) -> bool:
"""True if this represents a valid frame.
@@ -96,7 +101,8 @@
class FrameDecoder:
"""Decodes one or more HDLC frames from a stream of data."""
def __init__(self):
- self._data = bytearray()
+ self._decoded_data = bytearray()
+ self._raw_data = bytearray()
self._state = _State.INTERFRAME
def process(self, data: bytes) -> Iterator[Frame]:
@@ -120,43 +126,48 @@
yield frame
else:
_LOG.warning('Failed to decode frame: %s; discarded %d bytes',
- frame.status.value, len(frame.data))
- _LOG.debug('Discarded data: %s', frame.data)
+ frame.status.value, len(frame.raw_encoded))
+ _LOG.debug('Discarded data: %s', frame.raw_encoded)
+
+ def _finish_frame(self, status: FrameStatus) -> Frame:
+ frame = Frame(bytes(self._raw_data), bytes(self._decoded_data), status)
+ self._raw_data.clear()
+ self._decoded_data.clear()
+ return frame
def _process_byte(self, byte: int) -> Optional[Frame]:
frame: Optional[Frame] = None
+ # Record every byte except the flag character.
+ if byte != protocol.FLAG:
+ self._raw_data.append(byte)
+
if self._state is _State.INTERFRAME:
if byte == protocol.FLAG:
- if self._data:
- frame = Frame(bytes(self._data), FrameStatus.FRAMING_ERROR)
+ if self._raw_data:
+ frame = self._finish_frame(FrameStatus.FRAMING_ERROR)
self._state = _State.FRAME
- self._data.clear()
- else:
- self._data.append(byte)
elif self._state is _State.FRAME:
if byte == protocol.FLAG:
- if self._data:
- frame = Frame(bytes(self._data), _check_frame(self._data))
+ if self._raw_data:
+ frame = self._finish_frame(_check_frame(
+ self._decoded_data))
self._state = _State.FRAME
- self._data.clear()
elif byte == protocol.ESCAPE:
self._state = _State.FRAME_ESCAPE
else:
- self._data.append(byte)
+ self._decoded_data.append(byte)
elif self._state is _State.FRAME_ESCAPE:
if byte == protocol.FLAG:
- frame = Frame(bytes(self._data), FrameStatus.FRAMING_ERROR)
+ frame = self._finish_frame(FrameStatus.FRAMING_ERROR)
self._state = _State.FRAME
- self._data.clear()
- elif byte in (0x5d, 0x5e): # Valid escape characters
+ elif byte in protocol.VALID_ESCAPED_BYTES:
self._state = _State.FRAME
- self._data.append(protocol.escape(byte))
+ self._decoded_data.append(protocol.escape(byte))
else:
self._state = _State.INTERFRAME
- self._data.append(byte)
else:
raise AssertionError(f'Invalid decoder state: {self._state}')
diff --git a/pw_hdlc/py/pw_hdlc/protocol.py b/pw_hdlc/py/pw_hdlc/protocol.py
index a9e7251..a6dd83b 100644
--- a/pw_hdlc/py/pw_hdlc/protocol.py
+++ b/pw_hdlc/py/pw_hdlc/protocol.py
@@ -21,6 +21,9 @@
# Special character for escaping other special characters in a frame.
ESCAPE = 0x7D
+# Characters allowed after a 0x7d escape character.
+VALID_ESCAPED_BYTES = 0x5D, 0x5E
+
def escape(byte: int) -> int:
"""Escapes or unescapes a byte, which should have been preceeded by 0x7d."""