# Source blob: b186fb1bef6282ea19e643d9ec40c1758c51fe36
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import string
from abc import ABC, abstractmethod
class ConstraintParseError(Exception):
    '''Error reported when a constraint description cannot be interpreted.'''

    def __init__(self, message: str) -> None:
        '''Create the error.

        Args:
            message: Human readable description of the problem.
        '''
        super().__init__(message)
class BaseConstraint(ABC):
    '''Constraint Interface

    Concrete constraints implement check_response(). Callers use is_met(),
    which applies the shared None handling and type filtering before
    delegating to check_response().
    '''

    def __init__(self, types: list, is_null_allowed: bool = False):
        '''Store the accepted value types and the None policy.

        Args:
            types: Classes the value's type must be a subclass of for the
                constraint to be evaluated. An empty list indicates that any
                type is accepted.
            is_null_allowed: Result returned by is_met() when the value is
                None.
        '''
        self._types = types
        self._is_null_allowed = is_null_allowed

    def is_met(self, value, value_type_name):
        '''Return whether value satisfies this constraint.

        Args:
            value: The value to check; may be None.
            value_type_name: Name of the value's type, forwarded unchanged to
                check_response().

        Returns:
            The is_null_allowed setting when value is None, False when the
            value's type matches none of the accepted types, otherwise the
            result of check_response().
        '''
        if value is None:
            return self._is_null_allowed

        if self._types:
            response_type = type(value)
            # A generator (rather than a list) lets any() stop at the first
            # matching type instead of evaluating every candidate.
            if not any(issubclass(response_type, expected) for expected in self._types):
                return False

        return self.check_response(value, value_type_name)

    @abstractmethod
    def check_response(self, value, value_type_name) -> bool:
        '''Constraint specific check, implemented by each subclass.

        When reached through is_met(), value is never None and its type has
        already passed the accepted-types filter.

        Args:
            value: The value to check.
            value_type_name: Name of the value's type.

        Returns:
            True when the constraint is satisfied.
        '''
class _ConstraintHasValue(BaseConstraint):
    '''Constraint on whether a value is present (not None) at all.'''

    def __init__(self, has_value):
        '''Record the expectation.

        Args:
            has_value: True when a value is expected to be present, False when
                it is expected to be absent.
        '''
        super().__init__(types=[])
        self._has_value = has_value

    def is_met(self, value, value_type_name):
        '''Evaluate the constraint, including for a None value.'''
        # Deliberately bypass BaseConstraint.is_met(): its None handling would
        # answer before check_response() runs, yet an absent value is exactly
        # the case this constraint has to judge.
        return self.check_response(value, value_type_name)

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when the presence of value matches the expectation.'''
        return self._has_value == (value is not None)
class _ConstraintType(BaseConstraint):
    '''Constraint on the type (and, for integers, the range) of a value.

    A None value always satisfies this constraint because the base class is
    configured with is_null_allowed=True.
    '''

    # Type names satisfied by any value of exactly this Python type.
    _EXACT_TYPES = {
        'boolean': bool,
        'list': list,
        'char_string': str,
        'long_char_string': str,
        'octet_string': bytes,
        'long_octet_string': bytes,
    }

    # Type names that require an exact int within inclusive (min, max) bounds.
    _INT_RANGES = {
        'vendor_id': (0, 0xFFFF),
        'device_type_id': (0, 0xFFFFFFFF),
        'cluster_id': (0, 0xFFFFFFFF),
        'attribute_id': (0, 0xFFFFFFFF),
        'field_id': (0, 0xFFFFFFFF),
        'command_id': (0, 0xFFFFFFFF),
        'event_id': (0, 0xFFFFFFFF),
        'action_id': (0, 0xFF),
        'transaction_id': (0, 0xFFFFFFFF),
        'node_id': (0, 0xFFFFFFFFFFFFFFFF),
        'bitmap8': (0, 0xFF),
        'bitmap16': (0, 0xFFFF),
        'bitmap32': (0, 0xFFFFFFFF),
        'bitmap64': (0, 0xFFFFFFFFFFFFFFFF),
        'Percent': (0, 0xFF),
        'Percent100ths': (0, 0xFFFF),
        'epoch_us': (0, 0xFFFFFFFFFFFFFFFF),
        'epoch_s': (0, 0xFFFFFFFF),
        'utc': (0, 0xFFFFFFFF),
        'date': (0, 0xFFFFFFFF),
        'tod': (0, 0xFFFFFFFF),
    }

    # intNNu / intNNs and their nullable_ variants for every supported width.
    # The nullable variants exclude the highest unsigned value and the lowest
    # signed value (presumably reserved to encode null).
    for _bits in (8, 16, 24, 32, 40, 48, 56, 64):
        _INT_RANGES[f'int{_bits}u'] = (0, 2 ** _bits - 1)
        _INT_RANGES[f'nullable_int{_bits}u'] = (0, 2 ** _bits - 2)
        _INT_RANGES[f'int{_bits}s'] = (-(2 ** (_bits - 1)), 2 ** (_bits - 1) - 1)
        _INT_RANGES[f'nullable_int{_bits}s'] = (-(2 ** (_bits - 1)) + 1, 2 ** (_bits - 1) - 1)
    del _bits

    # Type names that accept any int instance (subclasses of int included)
    # within inclusive (min, max) bounds.
    _ENUM_RANGES = {
        'enum8': (0, 0xFF),
        'enum16': (0, 0xFFFF),
    }

    def __init__(self, type):
        '''Record the expected type name.

        Args:
            type: Name of the expected type, e.g. 'int8u' or 'char_string'.
        '''
        super().__init__(types=[], is_null_allowed=True)
        self._type = type

    def check_response(self, value, value_type_name) -> bool:
        '''Return whether value conforms to the expected type name.

        For a known type name the Python type of value must match and, for
        integer types, value must lie within the type's range. In every other
        case (unknown type name, or a known name whose Python type did not
        match) the result is whether the expected name equals value_type_name.
        '''
        wanted = self._type

        # Only strings can name a known type; the guard also keeps unhashable
        # objects away from the dictionary lookups.
        if isinstance(wanted, str):
            exact_type = self._EXACT_TYPES.get(wanted)
            if exact_type is not None and type(value) is exact_type:
                return True

            bounds = self._INT_RANGES.get(wanted)
            if bounds is not None and type(value) is int:
                return value >= bounds[0] and value <= bounds[1]

            bounds = self._ENUM_RANGES.get(wanted)
            if bounds is not None and isinstance(value, int):
                return value >= bounds[0] and value <= bounds[1]

        return wanted == value_type_name
class _ConstraintMinLength(BaseConstraint):
    '''Constraint on the minimum length of a str, bytes or list value.'''

    def __init__(self, min_length):
        '''Record the smallest acceptable length (inclusive).'''
        super().__init__(types=[str, bytes, list])
        self._min_length = min_length

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when value has at least the configured length.'''
        actual_length = len(value)
        return actual_length >= self._min_length
class _ConstraintMaxLength(BaseConstraint):
    '''Constraint on the maximum length of a str, bytes or list value.'''

    def __init__(self, max_length):
        '''Record the largest acceptable length (inclusive).'''
        super().__init__(types=[str, bytes, list])
        self._max_length = max_length

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when value has at most the configured length.'''
        actual_length = len(value)
        return actual_length <= self._max_length
class _ConstraintIsHexString(BaseConstraint):
    '''Constraint on whether a str value consists only of hex digits.'''

    def __init__(self, is_hex_string: bool):
        '''Record the expectation.

        Args:
            is_hex_string: True when the value must be made up solely of
                hexadecimal digits, False when it must not be.
        '''
        super().__init__(types=[str])
        self._is_hex_string = is_hex_string

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when the hex-only property of value matches the expectation.'''
        only_hex_digits = all(character in string.hexdigits for character in value)
        return only_hex_digits == self._is_hex_string
class _ConstraintStartsWith(BaseConstraint):
    '''Constraint on the prefix of a str value.'''

    def __init__(self, starts_with):
        '''Record the prefix the value must begin with.'''
        super().__init__(types=[str])
        self._starts_with = starts_with

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when value begins with the configured prefix.'''
        expected_prefix = self._starts_with
        return value.startswith(expected_prefix)
class _ConstraintEndsWith(BaseConstraint):
    '''Constraint on the suffix of a str value.'''

    def __init__(self, ends_with):
        '''Record the suffix the value must end with.'''
        super().__init__(types=[str])
        self._ends_with = ends_with

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when value ends with the configured suffix.'''
        expected_suffix = self._ends_with
        return value.endswith(expected_suffix)
class _ConstraintIsUpperCase(BaseConstraint):
    '''Constraint on whether a str value is upper case.'''

    def __init__(self, is_upper_case):
        '''Record the expectation.

        Args:
            is_upper_case: True when the value must be upper case (as judged
                by str.isupper()), False when it must not be.
        '''
        super().__init__(types=[str])
        self._is_upper_case = is_upper_case

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when str.isupper() of value matches the expectation.'''
        observed = value.isupper()
        return observed == self._is_upper_case
class _ConstraintIsLowerCase(BaseConstraint):
    '''Constraint on whether a str value is lower case.'''

    def __init__(self, is_lower_case):
        '''Record the expectation.

        Args:
            is_lower_case: True when the value must be lower case (as judged
                by str.islower()), False when it must not be.
        '''
        super().__init__(types=[str])
        self._is_lower_case = is_lower_case

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when str.islower() of value matches the expectation.'''
        observed = value.islower()
        return observed == self._is_lower_case
class _ConstraintMinValue(BaseConstraint):
    '''Constraint on the lower bound of an int or float value.

    A None value satisfies this constraint (is_null_allowed=True).
    '''

    def __init__(self, min_value):
        '''Record the smallest acceptable value (inclusive).'''
        super().__init__(types=[int, float], is_null_allowed=True)
        self._min_value = min_value

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when value is not below the configured minimum.'''
        lower_bound = self._min_value
        return value >= lower_bound
class _ConstraintMaxValue(BaseConstraint):
    '''Constraint on the upper bound of an int or float value.

    A None value satisfies this constraint (is_null_allowed=True).
    '''

    def __init__(self, max_value):
        '''Record the largest acceptable value (inclusive).'''
        super().__init__(types=[int, float], is_null_allowed=True)
        self._max_value = max_value

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when value is not above the configured maximum.'''
        upper_bound = self._max_value
        return value <= upper_bound
class _ConstraintContains(BaseConstraint):
    '''Constraint requiring a list value to contain every expected item.'''

    def __init__(self, contains):
        '''Record the items that must all be present.

        Args:
            contains: Iterable of items that the checked list must contain.
        '''
        super().__init__(types=[list])
        self._contains = contains

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when every expected item occurs in value.

        An empty collection of expected items is always satisfied.
        '''
        # Membership is tested with `in` (equality based) rather than through
        # set operations, which hash every element and therefore raise
        # TypeError as soon as either side holds an unhashable item such as a
        # dict or a list.
        return all(item in value for item in self._contains)
class _ConstraintExcludes(BaseConstraint):
    '''Constraint requiring a list value to contain none of the given items.'''

    def __init__(self, excludes):
        '''Record the items that must all be absent.

        Args:
            excludes: Iterable of items that the checked list must not
                contain.
        '''
        super().__init__(types=[list])
        self._excludes = excludes

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when none of the excluded items occurs in value.

        An empty collection of excluded items is always satisfied.
        '''
        # Membership is tested with `in` (equality based) rather than through
        # set operations, which hash every element and therefore raise
        # TypeError as soon as either side holds an unhashable item such as a
        # dict or a list.
        return not any(item in value for item in self._excludes)
class _ConstraintHasMaskSet(BaseConstraint):
    '''Constraint requiring all bits of each given mask to be set in an int value.'''

    def __init__(self, has_masks_set):
        '''Record the masks to test.

        Args:
            has_masks_set: Iterable of integer bit masks; each one must be
                fully set in the checked value.
        '''
        super().__init__(types=[int])
        self._has_masks_set = has_masks_set

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when every mask is fully set in value.'''
        mask_results = [(value & mask) == mask for mask in self._has_masks_set]
        return all(mask_results)
class _ConstraintHasMaskClear(BaseConstraint):
    '''Constraint requiring all bits of each given mask to be clear in an int value.'''

    def __init__(self, has_masks_clear):
        '''Record the masks to test.

        Args:
            has_masks_clear: Iterable of integer bit masks; none of their bits
                may be set in the checked value.
        '''
        super().__init__(types=[int])
        self._has_masks_clear = has_masks_clear

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when no bit of any mask is set in value.'''
        mask_results = [(value & mask) == 0 for mask in self._has_masks_clear]
        return all(mask_results)
class _ConstraintNotValue(BaseConstraint):
    '''Constraint requiring a value to differ from a given one.

    Values of any type are checked; a None value satisfies this constraint
    (is_null_allowed=True).
    '''

    def __init__(self, not_value):
        '''Record the value that must not be matched.'''
        super().__init__(types=[], is_null_allowed=True)
        self._not_value = not_value

    def check_response(self, value, value_type_name) -> bool:
        '''Return True when value is not equal to the forbidden value.'''
        forbidden = self._not_value
        return value != forbidden
def get_constraints(constraints: dict) -> list[BaseConstraint]:
    '''Build the constraint objects described by a mapping.

    Args:
        constraints: Mapping of constraint name (e.g. 'minLength') to the
            argument for that constraint.

    Returns:
        One constraint object per entry, in the mapping's iteration order.

    Raises:
        ConstraintParseError: If a constraint name is not recognized.
    '''
    factories = {
        'hasValue': _ConstraintHasValue,
        'type': _ConstraintType,
        'minLength': _ConstraintMinLength,
        'maxLength': _ConstraintMaxLength,
        'isHexString': _ConstraintIsHexString,
        'startsWith': _ConstraintStartsWith,
        'endsWith': _ConstraintEndsWith,
        'isUpperCase': _ConstraintIsUpperCase,
        'isLowerCase': _ConstraintIsLowerCase,
        'minValue': _ConstraintMinValue,
        'maxValue': _ConstraintMaxValue,
        'contains': _ConstraintContains,
        'excludes': _ConstraintExcludes,
        'hasMasksSet': _ConstraintHasMaskSet,
        'hasMasksClear': _ConstraintHasMaskClear,
        'notValue': _ConstraintNotValue,
    }

    built = []
    for name, argument in constraints.items():
        factory = factories.get(name)
        if factory is None:
            raise ConstraintParseError(f'Unknown constraint type:{name}')
        built.append(factory(argument))
    return built
def is_typed_constraint(constraint: str):
    '''Return the "typed" flag associated with a constraint name.

    NOTE(review): presumably a typed constraint is one whose argument has to
    be interpreted using the type of the value being checked - confirm against
    the callers.

    Args:
        constraint: Constraint name, e.g. 'minValue'.

    Returns:
        True for typed constraints, False for the others.

    Raises:
        ConstraintParseError: If the constraint name is not recognized.
    '''
    typed = frozenset((
        'startsWith',
        'endsWith',
        'minValue',
        'maxValue',
        'contains',
        'excludes',
        'notValue',
    ))
    untyped = frozenset((
        'hasValue',
        'type',
        'minLength',
        'maxLength',
        'isHexString',
        'isUpperCase',
        'isLowerCase',
        'hasMasksSet',
        'hasMasksClear',
    ))

    if constraint in typed:
        return True
    if constraint in untyped:
        return False
    raise ConstraintParseError(f'Unknown constraint type:{constraint}')