| #!/usr/bin/env python3 |
| # coding=utf-8 |
| |
| # |
| # Copyright (c) 2020 Project CHIP Authors |
| # Copyright (c) 2019-2020 Google LLC. |
| # All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # |
| # @file |
| # This file contains definitions for working with data encoded in Chip TLV format |
| # |
| |
| |
| from __future__ import absolute_import |
| from __future__ import print_function |
| |
| import struct |
| from collections import Mapping, Sequence, OrderedDict |
| |
| |
| TLV_TYPE_SIGNED_INTEGER = 0x00 |
| TLV_TYPE_UNSIGNED_INTEGER = 0x04 |
| TLV_TYPE_BOOLEAN = 0x08 |
| TLV_TYPE_FLOATING_POINT_NUMBER = 0x0A |
| TLV_TYPE_UTF8_STRING = 0x0C |
| TLV_TYPE_BYTE_STRING = 0x10 |
| TLV_TYPE_NULL = 0x14 |
| TLV_TYPE_STRUCTURE = 0x15 |
| TLV_TYPE_ARRAY = 0x16 |
| TLV_TYPE_PATH = 0x17 |
| |
| TLV_TAG_CONTROL_ANONYMOUS = 0x00 |
| TLV_TAG_CONTROL_CONTEXT_SPECIFIC = 0x20 |
| TLV_TAG_CONTROL_COMMON_PROFILE_2Bytes = 0x40 |
| TLV_TAG_CONTROL_COMMON_PROFILE_4Bytes = 0x60 |
| TLV_TAG_CONTROL_IMPLICIT_PROFILE_2Bytes = 0x80 |
| TLV_TAG_CONTROL_IMPLICIT_PROFILE_4Bytes = 0xA0 |
| TLV_TAG_CONTROL_FULLY_QUALIFIED_6Bytes = 0xC0 |
| TLV_TAG_CONTROL_FULLY_QUALIFIED_8Bytes = 0xE0 |
| |
| TLVBoolean_False = TLV_TYPE_BOOLEAN |
| TLVBoolean_True = TLV_TYPE_BOOLEAN + 1 |
| |
| TLVEndOfContainer = 0x18 |
| |
| INT8_MIN = -128 |
| INT16_MIN = -32768 |
| INT32_MIN = -2147483648 |
| INT64_MIN = -9223372036854775808 |
| |
| INT8_MAX = 127 |
| INT16_MAX = 32767 |
| INT32_MAX = 2147483647 |
| INT64_MAX = 9223372036854775807 |
| |
| UINT8_MAX = 255 |
| UINT16_MAX = 65535 |
| UINT32_MAX = 4294967295 |
| UINT64_MAX = 18446744073709551615 |
| |
| ElementTypes = { |
| 0x00: "Signed Integer 1-byte value", |
| 0x01: "Signed Integer 2-byte value", |
| 0x02: "Signed Integer 4-byte value", |
| 0x03: "Signed Integer 8-byte value", |
| 0x04: "Unsigned Integer 1-byte value", |
| 0x05: "Unsigned Integer 2-byte value", |
| 0x06: "Unsigned Integer 4-byte value", |
| 0x07: "Unsigned Integer 8-byte value", |
| 0x08: "Boolean False", |
| 0x09: "Boolean True", |
| 0x0A: "Floating Point 4-byte value", |
| 0x0B: "Floating Point 8-byte value", |
| 0x0C: "UTF-8 String 1-byte length", |
| 0x0D: "UTF-8 String 2-byte length", |
| 0x0E: "UTF-8 String 4-byte length", |
| 0x0F: "UTF-8 String 8-byte length", |
| 0x10: "Byte String 1-byte length", |
| 0x11: "Byte String 2-byte length", |
| 0x12: "Byte String 4-byte length", |
| 0x13: "Byte String 8-byte length", |
| 0x14: "Null", |
| 0x15: "Structure", |
| 0x16: "Array", |
| 0x17: "Path", |
| 0x18: "End of Collection", |
| } |
| |
| TagControls = { |
| 0x00: "Anonymous", |
| 0x20: "Context 1-byte", |
| 0x40: "Common Profile 2-byte", |
| 0x60: "Common Profile 4-byte", |
| 0x80: "Implicit Profile 2-byte", |
| 0xA0: "Implicit Profile 4-byte", |
| 0xC0: "Fully Qualified 6-byte", |
| 0xE0: "Fully Qualified 8-byte", |
| } |
| |
| |
| class TLVWriter(object): |
| def __init__(self, encoding=None, implicitProfile=None): |
| self._encoding = encoding if encoding is not None else bytearray() |
| self._implicitProfile = implicitProfile |
| self._containerStack = [] |
| |
| @property |
| def encoding(self): |
| """The object into which encoded TLV data is written. |
| |
| By default this is a bytearray object. |
| """ |
| return self._encoding |
| |
| @encoding.setter |
| def encoding(self, val): |
| self._encoding = val |
| |
| @property |
| def implicitProfile(self): |
| """The Chip profile id used when encoding implicit profile tags. |
| |
| Setting this value will result in an implicit profile tag being encoded |
| whenever the profile of the tag to be encoded matches the specified implicit |
| profile id. |
| |
| Setting this value to None (the default) disabled encoding of implicit |
| profile tags. |
| """ |
| return self._implicitProfile |
| |
| @implicitProfile.setter |
| def implicitProfile(self, val): |
| self._implicitProfile = val |
| |
| def put(self, tag, val): |
| """Write a value in TLV format with the specified TLV tag. |
| |
| val can be a Python object which will be encoded as follows: |
| - Python bools, floats and strings are encoded as their respective TLV types. |
| - Python ints are encoded as unsigned TLV integers if zero or positive; signed TLV |
| integers if negative. |
| - None is encoded as a TLV Null. |
| - bytes and bytearray objects are encoded as TVL byte strings. |
| - Mapping-like objects (e.g. dict) are encoded as TLV structures. The keys of the |
| map object are expected to be tag values, as described below for the tag argument. |
| Map values are encoded recursively, using the same rules as defined for the val |
| argument. The encoding order of elements depends on the type of the map object. |
| Elements within a dict are automatically encoded tag numerical order. Elements |
| within other forms of mapping object (e.g. OrderedDict) are encoded in the |
| object's natural iteration order. |
| - Sequence-like objects (e.g. arrays) are written as TLV arrays. Elements within |
| the array are encoded recursively, using the same rules as defined for the val |
| argument. |
| |
| tag can be a small int (0-255), a tuple of two integers, or None. |
| If tag is an integer, it is encoded as a TLV context-specific tag. |
| If tag is a two-integer tuple, it is encoded as a TLV profile-specific tag, with |
| the first integer encoded as the profile id and the second as the tag number. |
| If tag is None, it is encoded as a TLV anonymous tag. |
| """ |
| if val is None: |
| self.putNull(tag) |
| elif isinstance(val, bool): |
| self.putBool(tag, val) |
| elif isinstance(val, int): |
| if val < 0: |
| self.putSignedInt(tag, val) |
| else: |
| self.putUnsignedInt(tag, val) |
| elif isinstance(val, float): |
| self.putFloat(tag, val) |
| elif isinstance(val, str): |
| self.putString(tag, val) |
| elif isinstance(val, bytes) or isinstance(val, bytearray): |
| self.putBytes(tag, val) |
| elif isinstance(val, Mapping): |
| self.startStructure(tag) |
| if type(val) == dict: |
| val = OrderedDict( |
| sorted(val.items(), key=lambda item: tlvTagToSortKey(item[0])) |
| ) |
| for containedTag, containedVal in val.items(): |
| self.put(containedTag, containedVal) |
| self.endContainer() |
| elif isinstance(val, Sequence): |
| self.startArray(tag) |
| for containedVal in val: |
| self.put(None, containedVal) |
| self.endContainer() |
| else: |
| raise ValueError("Attempt to TLV encode unsupported value") |
| |
| def putSignedInt(self, tag, val): |
| """Write a value as a TLV signed integer with the specified TLV tag.""" |
| if val >= INT8_MIN and val <= INT8_MAX: |
| format = "<b" |
| elif val >= INT16_MIN and val <= INT16_MAX: |
| format = "<h" |
| elif val >= INT32_MIN and val <= INT32_MAX: |
| format = "<l" |
| elif val >= INT64_MIN and val <= INT64_MAX: |
| format = "<q" |
| else: |
| raise ValueError("Integer value out of range") |
| val = struct.pack(format, val) |
| controlAndTag = self._encodeControlAndTag( |
| TLV_TYPE_SIGNED_INTEGER, tag, lenOfLenOrVal=len(val) |
| ) |
| self._encoding.extend(controlAndTag) |
| self._encoding.extend(val) |
| |
| def putUnsignedInt(self, tag, val): |
| """Write a value as a TLV unsigned integer with the specified TLV tag.""" |
| val = self._encodeUnsignedInt(val) |
| controlAndTag = self._encodeControlAndTag( |
| TLV_TYPE_UNSIGNED_INTEGER, tag, lenOfLenOrVal=len(val) |
| ) |
| self._encoding.extend(controlAndTag) |
| self._encoding.extend(val) |
| |
| def putFloat(self, tag, val): |
| """Write a value as a TLV float with the specified TLV tag.""" |
| val = struct.pack("d", val) |
| controlAndTag = self._encodeControlAndTag( |
| TLV_TYPE_FLOATING_POINT_NUMBER, tag, lenOfLenOrVal=len(val) |
| ) |
| self._encoding.extend(controlAndTag) |
| self._encoding.extend(val) |
| |
| def putString(self, tag, val): |
| """Write a value as a TLV string with the specified TLV tag.""" |
| val = val.encode("utf-8") |
| valLen = self._encodeUnsignedInt(len(val)) |
| controlAndTag = self._encodeControlAndTag( |
| TLV_TYPE_UTF8_STRING, tag, lenOfLenOrVal=len(valLen) |
| ) |
| self._encoding.extend(controlAndTag) |
| self._encoding.extend(valLen) |
| self._encoding.extend(val) |
| |
| def putBytes(self, tag, val): |
| """Write a value as a TLV byte string with the specified TLV tag.""" |
| valLen = self._encodeUnsignedInt(len(val)) |
| controlAndTag = self._encodeControlAndTag( |
| TLV_TYPE_BYTE_STRING, tag, lenOfLenOrVal=len(valLen) |
| ) |
| self._encoding.extend(controlAndTag) |
| self._encoding.extend(valLen) |
| self._encoding.extend(val) |
| |
| def putBool(self, tag, val): |
| """Write a value as a TLV boolean with the specified TLV tag.""" |
| if val: |
| type = TLVBoolean_True |
| else: |
| type = TLVBoolean_False |
| controlAndTag = self._encodeControlAndTag(type, tag) |
| self._encoding.extend(controlAndTag) |
| |
| def putNull(self, tag): |
| """Write a TLV null with the specified TLV tag.""" |
| controlAndTag = self._encodeControlAndTag(TLV_TYPE_NULL, tag) |
| self._encoding.extend(controlAndTag) |
| |
| def startContainer(self, tag, containerType): |
| """Start writing a TLV container with the specified TLV tag. |
| |
| containerType can be one of TLV_TYPE_STRUCTURE, TLV_TYPE_ARRAY or |
| TLV_TYPE_PATH. |
| """ |
| self._verifyValidContainerType(containerType) |
| controlAndTag = self._encodeControlAndTag(containerType, tag) |
| self._encoding.extend(controlAndTag) |
| self._containerStack.insert(0, containerType) |
| |
| def startStructure(self, tag): |
| """Start writing a TLV structure with the specified TLV tag.""" |
| self.startContainer(tag, containerType=TLV_TYPE_STRUCTURE) |
| |
| def startArray(self, tag): |
| """Start writing a TLV array with the specified TLV tag.""" |
| self.startContainer(tag, containerType=TLV_TYPE_ARRAY) |
| |
| def startPath(self, tag): |
| """Start writing a TLV path with the specified TLV tag.""" |
| self.startContainer(tag, containerType=TLV_TYPE_PATH) |
| |
| def endContainer(self): |
| """End writing the current TLV container.""" |
| self._containerStack.pop(0) |
| controlAndTag = self._encodeControlAndTag(TLVEndOfContainer, None) |
| self._encoding.extend(controlAndTag) |
| |
| def _encodeControlAndTag(self, type, tag, lenOfLenOrVal=0): |
| controlByte = type |
| if lenOfLenOrVal == 2: |
| controlByte |= 1 |
| elif lenOfLenOrVal == 4: |
| controlByte |= 2 |
| elif lenOfLenOrVal == 8: |
| controlByte |= 3 |
| if tag is None: |
| if ( |
| type != TLVEndOfContainer |
| and len(self._containerStack) != 0 |
| and self._containerStack[0] == TLV_TYPE_STRUCTURE |
| ): |
| raise ValueError("Attempt to encode anonymous tag within TLV structure") |
| controlByte |= TLV_TAG_CONTROL_ANONYMOUS |
| return struct.pack("<B", controlByte) |
| if isinstance(tag, int): |
| if tag < 0 or tag > UINT8_MAX: |
| raise ValueError("Context-specific TLV tag number out of range") |
| if len(self._containerStack) == 0: |
| raise ValueError( |
| "Attempt to encode context-specific TLV tag at top level" |
| ) |
| if self._containerStack[0] == TLV_TYPE_ARRAY: |
| raise ValueError( |
| "Attempt to encode context-specific tag within TLV array" |
| ) |
| controlByte |= TLV_TAG_CONTROL_CONTEXT_SPECIFIC |
| return struct.pack("<BB", controlByte, tag) |
| if isinstance(tag, tuple): |
| (profile, tagNum) = tag |
| if not isinstance(tagNum, int): |
| raise ValueError("Invalid object given for TLV tag") |
| if tagNum < 0 or tagNum > UINT32_MAX: |
| raise ValueError("TLV tag number out of range") |
| if profile != None: |
| if not isinstance(profile, int): |
| raise ValueError("Invalid object given for TLV profile id") |
| if profile < 0 or profile > UINT32_MAX: |
| raise ValueError("TLV profile id value out of range") |
| if ( |
| len(self._containerStack) != 0 |
| and self._containerStack[0] == TLV_TYPE_ARRAY |
| ): |
| raise ValueError( |
| "Attempt to encode profile-specific tag within TLV array" |
| ) |
| if profile is None or profile == self._implicitProfile: |
| if tagNum <= UINT16_MAX: |
| controlByte |= TLV_TAG_CONTROL_IMPLICIT_PROFILE_2Bytes |
| return struct.pack("<BH", controlByte, tagNum) |
| else: |
| controlByte |= TLV_TAG_CONTROL_IMPLICIT_PROFILE_4Bytes |
| return struct.pack("<BL", controlByte, tagNum) |
| elif profile == 0: |
| if tagNum <= UINT16_MAX: |
| controlByte |= TLV_TAG_CONTROL_COMMON_PROFILE_2Bytes |
| return struct.pack("<BH", controlByte, tagNum) |
| else: |
| controlByte |= TLV_TAG_CONTROL_COMMON_PROFILE_4Bytes |
| return struct.pack("<BL", controlByte, tagNum) |
| else: |
| if tagNum <= UINT16_MAX: |
| controlByte |= TLV_TAG_CONTROL_FULLY_QUALIFIED_6Bytes |
| return struct.pack("<BLH", controlByte, profile, tagNum) |
| else: |
| controlByte |= TLV_TAG_CONTROL_FULLY_QUALIFIED_8Bytes |
| return struct.pack("<BLL", controlByte, profile, tagNum) |
| raise ValueError("Invalid object given for TLV tag") |
| |
| @staticmethod |
| def _encodeUnsignedInt(val): |
| if val < 0: |
| raise ValueError("Integer value out of range") |
| if val <= UINT8_MAX: |
| format = "<B" |
| elif val <= UINT16_MAX: |
| format = "<H" |
| elif val <= UINT32_MAX: |
| format = "<L" |
| elif val <= UINT64_MAX: |
| format = "<Q" |
| else: |
| raise ValueError("Integer value out of range") |
| return struct.pack(format, val) |
| |
| @staticmethod |
| def _verifyValidContainerType(containerType): |
| if ( |
| containerType != TLV_TYPE_STRUCTURE |
| and containerType != TLV_TYPE_ARRAY |
| and containerType != TLV_TYPE_PATH |
| ): |
| raise ValueError("Invalid TLV container type") |
| |
| |
| class TLVReader(object): |
| def __init__(self, tlv): |
| self._tlv = tlv |
| self._bytesRead = 0 |
| self._decodings = [] |
| |
| @property |
| def decoding(self): |
| return self._decodings |
| |
| def get(self): |
| """Get the dictionary representation of tlv data""" |
| out = {} |
| self._get(self._tlv, self._decodings, out) |
| return out |
| |
| def _decodeControlByte(self, tlv, decoding): |
| (controlByte,) = struct.unpack("<B", tlv[self._bytesRead : self._bytesRead + 1]) |
| controlTypeIndex = controlByte & 0xE0 |
| decoding["tagControl"] = TagControls[controlTypeIndex] |
| elementtypeIndex = controlByte & 0x1F |
| decoding["type"] = ElementTypes[elementtypeIndex] |
| self._bytesRead += 1 |
| |
| def _decodeControlAndTag(self, tlv, decoding): |
| """The control byte specifies the type of a TLV element and how its tag, length and value fields are encoded. |
| The control byte consists of two subfields: an element type field which occupies the lower 5 bits, |
| and a tag control field which occupies the upper 3 bits. The element type field encodes the element’s type |
| as well as how the corresponding length and value fields are encoded. In the case of Booleans and the |
| null value, the element type field also encodes the value itself.""" |
| |
| self._decodeControlByte(tlv, decoding) |
| |
| if decoding["tagControl"] == "Anonymous": |
| decoding["tag"] = None |
| decoding["tagLen"] = 0 |
| elif decoding["tagControl"] == "Context 1-byte": |
| (decoding["tag"],) = struct.unpack( |
| "<B", tlv[self._bytesRead : self._bytesRead + 1] |
| ) |
| decoding["tagLen"] = 1 |
| self._bytesRead += 1 |
| elif decoding["tagControl"] == "Common Profile 2-byte": |
| profile = 0 |
| (tag,) = struct.unpack("<H", tlv[self._bytesRead : self._bytesRead + 2]) |
| decoding["profileTag"] = (profile, tag) |
| decoding["tagLen"] = 2 |
| self._bytesRead += 2 |
| elif decoding["tagControl"] == "Common Profile 4-byte": |
| profile = 0 |
| (tag,) = struct.unpack("<L", tlv[self._bytesRead : self._bytesRead + 4]) |
| decoding["profileTag"] = (profile, tag) |
| decoding["tagLen"] = 4 |
| self._bytesRead += 4 |
| elif decoding["tagControl"] == "Implicit Profile 2-byte": |
| profile = None |
| (tag,) = struct.unpack("<H", tlv[self._bytesRead : self._bytesRead + 2]) |
| decoding["profileTag"] = (profile, tag) |
| decoding["tagLen"] = 2 |
| self._bytesRead += 2 |
| elif decoding["tagControl"] == "Implicit Profile 4-byte": |
| profile = None |
| (tag,) = struct.unpack("<L", tlv[self._bytesRead : self._bytesRead + 4]) |
| decoding["profileTag"] = (profile, tag) |
| decoding["tagLen"] = 4 |
| self._bytesRead += 4 |
| elif decoding["tagControl"] == "Fully Qualified 6-byte": |
| (profile,) = struct.unpack("<L", tlv[self._bytesRead : self._bytesRead + 4]) |
| (tag,) = struct.unpack("<H", tlv[self._bytesRead + 4 : self._bytesRead + 6]) |
| decoding["profileTag"] = (profile, tag) |
| decoding["tagLen"] = 2 |
| self._bytesRead += 6 |
| elif decoding["tagControl"] == "Fully Qualified 8-byte": |
| (profile,) = struct.unpack("<L", tlv[self._bytesRead : self._bytesRead + 4]) |
| (tag,) = struct.unpack("<L", tlv[self._bytesRead + 4 : self._bytesRead + 8]) |
| decoding["profileTag"] = (profile, tag) |
| decoding["tagLen"] = 4 |
| self._bytesRead += 8 |
| |
| def _decodeStrLength(self, tlv, decoding): |
| """UTF-8 or Byte StringLength fields are encoded in 0, 1, 2 or 4 byte widths, as specified by |
| the element type field. If the element type needs a length field grab the next bytes as length""" |
| if "length" in decoding["type"]: |
| if "1-byte" in decoding["type"]: |
| (decoding["strDataLen"],) = struct.unpack( |
| "<B", tlv[self._bytesRead : self._bytesRead + 1] |
| ) |
| decoding["strDataLenLen"] = 1 |
| self._bytesRead += 1 |
| elif "2-byte" in decoding["type"]: |
| (decoding["strDataLen"],) = struct.unpack( |
| "<H", tlv[self._bytesRead : self._bytesRead + 2] |
| ) |
| decoding["strDataLenLen"] = 2 |
| self._bytesRead += 2 |
| elif "4-byte" in decoding["type"]: |
| (decoding["strDataLen"],) = struct.unpack( |
| "<L", tlv[self._bytesRead : self._bytesRead + 4] |
| ) |
| decoding["strDataLenLen"] = 4 |
| self._bytesRead += 4 |
| elif "8-byte" in decoding["type"]: |
| (decoding["strDataLen"],) = struct.unpack( |
| "<Q", tlv[self._bytesRead : self._bytesRead + 8] |
| ) |
| decoding["strDataLenLen"] = 8 |
| self._bytesRead += 8 |
| else: |
| decoding["strDataLen"] = 0 |
| decoding["strDataLenLen"] = 0 |
| |
| def _decodeVal(self, tlv, decoding): |
| """decode primitive tlv value to the corresponding python value, tlv array and path are decoded as |
| python list, tlv structure is decoded as python dictionary""" |
| if decoding["type"] == "Structure": |
| decoding["value"] = {} |
| decoding["Structure"] = [] |
| self._get(tlv, decoding["Structure"], decoding["value"]) |
| elif decoding["type"] == "Array": |
| decoding["value"] = [] |
| decoding["Array"] = [] |
| self._get(tlv, decoding["Array"], decoding["value"]) |
| elif decoding["type"] == "Path": |
| decoding["value"] = [] |
| decoding["Path"] = [] |
| self._get(tlv, decoding["Path"], decoding["value"]) |
| elif decoding["type"] == "Null": |
| decoding["value"] = None |
| elif decoding["type"] == "End of Collection": |
| decoding["value"] = None |
| elif decoding["type"] == "Boolean True": |
| decoding["value"] = True |
| elif decoding["type"] == "Boolean False": |
| decoding["value"] = False |
| elif decoding["type"] == "Unsigned Integer 1-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<B", tlv[self._bytesRead : self._bytesRead + 1] |
| ) |
| self._bytesRead += 1 |
| elif decoding["type"] == "Signed Integer 1-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<b", tlv[self._bytesRead : self._bytesRead + 1] |
| ) |
| self._bytesRead += 1 |
| elif decoding["type"] == "Unsigned Integer 2-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<H", tlv[self._bytesRead : self._bytesRead + 2] |
| ) |
| self._bytesRead += 2 |
| elif decoding["type"] == "Signed Integer 2-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<h", tlv[self._bytesRead : self._bytesRead + 2] |
| ) |
| self._bytesRead += 2 |
| elif decoding["type"] == "Unsigned Integer 4-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<L", tlv[self._bytesRead : self._bytesRead + 4] |
| ) |
| self._bytesRead += 4 |
| elif decoding["type"] == "Signed Integer 4-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<l", tlv[self._bytesRead : self._bytesRead + 4] |
| ) |
| self._bytesRead += 4 |
| elif decoding["type"] == "Unsigned Integer 8-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<Q", tlv[self._bytesRead : self._bytesRead + 8] |
| ) |
| self._bytesRead += 8 |
| elif decoding["type"] == "Signed Integer 8-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<q", tlv[self._bytesRead : self._bytesRead + 8] |
| ) |
| self._bytesRead += 8 |
| elif decoding["type"] == "Floating Point 4-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<f", tlv[self._bytesRead : self._bytesRead + 4] |
| ) |
| self._bytesRead += 4 |
| elif decoding["type"] == "Floating Point 8-byte value": |
| (decoding["value"],) = struct.unpack( |
| "<d", tlv[self._bytesRead : self._bytesRead + 8] |
| ) |
| self._bytesRead += 8 |
| elif "UTF-8 String" in decoding["type"]: |
| (val,) = struct.unpack( |
| "<%ds" % decoding["strDataLen"], |
| tlv[self._bytesRead : self._bytesRead + decoding["strDataLen"]], |
| ) |
| try: |
| decoding["value"] = str(val, "utf-8") |
| except Exception as ex: |
| decoding["value"] = val |
| self._bytesRead += decoding["strDataLen"] |
| elif "Byte String" in decoding["type"]: |
| (val,) = struct.unpack( |
| "<%ds" % decoding["strDataLen"], |
| tlv[self._bytesRead : self._bytesRead + decoding["strDataLen"]], |
| ) |
| |
| decoding["value"] = val |
| self._bytesRead += decoding["strDataLen"] |
| else: |
| raise ValueError("Attempt to decode unsupported TLV type") |
| |
| def _get(self, tlv, decodings, out): |
| endOfEncoding = False |
| |
| while len(tlv[self._bytesRead :]) > 0 and endOfEncoding == False: |
| decoding = {} |
| self._decodeControlAndTag(tlv, decoding) |
| self._decodeStrLength(tlv, decoding) |
| self._decodeVal(tlv, decoding) |
| decodings.append(decoding) |
| |
| if decoding["type"] == "End of Collection": |
| endOfEncoding = True |
| else: |
| if "profileTag" in list(decoding.keys()): |
| out[decoding["profileTag"]] = decoding["value"] |
| elif "tag" in list(decoding.keys()): |
| if decoding["tag"] is not None: |
| out[decoding["tag"]] = decoding["value"] |
| else: |
| if isinstance(out, Mapping): |
| out["Any"] = decoding["value"] |
| elif isinstance(out, Sequence): |
| out.append(decoding["value"]) |
| else: |
| raise ValueError("Attempt to decode unsupported TLV tag") |
| |
| |
| def tlvTagToSortKey(tag): |
| if tag is None: |
| return -1 |
| if isinstance(tag, int): |
| majorOrder = 0 |
| elif isinstance(tag, tuple): |
| (profileId, tag) = tag |
| if profileId is None: |
| majorOrder = 1 |
| else: |
| majorOrder = profileId + 2 |
| else: |
| raise ValueError("Invalid TLV tag") |
| return (majorOrder << 32) + tag |
| |
| |
| if __name__ == "__main__": |
| val = dict( |
| [ |
| (1, 0), |
| (2, 65536), |
| (3, True), |
| (4, None), |
| (5, "Hello!"), |
| (6, bytearray([0xDE, 0xAD, 0xBE, 0xEF])), |
| (7, ["Goodbye!", 71024724507, False]), |
| ((0x235A0000, 42), "FOO"), |
| ((None, 42), "BAR"), |
| ] |
| ) |
| |
| writer = TLVWriter() |
| encodedVal = writer.put(None, val) |
| |
| reader = TLVReader(writer.encoding) |
| out = reader.get() |
| |
| print("TLVReader input: " + str(val)) |
| print("TLVReader output: " + str(out["Any"])) |
| |
| if val == out["Any"]: |
| print("Test Success") |
| else: |
| print("Test Failure") |