blob: a8368c5cc965072efcdcb8da1678fbf105e14d0f [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import random
import string
import time
import chip.clusters as Clusters
from chip.clusters.Types import NullValue
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import type_matches
from mobly import asserts
class DRLK_COMMON:
async def read_drlk_attribute_expect_success(self, attribute):
cluster = Clusters.Objects.DoorLock
return await self.read_single_attribute_check_success(endpoint=self.endpoint, cluster=cluster, attribute=attribute)
async def write_drlk_attribute_expect_success(self, attribute):
cluster = Clusters.Objects.DoorLock
result = await self.default_controller.WriteAttribute(self.dut_node_id, [(self.endpoint, attribute)])
err_msg = "Received error status {} when writing {}:{}".format(str(result[0].Status), str(cluster), str(attribute))
asserts.assert_equal(result[0].Status, Status.Success, err_msg)
async def write_drlk_attribute_expect_error(self, attribute, error):
cluster = Clusters.Objects.DoorLock
result = await self.default_controller.WriteAttribute(self.dut_node_id, [(self.endpoint, attribute)])
err_msg = "Did not see expected error {} when writing {}:{}".format(str(error), str(cluster), str(attribute))
asserts.assert_equal(result[0].Status, error, err_msg)
async def send_drlk_cmd_expect_success(self, command) -> None:
await self.send_single_cmd(cmd=command, endpoint=self.endpoint, timedRequestTimeoutMs=1000)
async def send_drlk_cmd_expect_error(self, command, error: Status) -> None:
try:
await self.send_single_cmd(cmd=command, endpoint=self.endpoint, timedRequestTimeoutMs=1000)
asserts.assert_true(False, "Unexpected command success, command=%s", command)
except InteractionModelError as e:
asserts.assert_equal(e.status, error, "Unexpected error returned")
pass
async def send_set_credential_cmd(self, operationType, credential, credentialData, userIndex, userStatus, userType) -> Clusters.Objects.DoorLock.Commands.SetCredentialResponse:
ret = await self.send_single_cmd(cmd=Clusters.Objects.DoorLock.Commands.SetCredential(operationType=operationType,
credential=credential,
credentialData=credentialData,
userIndex=userIndex,
userStatus=userStatus,
userType=userType),
endpoint=self.endpoint,
timedRequestTimeoutMs=1000)
asserts.assert_true(type_matches(ret, Clusters.Objects.DoorLock.Commands.SetCredentialResponse),
"Unexpected return type for SetCredential")
asserts.assert_true(ret.status == Status.Success, "Error sending SetCredential command, status={}".format(str(ret.status)))
return ret
async def send_clear_credential_cmd(self, credential) -> None:
await self.send_single_cmd(cmd=Clusters.Objects.DoorLock.Commands.ClearCredential(credential=credential),
endpoint=self.endpoint,
timedRequestTimeoutMs=1000)
ret = await self.send_single_cmd(cmd=Clusters.Objects.DoorLock.Commands.GetCredentialStatus(credential=self.createdCredential),
endpoint=self.endpoint)
asserts.assert_true(type_matches(ret, Clusters.Objects.DoorLock.Commands.GetCredentialStatusResponse),
"Unexpected return type for GetCredentialStatus")
asserts.assert_false(ret.credentialExists, "Error clearing Credential (credentialExists==True)")
async def cleanup_users_and_credentials(self):
self.print_step("Cleanup", "Clear created User and Credential on the DUT")
if self.createdCredential:
await self.send_clear_credential_cmd(self.createdCredential)
logging.info("Credential cleared at CredentialIndex %d" % (self.createdCredential.credentialIndex))
self.createdCredential = None
async def generate_pincode(self, maxPincodeLength):
return ''.join(random.choices(string.digits, k=maxPincodeLength))
async def teardown(self):
await self.cleanup_users_and_credentials()
async def run_drlk_test_common(self, lockUnlockCommand, lockUnlockCmdRspPICS, lockUnlockText, doAutoRelockTest):
is_ci = self.check_pics('PICS_SDK_CI_ONLY')
self.createdCredential = None
self.endpoint = self.user_params.get("endpoint", 1)
# Allow for user overrides of these values
credentialIndex = self.user_params.get("credential_index", 1)
userCodeTemporaryDisableTime = self.user_params.get("user_code_temporary_disable_time", 15)
wrongCodeEntryLimit = self.user_params.get("wrong_code_entry_limit", 3)
autoRelockTime = self.user_params.get("auto_relock_time", 60)
if is_ci:
autoRelockTime = 10
cluster = Clusters.Objects.DoorLock
attributes = Clusters.DoorLock.Attributes
validPincode = None
invalidPincode = None
self.print_step(0, "Commissioning, already done")
if self.check_pics("DRLK.S.F00") and self.check_pics("DRLK.S.F07"):
self.print_step("Preconditions.1a",
"TH reads MaxPINCodeLength attribute from DUT and generates a valid PINCode")
maxPincodeLength_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.MaxPINCodeLength)
logging.info("MaxPINCodeLength value is %s" % (maxPincodeLength_dut))
validPincodeString = await self.generate_pincode(maxPincodeLength_dut)
while True:
invalidPincodeString = await self.generate_pincode(maxPincodeLength_dut)
if invalidPincodeString != validPincodeString:
break
logging.info("Valid PinCode=%s, Invalid PinCode=%s" % (validPincodeString, invalidPincodeString))
validPincode = bytes(validPincodeString, 'ascii')
invalidPincode = bytes(invalidPincodeString, 'ascii')
self.print_step("Preconditions.1b",
"TH sends SetCredential command to DUT to set up User and Credential at CredentialIndex {}".format(str(credentialIndex)))
credential = cluster.Structs.CredentialStruct(credentialIndex=credentialIndex,
credentialType=Clusters.DoorLock.Enums.CredentialTypeEnum.kPin)
ret = await self.send_set_credential_cmd(Clusters.DoorLock.Enums.DataOperationTypeEnum.kAdd,
credential,
validPincode,
NullValue,
NullValue,
NullValue)
logging.info("Credential created at CredentialIndex %d, UserIndex %d." % (credentialIndex, ret.userIndex))
self.createdCredential = credential
requirePinForRemoteOperation_dut = False
if self.check_pics("DRLK.S.F00") and self.check_pics("DRLK.S.F07"):
self.print_step("1", "TH writes the RequirePINforRemoteOperation attribute value as false on the DUT")
attribute = attributes.RequirePINforRemoteOperation(False)
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
print("---------------------- PICS is true")
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
print("---------------------- PICS is false")
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
if self.check_pics("DRLK.S.A0033"):
self.print_step("2", "TH reads and saves the value of the RequirePINforRemoteOperation attribute from the DUT")
requirePinForRemoteOperation_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.RequirePINforRemoteOperation)
logging.info("Current RequirePINforRemoteOperation value is %s" % (requirePinForRemoteOperation_dut))
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
self.print_step("2a", "TH verifies that RequirePINforRemoteOperation is FALSE")
asserts.assert_false(requirePinForRemoteOperation_dut, "RequirePINforRemoteOperation is expected to be FALSE")
else:
asserts.assert_true(
False, "RequirePINforRemoteOperation is a mandatory attribute if DRLK.S.F07(COTA) & DRLK.S.F00(PIN)")
if self.check_pics(lockUnlockCmdRspPICS):
self.print_step("3", "TH sends %s Command to the DUT without PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=None)
if requirePinForRemoteOperation_dut:
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
else:
await self.send_drlk_cmd_expect_success(command=command)
self.print_step("4", "TH sends %s Command to the DUT with valid PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_success(command=command)
else:
asserts.assert_true(False, "%sResponse is a mandatory command response and must be supported in PICS" % lockUnlockText)
if self.check_pics("DRLK.S.F00") and self.check_pics("DRLK.S.F07"):
self.print_step("5", "TH writes the RequirePINforRemoteOperation attribute value as true on the DUT")
attribute = attributes.RequirePINforRemoteOperation(True)
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
if self.check_pics("DRLK.S.A0033"):
self.print_step("6", "TH reads and saves the value of the RequirePINforRemoteOperation attribute from the DUT")
requirePinForRemoteOperation_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.RequirePINforRemoteOperation)
logging.info("Current RequirePINforRemoteOperation value is %s" % (requirePinForRemoteOperation_dut))
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
self.print_step("6a", "TH verifies that RequirePINforRemoteOperation is TRUE")
asserts.assert_true(requirePinForRemoteOperation_dut, "RequirePINforRemoteOperation is expected to be TRUE")
if self.check_pics("DRLK.S.F00") and self.check_pics(lockUnlockCmdRspPICS) and self.check_pics("DRLK.S.A0033"):
self.print_step("7", "TH sends %s Command to the DUT with an invalid PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=invalidPincode)
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
self.print_step("8", "TH sends %s Command to the DUT without PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=None)
if requirePinForRemoteOperation_dut:
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
else:
await self.send_drlk_cmd_expect_success(command=command)
if self.check_pics(lockUnlockCmdRspPICS) and self.check_pics("DRLK.S.A0033"):
self.print_step("9", "TH sends %s Command to the DUT with valid PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_success(command=command)
if self.check_pics("DRLK.S.F00") or self.check_pics("DRLK.S.F01"):
self.print_step("10a", "TH writes the WrongCodeEntryLimit to any value between 1 and 255")
attribute = attributes.WrongCodeEntryLimit(wrongCodeEntryLimit)
if self.check_pics("DRLK.S.M.WrongCodeEntryLimitAttributeWritable"):
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
self.print_step("10b", "TH reads the value of WrongCodeEntryLimit attribute. Verify a range of 1-255")
wrongCodeEntryLimit_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.WrongCodeEntryLimit)
logging.info("WrongCodeEntryLimit value is %s" % (wrongCodeEntryLimit_dut))
asserts.assert_in(wrongCodeEntryLimit_dut, range(1, 255), "WrongCodeEntryLimit value is out of range")
self.print_step("11a", "TH writes the UserCodeTemporaryDisableTime to any value between 1 and 255")
attribute = attributes.UserCodeTemporaryDisableTime(userCodeTemporaryDisableTime)
if self.check_pics("DRLK.S.M.UserCodedTemporaryDisableTimeAttributeWritable"):
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
self.print_step("11b", "TH reads the value of UserCodeTemporaryDisableTime attribute. Verify a range of 1-255")
userCodeTemporaryDisableTime_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.UserCodeTemporaryDisableTime)
logging.info("UserCodeTemporaryDisableTime value is %s" % (userCodeTemporaryDisableTime_dut))
asserts.assert_in(userCodeTemporaryDisableTime_dut, range(1, 255), "UserCodeTemporaryDisableTime value is out of range")
if self.check_pics(lockUnlockCmdRspPICS) and self.check_pics("DRLK.S.F00"):
self.print_step("12", "TH sends {} Command to the DUT with an invalid PINCode, repeated {} times".format(
lockUnlockText, wrongCodeEntryLimit_dut))
for i in range(wrongCodeEntryLimit_dut):
command = lockUnlockCommand(PINCode=invalidPincode)
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
self.print_step("13", "TH sends %s Command to the DUT with valid PINCode. Verify failure or no response" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
if self.check_pics("DRLK.S.A0031"):
self.print_step("14", "Wait for UserCodeTemporaryDisableTime seconds")
time.sleep(userCodeTemporaryDisableTime_dut)
if not doAutoRelockTest:
self.print_step("15", "Send %s with valid Pincode and verify success" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_success(command=command)
if doAutoRelockTest:
if self.check_pics("DRLK.S.A0023"):
self.print_step("15", "TH writes the AutoRelockTime attribute value on the DUT")
attribute = attributes.AutoRelockTime(autoRelockTime)
if self.check_pics("DRLK.S.M.AutoRelockTimeAttributeWritable"):
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
self.print_step("16", "TH reads the value of AutoRelockTime attribute.")
autoRelockTime_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.AutoRelockTime)
logging.info("AutoRelockTime value is %s" % (autoRelockTime_dut))
if self.check_pics(lockUnlockCmdRspPICS):
self.print_step("17", "Send %s with valid Pincode and verify success" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_success(command=command)
if self.check_pics("DRLK.S.A0023"):
self.print_step("18a", "Wait for AutoRelockTime seconds")
# Add additional wait time buffer for motor movement, etc.
time.sleep(autoRelockTime_dut + 5)
if self.check_pics("DRLK.S.A0000"):
self.print_step("18b", "TH reads LockState attribute after AutoRelockTime Expires")
lockstate_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.LockState)
logging.info("Current LockState is %s" % (lockstate_dut))
asserts.assert_equal(lockstate_dut, Clusters.DoorLock.Enums.DlLockState.kLocked,
"LockState expected to be value==Locked")
await self.cleanup_users_and_credentials()
async def run_drlk_test_2_2(self):
await self.run_drlk_test_common(lockUnlockCommand=Clusters.Objects.DoorLock.Commands.LockDoor,
lockUnlockCmdRspPICS="DRLK.S.C00.Rsp",
lockUnlockText="LockDoor",
doAutoRelockTest=False)
async def run_drlk_test_2_3(self):
await self.run_drlk_test_common(lockUnlockCommand=Clusters.Objects.DoorLock.Commands.UnlockDoor,
lockUnlockCmdRspPICS="DRLK.S.C01.Rsp",
lockUnlockText="UnlockDoor",
doAutoRelockTest=True)
async def run_drlk_test_2_12(self):
await self.run_drlk_test_common(lockUnlockCommand=Clusters.Objects.DoorLock.Commands.UnboltDoor,
lockUnlockCmdRspPICS="DRLK.S.C27.Rsp",
lockUnlockText="UnboltDoor",
doAutoRelockTest=True)