blob: 98be3859cabf5c09775e45610a69d40ed8b57e96 [file] [log] [blame]
#
# Copyright (c) 2025 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import random
from typing import Any, List, Optional
from mobly import asserts
import matter.clusters as Clusters
from matter.clusters import ClusterObjects, Globals
from matter.clusters.Types import NullValue
from matter.testing import matter_asserts
from matter.testing.matter_testing import AttributeMatcher, AttributeValue, MatterBaseTest
logger = logging.getLogger(__name__)
cluster = Clusters.CommodityTariff
class CommodityTariffTestBaseHelper(MatterBaseTest):
"""This class contains supporting methods for the CommodityTariff test cases."""
# Test event trigger IDs
EventTriggerFakeData = 0x0700000000000000
EventTriggerClear = 0x0700000000000001
EventTriggerChangeDay = 0x0700000000000002
EventTriggerChangeTime = 0x0700000000000003
# Variables to store values between test steps
# Attributes
startDateAttributeValue: int = None
defaultRandomizationType: cluster.Attributes.DefaultRandomizationType = None
defaultRandomizationOffset: int = None
tariffInfoValue: cluster.Structs.TariffInformationStruct = None
tariffComponentsValue: List[cluster.Structs.TariffComponentStruct] = None
currentTariffComponentsValue: List[cluster.Structs.TariffComponentStruct] = None
nextTariffComponentsValue: List[cluster.Structs.TariffComponentStruct] = None
tariffPeriodsValue: List[cluster.Structs.TariffPeriodStruct] = None
dayEntriesValue: List[cluster.Structs.DayEntryStruct] = None
dayPatternsValue: List[cluster.Structs.DayPatternStruct] = None
calendarPeriodsValue: List[cluster.Structs.CalendarPeriodStruct] = None
currentDayValue: cluster.Structs.DayStruct = None
nextDayValue: cluster.Structs.DayStruct = None
individualDaysValue: cluster.Structs.DayStruct = None
currentDayEntryDateValue: int = None
nextDayEntryDateValue: int = None
# Fields
blockModeValue: cluster.Enums.BlockModeEnum = None
# Other
dayEntryIDsEvents: List[int] = []
async def check_list_elements_uniqueness(self, list_to_check: list[Any], object_name: str = "Elements") -> None:
"""
Checks that all elements in the list are unique.
"""
asserts.assert_equal(len(list_to_check), len(set(list_to_check)), f"{object_name} in the list must be unique")
async def check_randomization_offset(self, randomization_type: cluster.Enums.DayEntryRandomizationTypeEnum, randomization_offset: int) -> None:
"""Checks RandomizationOffset field of DayEntryStruct or defaultRandomizationOffset attribute depending on the value of RandomizationType.
Args:
randomization_type (DayEntryRandomizationTypeEnum): Value of RandomizationType field of DayEntryStruct or defaultRandomizationType attribute.
randomization_offset (int): Value of RandomizationOffset field of DayEntryStruct or defaultRandomizationOffset attribute.
"""
if randomization_type == cluster.Enums.DayEntryRandomizationTypeEnum.kFixed:
matter_asserts.assert_valid_int16(randomization_offset, 'RandomizationOffset must be int16 type value.')
elif randomization_type == cluster.Enums.DayEntryRandomizationTypeEnum.kRandomNegative:
matter_asserts.assert_valid_int16(randomization_offset, 'RandomizationOffset must be int16 type value.')
asserts.assert_less_equal(randomization_offset, 0, "RandomizationOffset must be less than 0.")
else:
matter_asserts.assert_valid_int16(randomization_offset, 'RandomizationOffset must be int16 type value.')
asserts.assert_greater_equal(randomization_offset, 0, "RandomizationOffset must be greater than 0.")
async def checkAuxiliaryLoadSwitchSettingsStruct(self,
cluster: Clusters.CommodityTariff = None,
struct: Clusters.CommodityTariff.Structs.AuxiliaryLoadSwitchSettingsStruct = None) -> None:
"""
Checks the correctness of the AuxiliaryLoadSwitchSettingsStruct data type entries.
Args:
cluster (Clusters.CommodityTariff): CommodityTariff cluster;
struct (AuxiliaryLoadSwitchSettingsStruct): AuxiliaryLoadSwitchSettingsStruct data type entity.
"""
# checks Number field, must be uint8 type value
matter_asserts.assert_valid_uint8(struct.number, 'Number must be uint8 type value.')
# checks RequiredState field, must be AuxiliaryLoadSettingEnum type value and in range 0 - 2
matter_asserts.assert_valid_enum(
struct.requiredState, "RequiredState attribute must return a AuxiliaryLoadSettingEnum", cluster.Enums.AuxiliaryLoadSettingEnum)
asserts.assert_greater_equal(struct.requiredState, 0)
asserts.assert_less_equal(struct.requiredState, 2)
async def checkCalendarPeriodStruct(self,
struct: Clusters.CommodityTariff.Structs.CalendarPeriodStruct = None,
start_date_attribute: int = None) -> None:
"""
Checks the correctness of the CalendarPeriodStruct data type entries.
Args:
struct (CalendarPeriodStruct): CalendarPeriodStruct data type entity;
start_date_attribute (int): StartDate attribute value.
"""
# checks StartDate field
if struct.startDate is not NullValue:
matter_asserts.assert_valid_uint32(struct.startDate, 'StartDate')
asserts.assert_greater_equal(struct.startDate, start_date_attribute)
# checks DayPatternIDs field
matter_asserts.assert_list(
struct.dayPatternIDs, "DayPatternIDs attribute must return a list with length in range 1 - 7", min_length=1, max_length=7)
for dayPatternID in struct.dayPatternIDs:
matter_asserts.assert_valid_uint32(dayPatternID, 'DayPatternID must has uint32 type.')
async def checkCurrencyStruct(self,
struct: Globals.Structs.CurrencyStruct = None) -> None:
"""
Checks the correctness of the CurrencyStruct data type entries.
Args:
struct (CurrencyStruct): CurrencyStruct data type entity.
"""
# checks Currency field must be uint16 type value and in range 0 - 999
matter_asserts.assert_valid_uint16(struct.currency, 'Currency')
asserts.assert_less_equal(struct.currency, 999)
# checks DecimalPoints field must be uint8 type value
matter_asserts.assert_valid_uint8(struct.decimalPoints, 'DecimalPoints')
async def checkDayEntryStruct(self,
endpoint: int,
cluster: Clusters.CommodityTariff = None,
struct: Clusters.CommodityTariff.Structs.DayEntryStruct = None) -> None:
"""
Checks the correctness of the DayEntryStruct data type entries.
Args:
endpoint (int): Endpoint ID;
cluster (Clusters.CommodityTariff): CommodityTariff cluster;
struct (DayEntryStruct): DayEntryStruct data type entity.
"""
# checks DayEntryID field must be uint32 type value
matter_asserts.assert_valid_uint32(struct.dayEntryID, 'DayEntryID must has uint32 type.')
# check StartTime field must be uint16 type value and in range 0 - 1499
matter_asserts.assert_valid_uint16(struct.startTime, 'StartTime must has uint16 type.')
asserts.assert_less_equal(struct.startTime, 1499)
# checking Duration field only for days with DayType: Event
# Duration must be uint16 type value and in range 1500 - StartTime
if struct.dayEntryID in self.dayEntryIDsEvents:
if struct.duration is not None:
matter_asserts.assert_valid_uint16(struct.duration, 'Duration must has uint16 type.')
asserts.assert_less_equal(struct.duration, 1500 - struct.startTime)
# if SETRF.S.F05(RNDM) feature is enabled
# checks RandomizationOffset and RandomizationType fields
if await self.feature_guard(cluster=cluster, endpoint=endpoint, feature_int=cluster.Bitmaps.Feature.kRandomization):
if struct.randomizationType is not None:
# checks RandomizationType field must be DayEntryRandomizationTypeEnum type value and in range 0 - 4
matter_asserts.assert_valid_enum(
struct.randomizationType, "RandomizationType attribute must return a DayEntryRandomizationTypeEnum", cluster.Enums.DayEntryRandomizationTypeEnum)
asserts.assert_greater_equal(struct.randomizationType, 0)
asserts.assert_less_equal(struct.randomizationType, 4)
# checks RandomizationOffset field depending on the value of RandomizationType
if struct.randomizationOffset is not None:
await self.check_randomization_offset(struct.randomizationType, struct.randomizationOffset)
else: # if SETRF.S.F05(RNDM) feature is disabled
asserts.assert_is_none(struct.randomizationOffset, "RandomizationOffset must be None")
asserts.assert_is_none(struct.randomizationType, "RandomizationType must be None")
async def checkDayPatternStruct(self,
struct: Clusters.CommodityTariff.Structs.DayPatternStruct = None) -> None:
"""
Checks the correctness of the DayPatternStruct data type entries.
Args:
struct (DayPatternStruct): DayPatternStruct data type entity.
"""
# checks DayPatternID field must be uint32 type value
matter_asserts.assert_valid_uint32(struct.dayPatternID, 'DayPatternID')
# checks DaysOfWeek field must be uint8 type value
matter_asserts.is_valid_int_value(struct.daysOfWeek)
# Check bitmap value less than or equal to (Sunday | Monday | Tuesday | Wednesday | Thursday | Friday | Saturday)
asserts.assert_less_equal(struct.daysOfWeek, 127)
# checks DayEntryIDs field must be list with length in range 1 - 96
matter_asserts.assert_list(struct.dayEntryIDs, "DayEntryIDs attribute must return a list", min_length=1, max_length=96)
await self.check_list_elements_uniqueness(struct.dayEntryIDs, "DayEntryIDs")
for dayEntryID in struct.dayEntryIDs:
matter_asserts.assert_valid_uint32(dayEntryID, 'DayEntryID must has uint32 type.')
async def checkDayStruct(self,
cluster: Clusters.CommodityTariff = None,
struct: Clusters.CommodityTariff.Structs.DayStruct = None) -> None:
"""
Checks the correctness of the DayStruct data type entries.
Args:
cluster (Clusters.CommodityTariff): CommodityTariff cluster;
struct (DayStruct): DayStruct data type entity.
"""
# checks Date field must be uint32 type value
matter_asserts.assert_valid_uint32(struct.date, 'Date field must has epoch-s type.')
# checks DayType field must be DayTypeEnum type value and in range 0 - 3
matter_asserts.assert_valid_enum(
struct.dayType, "DayType field must has a DayTypeEnum", cluster.Enums.DayTypeEnum)
asserts.assert_greater_equal(struct.dayType, 0, "DayType must be greater than 0.")
asserts.assert_less_equal(struct.dayType, 3, "DayType must be less than 3.")
# checks DayEntryIDs field must be list with length in range 1 - 96
matter_asserts.assert_list(
struct.dayEntryIDs, "DayEntryIDs attribute must return a list with length in range 1 - 96", min_length=1, max_length=96)
for dayEntryID in struct.dayEntryIDs:
matter_asserts.assert_valid_uint32(dayEntryID, 'DayEntryID must has uint32 type.')
await self.check_list_elements_uniqueness(struct.dayEntryIDs, "DayEntryIDs")
# Collect all DayEntryIDs for DayType: Event
if struct.dayType == 3:
self.dayEntryIDsEvents += struct.dayEntryIDs
async def checkPeakPeriodStruct(self,
cluster: Clusters.CommodityTariff = None,
struct: Clusters.CommodityTariff.Structs.PeakPeriodStruct = None) -> None:
"""
Checks the correctness of the PeakPeriodStruct data type entries.
Args:
cluster (Clusters.CommodityTariff): CommodityTariff cluster;
struct (PeakPeriodStruct): PeakPeriodStruct data type entity.
"""
# checks Severity field must be PeakPeriodSeverityEnum type value and in range 0 - 3
matter_asserts.assert_valid_enum(
struct.severity, "Severity attribute must return a PeakPeriodSeverityEnum", cluster.Enums.PeakPeriodSeverityEnum)
asserts.assert_greater_equal(struct.severity, 0, "Severity must be greater than 0.")
asserts.assert_less_equal(struct.severity, 3, "Severity must be less than 3.")
# checks PeakPeriod field must be uint16 type value
matter_asserts.assert_valid_uint16(struct.peakPeriod, 'PeakPeriod must has uint16 type.')
asserts.assert_greater_equal(struct.peakPeriod, 1, "PeakPeriod must be greater or equal than 1.")
async def checkPowerThresholdStruct(self,
struct: Globals.Structs.PowerThresholdStruct = None) -> None:
"""
Checks the correctness of the PowerThresholdStruct data type entries.
Args:
struct (PowerThresholdStruct): PowerThresholdStruct data type entity.
"""
# checks PowerThreshold field must be int64 type
if struct.powerThreshold is not None:
matter_asserts.assert_valid_int64(struct.powerThreshold, 'PowerThreshold must has power-mW type.')
# checks ApparentPowerThreshold field must be int64 type
if struct.apparentPowerThreshold is not None:
matter_asserts.assert_valid_int64(struct.apparentPowerThreshold, 'ApparentPowerThreshold must has power-mVA type.')
# checks PowerThresholdSource field must be PowerThresholdSourceEnum type value and in range 0 - 2
if struct.powerThresholdSource is not NullValue:
matter_asserts.assert_valid_enum(
struct.powerThresholdSource, "PowerThresholdSource attribute must return a PowerThresholdSourceEnum", Globals.Enums.PowerThresholdSourceEnum)
asserts.assert_greater_equal(struct.powerThresholdSource, 0, "PowerThresholdSource must be greater than 0.")
asserts.assert_less_equal(struct.powerThresholdSource, 2, "PowerThresholdSource must be less than 2.")
async def checkTariffComponentStruct(self,
endpoint: int = None,
cluster: Clusters.CommodityTariff = None,
struct: Clusters.CommodityTariff.Structs.TariffComponentStruct = None) -> None:
"""
Checks the correctness of the TariffComponentStruct data type entries.
Args:
endpoint (int): Endpoint number;
cluster (Clusters.CommodityTariff): CommodityTariff cluster;
struct (TariffComponentStruct): TariffComponentStruct data type entity.
"""
# checks TariffComponentID field must be uint32 type
matter_asserts.assert_valid_uint32(struct.tariffComponentID, 'TariffComponentID must has uint32 type.')
# checks that at least one field from price, friendlyCredit, auxiliaryLoad, peakPeriod, powerThreshold is set
asserts.assert_true(any((struct.price, struct.friendlyCredit, struct.auxiliaryLoad, struct.peakPeriod, struct.powerThreshold)),
'At least one field from price, friendlyCredit, auxiliaryLoad, peakPeriod, powerThreshold must be set')
# if SETRF.S.F00(PRICE) feature is enabled
if await self.feature_guard(cluster=cluster, endpoint=endpoint, feature_int=cluster.Bitmaps.Feature.kPricing):
# checks Price field must be of type TariffPriceStruct
if struct.price is not None and struct.price is not NullValue:
asserts.assert_true(isinstance(
struct.price, cluster.Structs.TariffPriceStruct), "Price field must be of type TariffPriceStruct")
await self.checkTariffPriceStruct(struct=struct.price)
else: # if SETRF.S.F00(PRICE) feature is disabled
asserts.assert_is_none(struct.price, "Price must be None")
# if SETRF.S.F01(FCRED) feature is enabled
if await self.feature_guard(cluster=cluster, endpoint=endpoint, feature_int=cluster.Bitmaps.Feature.kFriendlyCredit):
if struct.friendlyCredit is not None:
# checks FriendlyCredit field must be bool
matter_asserts.assert_valid_bool(struct.friendlyCredit, 'FriendlyCredit')
else: # if SETRF.S.F01(FCRED) feature is disabled
asserts.assert_is_none(struct.friendlyCredit, "FriendlyCredit must be None")
# if SETRF.S.F02(AUXLD) feature is enabled
if await self.feature_guard(cluster=cluster, endpoint=endpoint, feature_int=cluster.Bitmaps.Feature.kAuxiliaryLoad):
if struct.auxiliaryLoad is not None:
# checks AuxiliaryLoad field must be of type AuxiliaryLoadSwitchSettingsStruct
asserts.assert_true(isinstance(
struct.auxiliaryLoad, cluster.Structs.AuxiliaryLoadSwitchSettingsStruct), "AuxiliaryLoad must be of type AuxiliaryLoadSwitchSettingsStruct")
await self.checkAuxiliaryLoadSwitchSettingsStruct(cluster=cluster, struct=struct.auxiliaryLoad)
else: # if SETRF.S.F02(AUXLD) feature is disabled
asserts.assert_is_none(struct.auxiliaryLoad, "AuxiliaryLoad must be None")
# if SETRF.S.F03(PEAKP) feature is enabled
if await self.feature_guard(cluster=cluster, endpoint=endpoint, feature_int=cluster.Bitmaps.Feature.kPeakPeriod):
if struct.peakPeriod is not None:
# checks PeakPeriod field must be of type PeakPeriodStruct
asserts.assert_true(isinstance(
struct.peakPeriod, cluster.Structs.PeakPeriodStruct), "PeakPeriod must be of type PeakPeriodStruct")
await self.checkPeakPeriodStruct(cluster=cluster, struct=struct.peakPeriod)
else: # if SETRF.S.F03(PEAKP) feature is disabled
asserts.assert_is_none(struct.peakPeriod, "PeakPeriod must be None")
# if SETRF.S.F04(PWRTHLD) feature is enabled
if await self.feature_guard(cluster=cluster, endpoint=endpoint, feature_int=cluster.Bitmaps.Feature.kPowerThreshold):
if struct.powerThreshold is not None:
# checks PowerThreshold field must be of type PowerThresholdStruct
asserts.assert_true(isinstance(
struct.powerThreshold, Globals.Structs.PowerThresholdStruct), "PowerThreshold must be of type PowerThresholdStruct")
await self.checkPowerThresholdStruct(struct=struct.powerThreshold)
else: # if SETRF.S.F04(PWRTHLD) feature is disabled
asserts.assert_is_none(struct.powerThreshold, "PowerThreshold must be None")
# checks Threshold field must be int64 or null
# All Threshold fields on TariffComponentStruct in a tariff whose BlockMode field is set to NoBlock SHALL be null.
if self.blockModeValue == 0:
asserts.assert_equal(struct.threshold, NullValue, "Threshold field must be Null if BlockMode is 0.")
else:
if struct.threshold is not NullValue:
matter_asserts.assert_valid_int64(struct.threshold, 'Threshold field must has int64 type.')
# checks Label field must be string
if struct.label is not None and struct.label is not NullValue:
matter_asserts.assert_is_string(struct.label, "Label must be a string")
asserts.assert_less_equal(len(struct.label), 128, "Label must have length at most 128!")
# checks Predicted field must be bool
if struct.predicted is not None:
matter_asserts.assert_valid_bool(struct.predicted, 'Predicted must has bool type.')
async def checkTariffInformationStruct(self,
endpoint: int = None,
cluster: Clusters.CommodityTariff = None,
struct: Clusters.CommodityTariff.Structs.TariffInformationStruct = None) -> None:
"""
Checks the correctness of the TariffInformationStruct data type entries.
Args:
endpoint (int): Endpoint number;
cluster (Clusters.CommodityTariff): CommodityTariff cluster;
struct (TariffInformationStruct): TariffInformationStruct data type entity.
"""
# checks TariffLabel field must be string
if struct.tariffLabel is not NullValue:
matter_asserts.assert_is_string(struct.tariffLabel, "TariffLabel must be a string")
asserts.assert_less_equal(len(struct.tariffLabel), 128, "TariffLabel must have length at most 128!")
# checks ProviderName field must be string
if struct.providerName is not NullValue:
matter_asserts.assert_is_string(struct.providerName, "ProviderName must be a string")
asserts.assert_less_equal(len(struct.providerName), 128, "ProviderName must have length at most 128!")
# if SETRF.S.F00(PRICE) feature is enabled
if await self.feature_guard(cluster=cluster, endpoint=endpoint, feature_int=cluster.Bitmaps.Feature.kPricing):
# checks Currency field must be CurrencyStruct
asserts.assert_true(struct.currency is not None, "Currency must have real value or can be Null")
if struct.currency is not NullValue:
asserts.assert_true(isinstance(
struct.currency, Globals.Structs.CurrencyStruct), "Currency must be of type CurrencyStruct")
await self.checkCurrencyStruct(struct=struct.currency)
else: # if SETRF.S.F00(PRICE) feature is disabled
asserts.assert_is_none(struct.currency, "Currency must be None")
# checks BlockMode field must be BlockModeEnum
if struct.blockMode is not NullValue:
matter_asserts.assert_valid_enum(
struct.blockMode, "BlockMode attribute must return a BlockModeEnum", cluster.Enums.BlockModeEnum)
self.blockModeValue = struct.blockMode
async def checkTariffPeriodStruct(self,
struct: Clusters.CommodityTariff.Structs.TariffPeriodStruct = None) -> None:
"""
Checks the correctness of the TariffPeriodStruct data type entries.
Args:
struct (TariffPeriodStruct): TariffPeriodStruct data type entity.
"""
# checks Label field must be string
if struct.label is not NullValue:
matter_asserts.assert_is_string(struct.label, "Label must be a string")
asserts.assert_less_equal(len(struct.label), 128, "Label must have length at most 128!")
# checks DayEntryIDs field must be list with length in range 1 - 20
matter_asserts.assert_list(
struct.dayEntryIDs, "DayEntryIDs attribute must return a list with length in range 1 - 20", min_length=1, max_length=20)
for dayEntryID in struct.dayEntryIDs:
matter_asserts.assert_valid_uint32(dayEntryID, 'DayEntryID must has uint32 type.')
await self.check_list_elements_uniqueness(struct.dayEntryIDs, "DayEntryIDs")
# checks TariffComponentIDs field must be list with length in range 1 - 20
matter_asserts.assert_list(
struct.tariffComponentIDs, "TariffComponentIDs attribute must return a list with length in range 1 - 20", min_length=1, max_length=20)
for tariffComponentID in struct.tariffComponentIDs:
matter_asserts.assert_valid_uint32(tariffComponentID, 'TariffComponentID must has uint32 type.')
async def checkTariffPriceStruct(self,
struct: Clusters.CommodityTariff.Structs.TariffPriceStruct = None) -> None:
"""
Checks the correctness of the TariffPriceStruct data type entries.
Args:
struct (TariffPriceStruct): TariffPriceStruct data type entity.
"""
# checks PriceType field must be TariffPriceTypeEnum and in range 0 - 4
matter_asserts.assert_valid_enum(
struct.priceType, "PriceType attribute must return a TariffPriceTypeEnum", Globals.Enums.TariffPriceTypeEnum)
asserts.assert_greater_equal(struct.priceType, 0, "PriceType field of TariffPriceStruct must be greater than 0.")
asserts.assert_less_equal(struct.priceType, 4, "PriceType field of TariffPriceStruct must be less than 4.")
# checks Price field must be int64
if struct.price is not None:
matter_asserts.assert_valid_int64(struct.price, 'Price field of TariffPriceStruct must be money')
# checks PriceLevel field must be int16
if struct.priceLevel is not None:
matter_asserts.assert_valid_int16(struct.priceLevel, 'PriceLevel field of TariffPriceStruct must be int16')
async def send_test_event_trigger_for_attributes_value_set(self):
"""Simulate updating of values for all cluster attributes with valid data not equal to the pre-test state."""
await self.send_test_event_triggers(eventTrigger=self.EventTriggerFakeData)
async def send_test_event_trigger_clear(self):
"""Return the device to pre-test state."""
await self.send_test_event_triggers(eventTrigger=self.EventTriggerClear)
async def send_test_event_trigger_change_day(self):
"""This test event trigger ensure time shifting on 24h to simulate a day change."""
await self.send_test_event_triggers(eventTrigger=self.EventTriggerChangeDay)
async def send_test_event_trigger_change_time(self):
"""This test event trigger ensure time shifting on 4h to simulate a time change."""
await self.send_test_event_triggers(eventTrigger=self.EventTriggerChangeTime)
async def check_tariff_info_attribute(self, endpoint: int, attribute_value: Optional[cluster.Structs.TariffInformationStruct] = None) -> None:
"""Validate TariffInfo attribute.
Args:
endpoint (int): endpoint
attribute_value (cluster.Structs.TariffInformationStruct, optional): TariffInfo attribute value. Defaults to None.
"""
self.tariffInfoValue = attribute_value
# if attribute value is not set, read it
if not self.tariffInfoValue:
self.tariffInfoValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.TariffInfo)
logger.info(f"TariffInfo attribute value is: {self.tariffInfoValue}")
# if attribute value is not null it must be of type TariffInformationStruct
if self.tariffInfoValue is not NullValue:
asserts.assert_true(isinstance(
self.tariffInfoValue, cluster.Structs.TariffInformationStruct), "TariffInfo must be of type TariffInformationStruct")
await self.checkTariffInformationStruct(endpoint=endpoint, cluster=cluster, struct=self.tariffInfoValue)
async def check_tariff_unit_attribute(self, endpoint: int, attribute_value: Optional[Globals.Enums.TariffUnitEnum] = None) -> None:
"""Validate TariffUnit attribute.
Args:
endpoint (int): endpoint
attribute_value (Optional[Globals.Enums.TariffUnitEnum], optional): TariffUnit attribute value. Defaults to None.
"""
if not attribute_value:
attribute_value = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.TariffUnit)
logger.info(f"TariffUnit attribute value is: {attribute_value}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(attribute_value, NullValue, "TariffUnit must be Null when TariffInfo is Null")
# if attribute value is not null it must be of type TariffUnitEnum and in range 0 - 1
if attribute_value is not NullValue:
matter_asserts.assert_valid_enum(
attribute_value, "TariffUnit attribute must return a TariffUnitEnum", Globals.Enums.TariffUnitEnum)
asserts.assert_true(attribute_value >= 0 and attribute_value <= 1, "TariffUnit must be in range 0 - 1")
async def check_start_date_attribute(self, endpoint: int, attribute_value: Optional[int] = None) -> None:
"""Validate StartDate attribute.
Args:
endpoint (int): endpoint
attribute_value (Optional[int], optional): StartDate attribute value. Defaults to None.
"""
self.startDateAttributeValue = attribute_value
if not self.startDateAttributeValue:
self.startDateAttributeValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.StartDate)
logger.info(f"StartDate attribute value is: {self.startDateAttributeValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.startDateAttributeValue, NullValue, "StartDate must be Null when TariffInfo is Null")
# if attribute value is not null it must be of type uint32
if self.startDateAttributeValue is not NullValue:
matter_asserts.assert_valid_uint32(self.startDateAttributeValue, 'StartDate attribute must has uint32 type.')
async def check_day_entries_attribute(self, endpoint: int, attribute_value: Optional[List[cluster.Structs.DayEntryStruct]] = None) -> None:
"""Validate DayEntries attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[List[cluster.Structs.DayEntryStruct]], optional): DayEntries attribute value. Defaults to None.
"""
self.dayEntriesValue = attribute_value
if not self.dayEntriesValue:
self.dayEntriesValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.DayEntries)
logger.info(f"DayEntries attribute value is: {self.dayEntriesValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.dayEntriesValue, NullValue, "DayEntries must be Null when TariffInfo is Null")
# if attribute value is not null it must be list of DayEntryStruct
if self.dayEntriesValue is not NullValue:
matter_asserts.assert_list(
self.dayEntriesValue, "DayEntries attribute must return a list with length less or equal 672", max_length=672)
matter_asserts.assert_list_element_type(
self.dayEntriesValue, cluster.Structs.DayEntryStruct, "DayEntries attribute must contain DayEntryStruct elements")
# is used to check DayEntryID uniqueness below
dayEntryIDs_from_day_entries_attribute = []
# check each DayEntryStruct
for item in self.dayEntriesValue:
await self.checkDayEntryStruct(endpoint=endpoint, cluster=cluster, struct=item)
dayEntryIDs_from_day_entries_attribute.append(item.dayEntryID)
# check DayEntryID uniqueness
await self.check_list_elements_uniqueness(dayEntryIDs_from_day_entries_attribute, "DayEntryIDs")
async def check_day_patterns_attribute(self, endpoint: int, attribute_value: Optional[List[cluster.Structs.DayPatternStruct]] = None) -> None:
"""Validate DayPatterns attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[List[cluster.Structs.DayPatternStruct]], optional): DayPatterns attribute value. Defaults to None.
"""
self.dayPatternsValue = attribute_value
if not self.dayPatternsValue:
self.dayPatternsValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.DayPatterns)
logger.info(f"DayPatterns attribute value is: {self.dayPatternsValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.dayPatternsValue, NullValue, "DayPatterns must be Null when TariffInfo is Null")
# if attribute value is not null it must be list of DayPatternStruct with length in range 0 - 28
if self.dayPatternsValue is not NullValue:
matter_asserts.assert_list(
self.dayPatternsValue, "DayPatterns attribute must return a list with length less or equal 28", min_length=0, max_length=28)
if self.dayPatternsValue: # if list is not empty
matter_asserts.assert_list_element_type(
self.dayPatternsValue, cluster.Structs.DayPatternStruct, "DayPatterns attribute must contain DayPatternStruct elements")
for item in self.dayPatternsValue: # check each DayPatternStruct
await self.checkDayPatternStruct(struct=item)
async def check_calendar_periods_attribute(self, endpoint: int, attribute_value: Optional[List[cluster.Structs.CalendarPeriodStruct]] = None) -> None:
"""Validate CalendarPeriods attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[List[cluster.Structs.CalendarPeriodStruct]], optional): CalendarPeriods attribute value. Defaults to None.
"""
self.calendarPeriodsValue = attribute_value
if not self.calendarPeriodsValue:
self.calendarPeriodsValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.CalendarPeriods)
if self.startDateAttributeValue is None:
self.startDateAttributeValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.StartDate)
self.check_start_date_attribute(endpoint, self.startDateAttributeValue)
logger.info(f"CalendarPeriods attribute value is: {self.calendarPeriodsValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.calendarPeriodsValue, NullValue, "CalendarPeriods must be Null when TariffInfo is Null")
# if attribute value is not null it must be list of CalendarPeriodStruct with length in range 1 - 4
if self.calendarPeriodsValue is not NullValue:
matter_asserts.assert_list(
self.calendarPeriodsValue, "CalendarPeriods attribute must return a list with length in range 1 - 4", min_length=1, max_length=4)
matter_asserts.assert_list_element_type(
self.calendarPeriodsValue, cluster.Structs.CalendarPeriodStruct, "CalendarPeriods attribute must contain CalendarPeriodStruct elements")
# check each CalendarPeriodStruct
for item in self.calendarPeriodsValue:
await self.checkCalendarPeriodStruct(struct=item, start_date_attribute=self.startDateAttributeValue)
# check CalendarPeriods order
for item in range(len(self.calendarPeriodsValue) - 1):
asserts.assert_less(self.calendarPeriodsValue[item].startDate, self.calendarPeriodsValue[item + 1].startDate,
"CalendarPeriods must be sorted by Date in increasing order!")
# If StartDate is Null, the first CalendarPeriod item Start Date field must also be Null
if self.startDateAttributeValue is NullValue:
asserts.assert_true(self.calendarPeriodsValue[0].startDate is NullValue,
"If StartDate is Null, the first CalendarPeriod item Start Date field must also be Null")
# If StartDate is Null only first CalendarPeriod item Start Date field must be Null, the other CalendarPeriod items Start Date field must not be Null
if len(self.calendarPeriodsValue) > 1:
for item in range(1, len(self.calendarPeriodsValue)):
asserts.assert_true(self.calendarPeriodsValue[item].startDate is not NullValue,
"If StartDate is Null only first CalendarPeriod item Start Date field must be Null, the other CalendarPeriod items Start Date field must not be Null")
async def check_individual_days_attribute(self, endpoint: int, attribute_value: Optional[List[cluster.Structs.DayStruct]] = None) -> None:
"""Validate IndividualDays attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[List[cluster.Structs.DayStruct]], optional): IndividualDays attribute value. Defaults to None.
"""
self.individualDaysValue = attribute_value
if not self.individualDaysValue:
self.individualDaysValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.IndividualDays)
logger.info(f"IndividualDays attribute value is: {self.individualDaysValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.individualDaysValue, NullValue, "IndividualDays must be Null when TariffInfo is Null")
# if attribute value is not null it must be list of DayStruct with length in range 0 - 50
if self.individualDaysValue is not NullValue:
matter_asserts.assert_list(
self.individualDaysValue, "IndividualDays attribute must return a list with length less or equal 50", max_length=50)
matter_asserts.assert_list_element_type(
self.individualDaysValue, cluster.Structs.DayStruct, "IndividualDays attribute must contain DayStruct elements")
# check each DayStruct
for item in self.individualDaysValue:
await self.checkDayStruct(cluster=cluster, struct=item)
# check IndividualDays order
for item in range(len(self.individualDaysValue) - 1):
asserts.assert_less(self.individualDaysValue[item].date, self.individualDaysValue[item + 1].date,
"IndividualDays must be sorted by Date in increasing order!")
async def check_current_day_attribute(self, endpoint: int, attribute_value: Optional[cluster.Structs.DayStruct] = None) -> None:
"""Validate CurrentDay attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[cluster.Structs.DayStruct], optional): CurrentDay attribute value. Defaults to None.
"""
self.currentDayValue = attribute_value
if not self.currentDayValue:
self.currentDayValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.CurrentDay)
logger.info(f"CurrentDay attribute value is: {self.currentDayValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.currentDayValue, NullValue, "CurrentDay must be Null when TariffInfo is Null")
# if attribute value is not null it must be DayStruct
if self.currentDayValue is not NullValue:
asserts.assert_true(isinstance(
self.currentDayValue, cluster.Structs.DayStruct), "CurrentDay must be of type DayStruct")
await self.checkDayStruct(cluster=cluster, struct=self.currentDayValue)
async def check_next_day_attribute(self, endpoint: int, attribute_value: Optional[cluster.Structs.DayStruct] = None) -> None:
"""Validate NextDay attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[cluster.Structs.DayStruct], optional): NextDay attribute value. Defaults to None.
"""
self.nextDayValue = attribute_value
if not self.nextDayValue:
self.nextDayValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.NextDay)
logger.info(f"NextDay attribute value is: {self.nextDayValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.nextDayValue, NullValue, "NextDay must be Null when TariffInfo is Null")
# if attribute value is not null it must be DayStruct
if self.nextDayValue is not NullValue:
asserts.assert_true(isinstance(
self.nextDayValue, cluster.Structs.DayStruct), "NextDay must be of type DayStruct")
await self.checkDayStruct(cluster=cluster, struct=self.nextDayValue)
async def check_current_day_entry_attribute(self, endpoint: int, attribute_value: Optional[cluster.Structs.DayEntryStruct] = None) -> None:
"""Validate CurrentDayEntry attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[cluster.Structs.DayEntryStruct], optional): CurrentDayEntry attribute value. Defaults to None.
"""
if not attribute_value:
attribute_value = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.CurrentDayEntry)
logger.info(f"CurrentDayEntry attribute value is: {attribute_value}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(attribute_value, NullValue, "CurrentDayEntry must be Null when TariffInfo is Null")
# if attribute value is not null it must be DayEntryStruct
if attribute_value is not NullValue:
asserts.assert_true(isinstance(
attribute_value, cluster.Structs.DayEntryStruct), "CurrentDayEntry must be of type DayEntryStruct")
await self.checkDayEntryStruct(endpoint=endpoint, cluster=cluster, struct=attribute_value)
async def check_current_day_entry_date_attribute(self, endpoint: int, attribute_value: Optional[int] = None) -> None:
"""Validate CurrentDayEntryDate attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[int], optional): CurrentDayEntryDate attribute value. Defaults to None.
"""
self.currentDayEntryDateValue = attribute_value
if not self.currentDayEntryDateValue:
self.currentDayEntryDateValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.CurrentDayEntryDate)
logger.info(f"CurrentDayEntryDate attribute value is: {self.currentDayEntryDateValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.currentDayEntryDateValue, NullValue,
"CurrentDayEntryDate must be Null when TariffInfo is Null")
# if attribute value is not null it must be uint32
if self.currentDayEntryDateValue is not NullValue:
matter_asserts.assert_valid_uint32(self.currentDayEntryDateValue, 'CurrentDayEntryDate must be of type uint32')
async def check_next_day_entry_attribute(self, endpoint: int, attribute_value: Optional[cluster.Structs.DayEntryStruct] = None) -> None:
"""Validate NextDayEntry attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[cluster.Structs.DayEntryStruct], optional): NextDayEntry attribute value. Defaults to None.
"""
if not attribute_value:
attribute_value = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.NextDayEntry)
logger.info(f"NextDayEntry attribute value is: {attribute_value}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(attribute_value, NullValue, "NextDayEntry must be Null when TariffInfo is Null")
# if attribute value is not null it must be DayEntryStruct
if attribute_value is not NullValue:
asserts.assert_true(isinstance(
attribute_value, cluster.Structs.DayEntryStruct), "NextDayEntry must be of type DayEntryStruct")
await self.checkDayEntryStruct(endpoint=endpoint, cluster=cluster, struct=attribute_value)
async def check_next_day_entry_date_attribute(self, endpoint: int, attribute_value: Optional[int] = None) -> None:
"""Validate NextDayEntryDate attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[int], optional): NextDayEntryDate attribute value. Defaults to None.
"""
self.nextDayEntryDateValue = attribute_value
if not self.nextDayEntryDateValue:
self.nextDayEntryDateValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.NextDayEntryDate)
logger.info(f"NextDayEntryDate attribute value is: {self.nextDayEntryDateValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.nextDayEntryDateValue, NullValue, "NextDayEntryDate must be Null when TariffInfo is Null")
# if attribute value is not null it must be uint32 and greater than CurrentDayEntryDate
if self.nextDayEntryDateValue is not NullValue:
matter_asserts.assert_valid_uint32(self.nextDayEntryDateValue, 'NextDayEntryDate must be of type uint32')
asserts.assert_greater(self.nextDayEntryDateValue, self.currentDayEntryDateValue,
"NextDayEntryDate must be greater than CurrentDayEntryDate")
async def check_tariff_components_attribute(self, endpoint: int, attribute_value: Optional[List[cluster.Structs.TariffComponentStruct]] = None) -> None:
"""Validate TariffComponents attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[List[cluster.Structs.TariffComponentStruct]], optional): TariffComponents attribute value. Defaults to None.
"""
self.tariffComponentsValue = attribute_value
if not self.tariffComponentsValue:
self.tariffComponentsValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.TariffComponents)
logger.info(f"TariffComponents attribute value is: {self.tariffComponentsValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.tariffComponentsValue, NullValue, "TariffComponents must be empty when TariffInfo is Null")
# if attribute value is not null it must be list of TariffComponentStruct
if self.tariffComponentsValue is not NullValue:
matter_asserts.assert_list(
self.tariffComponentsValue, "TariffComponents attribute must return a list with length greater or equal 1", min_length=1, max_length=672)
matter_asserts.assert_list_element_type(
self.tariffComponentsValue, cluster.Structs.TariffComponentStruct, "TariffComponents attribute must contain TariffComponentStruct elements")
# check each TariffComponentStruct
for item in self.tariffComponentsValue:
await self.checkTariffComponentStruct(endpoint=endpoint, cluster=cluster, struct=item)
async def check_tariff_periods_attribute(self, endpoint: int, attribute_value: Optional[List[cluster.Structs.TariffPeriodStruct]] = None) -> None:
"""Validate TariffPeriods attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[List[cluster.Structs.TariffPeriodStruct]], optional): TariffPeriods attribute value. Defaults to None.
"""
self.tariffPeriodsValue = attribute_value
if not self.tariffPeriodsValue:
self.tariffPeriodsValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.TariffPeriods)
logger.info(f"TariffPeriods attribute value is: {self.tariffPeriodsValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.tariffPeriodsValue, NullValue, "TariffPeriods must be empty when TariffInfo is Null")
# if attribute value is not null it must be list of TariffPeriodStruct
if self.tariffPeriodsValue is not NullValue:
matter_asserts.assert_list(
self.tariffPeriodsValue, "TariffPeriods attribute must return a list with length greater or equal 1", min_length=1, max_length=672)
matter_asserts.assert_list_element_type(
self.tariffPeriodsValue, cluster.Structs.TariffPeriodStruct, "TariffPeriods attribute must contain TariffPeriodStruct elements")
# check each TariffPeriodStruct
for tariff_period in self.tariffPeriodsValue:
await self.checkTariffPeriodStruct(struct=tariff_period)
await self.validate_tariff_component_ID_uniqueness_for_features(await self.get_tariff_components_by_its_IDs(tariff_period.tariffComponentIDs))
async def check_current_tariff_components_attribute(self, endpoint: int, attribute_value: Optional[List[cluster.Structs.TariffComponentStruct]] = None) -> None:
"""Validate CurrentTariffComponents attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[List[cluster.Structs.TariffComponentStruct]], optional): CurrentTariffComponents attribute value. Defaults to None.
"""
self.currentTariffComponentsValue = attribute_value
if not self.currentTariffComponentsValue:
self.currentTariffComponentsValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.CurrentTariffComponents)
logger.info(f"CurrentTariffComponents attribute value is: {self.currentTariffComponentsValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.currentTariffComponentsValue, NullValue,
"CurrentTariffComponents must be Null when TariffInfo is Null")
# if attribute value is not null it must be list of TariffComponentStruct with length in range 0 - 20
if self.currentTariffComponentsValue is not NullValue:
matter_asserts.assert_list(
self.currentTariffComponentsValue, "CurrentTariffComponents attribute must return a list with length less or equal 20", max_length=20)
matter_asserts.assert_list_element_type(
self.currentTariffComponentsValue, cluster.Structs.TariffComponentStruct, "CurrentTariffComponents attribute must contain TariffComponentStruct elements")
# check each TariffComponentStruct
for item in self.currentTariffComponentsValue:
await self.checkTariffComponentStruct(endpoint=endpoint, cluster=cluster, struct=item)
async def check_next_tariff_components_attribute(self, endpoint: int, attribute_value: Optional[List[cluster.Structs.TariffComponentStruct]] = None) -> None:
"""Validate NextTariffComponents attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[List[cluster.Structs.TariffComponentStruct]], optional): NextTariffComponents attribute value. Defaults to None.
"""
self.nextTariffComponentsValue = attribute_value
if not self.nextTariffComponentsValue:
self.nextTariffComponentsValue = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.NextTariffComponents)
logger.info(f"NextTariffComponents attribute value is: {self.nextTariffComponentsValue}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.nextTariffComponentsValue, NullValue,
"NextTariffComponents must be Null when TariffInfo is Null")
# if attribute value is not null it must be list of TariffComponentStruct with length in range 0 - 20
if self.nextTariffComponentsValue is not NullValue:
matter_asserts.assert_list(
self.nextTariffComponentsValue, "NextTariffComponents attribute must return a list with length less or equal 20", max_length=20)
matter_asserts.assert_list_element_type(
self.nextTariffComponentsValue, cluster.Structs.TariffComponentStruct, "NextTariffComponents attribute must contain TariffComponentStruct elements")
# check TariffComponentStruct
for item in self.nextTariffComponentsValue:
await self.checkTariffComponentStruct(endpoint=endpoint, cluster=cluster, struct=item)
async def check_default_randomization_offset_attribute(self, endpoint: int, attribute_value: Optional[int] = None) -> None:
"""Validate DefaultRandomizationOffset attribute.
Args:
endpoint (int): endpoint;
attribute_value (Optional[int], optional): DefaultRandomizationOffset attribute value. Defaults to None.
"""
self.defaultRandomizationOffset = attribute_value
if self.defaultRandomizationOffset is None:
if await self.attribute_guard(endpoint=endpoint, attribute=cluster.Attributes.DefaultRandomizationOffset):
self.defaultRandomizationOffset = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.DefaultRandomizationOffset)
logger.info(f"DefaultRandomizationOffset attribute value is: {self.defaultRandomizationOffset}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.defaultRandomizationOffset, NullValue,
"DefaultRandomizationOffset must be Null when TariffInfo is Null")
# check if RNDM feature and DefaultRandomizationOffset attribute are enabled
if (await self.attribute_guard(endpoint=endpoint, attribute=cluster.Attributes.DefaultRandomizationOffset) and
await self.feature_guard(endpoint=endpoint, cluster=cluster, feature_int=cluster.Bitmaps.Feature.kRandomization)):
# if feature is enabled, DefaultRandomizationOffset attribute must not be None
asserts.assert_is_not_none(
self.defaultRandomizationOffset, "DefaultRandomizationOffset attribute must not be None if RNDM feature is enabled.")
# check that DefaultRandomizationOffset attribute is NullValue if DefaultRandomizationType attribute is NullValue
if self.defaultRandomizationType is NullValue:
asserts.assert_equal(self.defaultRandomizationOffset, NullValue,
"DefaultRandomizationOffset attribute must be NullValue if DefaultRandomizationType is NullValue.")
# check DefaultRandomizationOffset value depending on DefaultRandomizationType
if self.defaultRandomizationOffset is not None and self.defaultRandomizationOffset is not NullValue:
await self.check_randomization_offset(self.defaultRandomizationType, self.defaultRandomizationOffset)
else: # if feature is disabled, DefaultRandomizationOffset attribute must be None
asserts.assert_is_none(
self.defaultRandomizationOffset, "DefaultRandomizationOffset attribute must be None if RNDM feature is disabled.")
async def check_default_randomization_type_attribute(self, endpoint: int, attribute_value: Optional[cluster.Enums.DayEntryRandomizationTypeEnum] = None) -> None:
self.defaultRandomizationType = attribute_value
if self.defaultRandomizationType is None:
if await self.attribute_guard(endpoint=endpoint, attribute=cluster.Attributes.DefaultRandomizationType):
self.defaultRandomizationType = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=cluster.Attributes.DefaultRandomizationType)
logger.info(f"DefaultRandomizationType attribute value is: {self.defaultRandomizationType}")
if self.tariffInfoValue is not None and self.tariffInfoValue is NullValue:
asserts.assert_equal(self.defaultRandomizationType, NullValue,
"DefaultRandomizationType must be Null when TariffInfo is Null")
# check if RNDM feature is enabled
if (await self.attribute_guard(endpoint=endpoint, attribute=cluster.Attributes.DefaultRandomizationType) and
await self.feature_guard(endpoint=endpoint, cluster=cluster, feature_int=cluster.Bitmaps.Feature.kRandomization)):
# if feature is enabled, DefaultRandomizationType attribute must not be None
asserts.assert_is_not_none(
self.defaultRandomizationType, "DefaultRandomizationType attribute must not be None if RNDM feature is enabled.")
# check DefaultRandomizationType must be of type DayEntryRandomizationTypeEnum and in range 0 - 4
if self.defaultRandomizationType is not NullValue:
matter_asserts.assert_valid_enum(
self.defaultRandomizationType, "DefaultRandomizationType attribute must return a DayEntryRandomizationTypeEnum", cluster.Enums.DayEntryRandomizationTypeEnum)
asserts.assert_greater_equal(self.defaultRandomizationType, 0,
"DefaultRandomizationType must be greater or equal than 0.")
asserts.assert_less_equal(self.defaultRandomizationType, 4,
"DefaultRandomizationType must be less or equal than 4.")
else: # if feature is disabled, DefaultRandomizationType attribute must be None
asserts.assert_is_none(
self.defaultRandomizationType, "DefaultRandomizationType attribute must be None if RNDM feature is disabled.")
async def convert_matter_time_to_posix_epoch_time(self, matter_time: int) -> int:
MATTER_UNIX_EPOCH_OFFSET = 10957 * 24 * 60 * 60
return matter_time + MATTER_UNIX_EPOCH_OFFSET
async def get_day_pattern_IDs_for_active_calendar_period(self, next: bool = False) -> List[int]:
"""Define active calendar period based on CurrentDayEntryDateValue or NextDayEntryDateValue and extract DayPatternIDs field.
Args:
next (bool, optional): If True, use NextDayEntryDateValue. Defaults to False (Use CurrentDayEntryDateValue).
Returns:
List[int]: DayPatternIDs field of active calendar period.
"""
day_patterns_IDs = []
if not next:
date = self.currentDayEntryDateValue
else:
date = self.nextDayEntryDateValue
for period in self.calendarPeriodsValue[::-1]:
if period.startDate < date:
day_patterns_IDs = period.dayPatternIDs
return day_patterns_IDs
async def get_day_of_week_from_day_entry_date(self, day_entry_date: int) -> int:
weekDays = {
0: cluster.Bitmaps.DayPatternDayOfWeekBitmap.kMonday,
1: cluster.Bitmaps.DayPatternDayOfWeekBitmap.kTuesday,
2: cluster.Bitmaps.DayPatternDayOfWeekBitmap.kWednesday,
3: cluster.Bitmaps.DayPatternDayOfWeekBitmap.kThursday,
4: cluster.Bitmaps.DayPatternDayOfWeekBitmap.kFriday,
5: cluster.Bitmaps.DayPatternDayOfWeekBitmap.kSaturday,
6: cluster.Bitmaps.DayPatternDayOfWeekBitmap.kSunday
}
day_entry_date_epoch = await self.convert_matter_time_to_posix_epoch_time(day_entry_date)
dayOfWeek = datetime.datetime.fromtimestamp(day_entry_date_epoch).weekday()
return weekDays[dayOfWeek]
async def get_day_entry_IDs_from_tariff_periods_for_particular_tariff_component(self, tariff_componentID: int) -> List[int]:
"""Extracts DayEntryIDs field values from the list of TariffPeriodStruct where particular TariffComponentID is present.
Args:
tariff_componentID (int): TariffComponentID to search for in TariffComponentIDs field.
Returns:
List[int]: List of all DayEntryIDs from the list of TariffPeriodStruct where particular TariffComponentID is present.
"""
return sorted(set([dayEntryID for tariff_period in self.tariffPeriodsValue if tariff_componentID in tariff_period.tariffComponentIDs for dayEntryID in tariff_period.dayEntryIDs]))
async def get_tariff_period_label_for_particular_tariff_component(self, tariff_componentID: int) -> List[int]:
"""Gets Label field from the TariffPeriodStruct where particular TariffComponentID is present.
Args:
tariff_componentID (int): TariffComponentID to search for in TariffComponentIDs field.
Returns:
List[int]: Label field from the TariffPeriodStruct where particular TariffComponentID is present.
"""
for tariff_period in self.tariffPeriodsValue:
if tariff_componentID in tariff_period.tariffComponentIDs:
return tariff_period.label
async def get_tariff_components_IDs_from_tariff_components_attribute(self, tariff_components: List[cluster.Structs.TariffComponentStruct]) -> List[int]:
"""Extracts TariffComponentIDs from the list of TariffComponentStruct entities.
Args:
tariff_components (List[cluster.Structs.TariffComponentStruct]): List of TariffComponentStruct entities.
Returns:
List[int]: List of TariffComponentIDs only.
"""
return [tariff_component.tariffComponentID for tariff_component in tariff_components]
async def get_tariff_components_by_its_IDs(self, tariff_components_IDs: List[int]) -> List[cluster.Structs.TariffComponentStruct]:
"""Extracts TariffComponentStruct entities by TariffComponentIDs.
Args:
tariff_components_IDs (List[int]): List of TariffComponentIDs.
Returns:
List[cluster.Structs.TariffComponentStruct]: List of TariffComponentStruct entities with specified TariffComponentIDs.
"""
if self.tariffComponentsValue is None:
self.tariffComponentsValue = await self.read_single_attribute_check_success(endpoint=self.get_endpoint(), cluster=cluster, attribute=cluster.Attributes.TariffComponents)
return [tariff_component for tariff_component in self.tariffComponentsValue if tariff_component.tariffComponentID in tariff_components_IDs]
async def get_day_entry_IDs_from_day_entries_attribute(self, tariff_components: List[cluster.Structs.DayEntryStruct]) -> List[int]:
"""Extracts DayEntryIDs from the list of DayEntryStruct entities.
Args:
tariff_components (List[cluster.Structs.DayEntryStruct]): List of DayEntryStruct entities.
Returns:
List[int]: List of DayEntryIDs only.
"""
return [tariff_component.dayEntryID for tariff_component in tariff_components]
async def generate_unique_uint32_for_IDs(self, list_of_IDs: List[int]) -> int:
"""This function generates random ID (uint32) that is not in the list of given IDs.
Is intended to be used in TC_SETRF_2_2.
Args:
list_of_IDs (List[int]): List of IDs (for example, DayEntryIDs or TariffComponentIDs).
Returns:
int: ID (uint32) that is not in the list of given IDs
"""
IDs_set = set(list_of_IDs)
while True:
new_ID = random.randint(0, 2**32 - 1)
if new_ID not in IDs_set:
return new_ID
async def verify_reporting(self, reports: dict, attribute: ClusterObjects.ClusterAttributeDescriptor, attribute_name: str, saved_value) -> bool:
"""This function verifies that the reported value is different from the saved value.
Args:
reports (dict): All reports stored in attribute_reports attribute in subscription handler object;
attribute (ClusterObjects.ClusterAttributeDescriptor): Attribute to check;
attribute_name (str): String with attribute name for error message;
saved_value: Value that has been saved before TestEventTrigger sending;
Returns:
bool: - True if report is presented;
- False if there are no reports at all;
"""
try:
asserts.assert_not_equal(reports[attribute][0].value, saved_value,
f"""Reported '{attribute_name}' value should be different from saved value.
Subscriptions should only report when values have changed.""")
return True
except (KeyError, IndexError) as err:
asserts.fail(f"There are no reports for attribute {attribute_name}:\n{err}")
return False
@staticmethod
def _tariff_info_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.TariffInfo:
return True
else:
return False
return AttributeMatcher.from_callable(description="TariffInfo", matcher=predicate)
@staticmethod
def _tariff_unit_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.TariffUnit:
return True
else:
return False
return AttributeMatcher.from_callable(description="TariffUnit", matcher=predicate)
@staticmethod
def _start_date_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.StartDate:
return True
else:
return False
return AttributeMatcher.from_callable(description="StartDate", matcher=predicate)
@staticmethod
def _day_entries_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.DayEntries:
return True
else:
return False
return AttributeMatcher.from_callable(description="DayEntries", matcher=predicate)
@staticmethod
def _day_patterns_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.DayPatterns:
return True
else:
return False
return AttributeMatcher.from_callable(description="DayPatterns", matcher=predicate)
@staticmethod
def _calendar_periods_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.CalendarPeriods:
return True
else:
return False
return AttributeMatcher.from_callable(description="CalendarPeriods", matcher=predicate)
@staticmethod
def _individual_days_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.IndividualDays:
return True
else:
return False
return AttributeMatcher.from_callable(description="IndividualDays", matcher=predicate)
@staticmethod
def _current_day_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.CurrentDay:
return True
else:
return False
return AttributeMatcher.from_callable(description="CurrentDay", matcher=predicate)
@staticmethod
def _next_day_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.NextDay:
return True
else:
return False
return AttributeMatcher.from_callable(description="NextDay", matcher=predicate)
@staticmethod
def _current_day_entry_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.CurrentDayEntry:
return True
else:
return False
return AttributeMatcher.from_callable(description="CurrentDayEntry", matcher=predicate)
@staticmethod
def _current_day_entry_date_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.CurrentDayEntryDate:
return True
else:
return False
return AttributeMatcher.from_callable(description="CurrentDayEntryDate", matcher=predicate)
@staticmethod
def _next_day_entry_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.NextDayEntry:
return True
else:
return False
return AttributeMatcher.from_callable(description="NextDayEntry", matcher=predicate)
@staticmethod
def _next_day_entry_date_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.NextDayEntryDate:
return True
else:
return False
return AttributeMatcher.from_callable(description="NextDayEntryDate", matcher=predicate)
@staticmethod
def _tariff_components_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.TariffComponents:
return True
else:
return False
return AttributeMatcher.from_callable(description="TariffComponents", matcher=predicate)
@staticmethod
def _tariff_periods_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.TariffPeriods:
return True
else:
return False
return AttributeMatcher.from_callable(description="TariffPeriods", matcher=predicate)
@staticmethod
def _current_tariff_component_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.CurrentTariffComponents:
return True
else:
return False
return AttributeMatcher.from_callable(description="CurrentTariffComponents", matcher=predicate)
@staticmethod
def _next_tariff_component_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.NextTariffComponents:
return True
else:
return False
return AttributeMatcher.from_callable(description="NextTariffComponents", matcher=predicate)
@staticmethod
def _default_randomization_offset_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.DefaultRandomizationOffset:
return True
else:
return False
return AttributeMatcher.from_callable(description="DefaultRandomizationOffset", matcher=predicate)
@staticmethod
def _default_randomization_type_matcher() -> AttributeMatcher:
def predicate(report: AttributeValue) -> bool:
if report.attribute == cluster.Attributes.DefaultRandomizationType:
return True
else:
return False
return AttributeMatcher.from_callable(description="DefaultRandomizationType", matcher=predicate)
def get_mandatory_matchers(self) -> List[AttributeMatcher]:
return [
self._tariff_info_matcher(),
self._tariff_unit_matcher(),
self._start_date_matcher(),
self._day_entries_matcher(),
self._day_patterns_matcher(),
self._calendar_periods_matcher(),
self._individual_days_matcher(),
self._current_day_matcher(),
self._next_day_matcher(),
self._current_day_entry_matcher(),
self._current_day_entry_date_matcher(),
self._next_day_entry_matcher(),
self._next_day_entry_date_matcher(),
self._tariff_components_matcher(),
self._tariff_periods_matcher(),
self._current_tariff_component_matcher(),
self._next_tariff_component_matcher()
]
async def search_day_pattern_for_given_day_of_week_check_day_entry_IDs(self, list_of_day_pattern_IDs: List[int],
day_of_week: int,
list_of_day_entry_IDs: List[int],
current_or_next: str) -> None:
"""1. Searches DayPattern in currently active CalendarPeriod that contains the given day of week.
2. Checks that DayEntryIDs from DayPattern are equal to DayEntryIDs from CurrentDay/NextDay attribute.
Args:
list_of_day_pattern_IDs (List[int]): List of DayPatternIDs from defined active CalendarPeriodStruct.
day_of_week (int): Day of Week defined based on CurrentDayEntryDate/NextDayEntryDate attribute value.
list_of_day_entry_IDs (List[int]): List of DayEntryIDs from CurrentDay/NextDay attribute.
current_or_next (str): "Current" or "Next".
"""
day_pattern_found = False
for dayPattern in self.dayPatternsValue:
if dayPattern.dayPatternID in list_of_day_pattern_IDs:
if (dayPattern.daysOfWeek & day_of_week) == day_of_week:
asserts.assert_equal(sorted(dayPattern.dayEntryIDs), sorted(list_of_day_entry_IDs),
f"DayEntryIDs from DayPatterns must be equal to DayEntryIDs from {current_or_next}Day.")
day_pattern_found = True
return
if not day_pattern_found:
asserts.fail(f"DayPattern not found for {current_or_next}DayEntryDate attribute value.")
async def validate_tariff_component_ID_uniqueness_for_features(self, tariff_components_list: List[cluster.Structs.TariffComponentStruct]) -> None:
# flags to check specific feature fields in mask
features_fields_flags = {
"price": 1 << 0,
"friendlyCredit": 1 << 1,
"auxiliaryLoad": 1 << 2,
"peakPeriod": 1 << 3,
"powerThreshold": 1 << 4
}
# store feature mask for each unique threshold value
groups_by_threshold_field = {}
# iterate over all tariff components
for tariff_component in tariff_components_list:
# skip predicted tariff components
if tariff_component.predicted is True:
continue
# Add Threshold value if it is not in groups_by_threshold_field
if tariff_component.threshold not in groups_by_threshold_field:
groups_by_threshold_field[tariff_component.threshold] = 0
# feature mask for current tariff_component
current_component_features_flags = 0
# iterate over all feature flags
for feature_flag in features_fields_flags.keys():
current_component_features_flags |= features_fields_flags[feature_flag] if getattr(
tariff_component, feature_flag, None) is not None else current_component_features_flags | 0
# validate that there are no duplicate feature fields (comparing bitwise AND with current component feature flags mask and
# feature mask for current threshold value in groups_by_threshold_field)
asserts.assert_equal(current_component_features_flags & groups_by_threshold_field[tariff_component.threshold], 0,
f"TariffComponentID {tariff_component.tariffComponentID} must have unique combination of feature fields for given Threshold.")
# If validation above is successful then update feature mask for current threshold based on current component feature flags
groups_by_threshold_field[tariff_component.threshold] |= current_component_features_flags