| # |
| # Copyright (c) 2025 Project CHIP Authors |
| # All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import random |
| import string |
| |
| import test_plan_support |
| # === BEGIN CI TEST ARGUMENTS === |
| # test-runner-runs: |
| # run1: |
| # app: ${ALL_CLUSTERS_APP} |
| # app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json |
| # script-args: > |
| # --storage-path admin_storage.json |
| # --commissioning-method on-network |
| # --discriminator 1234 |
| # --passcode 20202021 |
| # --PICS src/app/tests/suites/certification/ci-pics-values |
| # --endpoint 1 |
| # --trace-to json:${TRACE_TEST_JSON}.json |
| # --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto |
| # factory-reset: true |
| # quiet: true |
| # === END CI TEST ARGUMENTS === |
| from mobly import asserts |
| from TC_TLS_Utils import TLSUtils |
| |
| import matter.clusters as Clusters |
| from matter.interaction_model import Status |
| from matter.testing.matter_testing import MatterBaseTest, TestStep, default_matter_test_main, has_cluster, run_if_endpoint_matches |
| from matter.utils import CommissioningBuildingBlocks |
| |
| |
| class TC_TLSCLIENT(MatterBaseTest): |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
| |
| class TwoFabricData: |
| def __init__(self, cr1_cmd: TLSUtils, cr2_cmd: TLSUtils): |
| self.cr1_cmd = cr1_cmd |
| self.cr2_cmd = cr2_cmd |
| |
| async def common_setup(self): |
| self.step(1) |
| endpoint = self.get_endpoint(default=1) |
| cr1_cmd = TLSUtils(self, endpoint=endpoint) |
| cr1 = self.default_controller |
| |
| self.step(2) |
| # Establishing CR2 controller |
| cr2_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() |
| cr2_fabric_admin = cr2_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=cr1.fabricId + 1) |
| cr2 = cr2_fabric_admin.NewController(nodeId=cr1.nodeId + 1) |
| cr2_dut_node_id = self.dut_node_id+1 |
| |
| self.step(3) |
| _, noc_resp, _ = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( |
| commissionerDevCtrl=cr1, newFabricDevCtrl=cr2, |
| existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id |
| ) |
| fabric_index_cr2 = noc_resp.fabricIndex |
| |
| cr2_cmd = TLSUtils(self, endpoint=endpoint, dev_ctrl=cr2, node_id=cr2_dut_node_id, fabric_index=fabric_index_cr2) |
| return self.TwoFabricData(cr1_cmd=cr1_cmd, cr2_cmd=cr2_cmd) |
| |
| def get_common_steps(self) -> list[TestStep]: |
| return [ |
| TestStep(1, test_plan_support.commission_if_required('CR1'), is_commissioning=True), |
| TestStep(2, test_plan_support.open_commissioning_window()), |
| TestStep(3, test_plan_support.commission_from_existing('CR1', 'CR2')), |
| ] |
| |
| def pics_TC_TLSCLIENT_2_1(self): |
| return ["TLSCLIENT.S"] |
| |
| def desc_TC_TLSCLIENT_2_1(self) -> str: |
| return "[TC-TLSCLIENT-2.1] Attributes with Server as DUT" |
| |
| def steps_TC_TLSCLIENT_2_1(self) -> list[TestStep]: |
| return [ |
| TestStep(1, test_plan_support.commission_if_required('CR1'), is_commissioning=True), |
| TestStep(2, "TH reads MaxProvisioned attribute", "DUT replies with an uint8 value between 5 and 254."), |
| TestStep(3, "TH reads ProvisionedEndpoints attribute", "DUT replies with an empty list of TLSEndpointStruct."), |
| ] |
| |
| @run_if_endpoint_matches(has_cluster(Clusters.TlsClientManagement)) |
| async def test_TC_TLSCLIENT_2_1(self): |
| self.step(1) |
| attributes = Clusters.TlsClientManagement.Attributes |
| cr1_cmd = TLSUtils(self, endpoint=self.get_endpoint(default=1)) |
| |
| self.step(2) |
| max_provisioned = await cr1_cmd.read_tls_client_attribute(attributes.MaxProvisioned) |
| asserts.assert_greater_equal(max_provisioned, 5, "MaxProvisioned should be >= 5") |
| asserts.assert_less_equal(max_provisioned, 254, "MaxProvisioned should be <= 254") |
| |
| self.step(3) |
| provisioned_endpoints = await cr1_cmd.read_tls_client_attribute(attributes.ProvisionedEndpoints) |
| asserts.assert_equal(len(provisioned_endpoints), 0, "ProvisionedEndpoints should be empty") |
| |
| def pics_TC_TLSCLIENT_3_1(self): |
| return ["TLSCLIENT.S", "TLSCERT.S"] |
| |
| def desc_TC_TLSCLIENT_3_1(self) -> str: |
| return "[TC-TLSCLIENT-3.1] ProvisionEndpoint command basic insertion and modification" |
| |
| def steps_TC_TLSCLIENT_3_1(self) -> list[TestStep]: |
| return [ |
| *self.get_common_steps(), |
| TestStep(4, "Set myHostname to a valid value."), |
| TestStep(5, "Populate myPort[] 3 distinct valid values."), |
| TestStep(6, "Populate myRootCert[] with 2 distinct valid values."), |
| TestStep(7, "CR1 sends ProvisionRootCertificate command to the TLSCertificateManagementCluster with null CAID and Certificate set to myRootCert[0].", |
| "DUT replies with a TLSCAID value. Store the returned value as myCaid[0]."), |
| TestStep(8, "CR2 sends ProvisionRootCertificate command to the TLSCertificateManagementCluster with null CAID and Certificate set to myRootCert[1].", |
| "DUT replies with a TLSCAID value. Store the returned value as myCaid[1]."), |
| TestStep(9, "Populate myNonce[] with 2 distinct, random 32-octet values"), |
| TestStep(10, "CR1 sends ClientCSR command with Nonce set to myNonce[0]", |
| "DUT replies with CCDID, CSR and Nonce. Store TLSCCDID in myCcdid[0] and CSR in myCsr[0]."), |
| TestStep(11, "CR2 sends ClientCSR command with Nonce set to myNonce[1].", |
| "DUT replies with CCDID, CSR and Nonce. Store TLSCCDID in myCcdid[1] and CSR in myCsr[1]."), |
| TestStep(12, |
| "Populate myClientCert[] with 2 distinct, valid, self-signed, DER-encoded x509 certificates using each respective public key from myCsr[i]."), |
| TestStep( |
| 13, "CR1 sends ProvisionClientCertificate command to the TLSCertificateManagementCluster with CCDID set to myCcdid[0] and ClientCertificateDetails set to myClientCert[0].", test_plan_support.verify_success()), |
| TestStep( |
| 14, "CR2 sends ProvisionClientCertificate command to the TLSCertificateManagementCluster with CCDID set to myCcdid[1] and ClientCertificateDetails set to myClientCert[1].", test_plan_support.verify_success()), |
| TestStep(15, "CR1 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[0], CAID myCaid[0], CCDID myCcdid[0] and null EndpointID.", |
| "DUT replies with a TLSEndpointID value. Store the returned value as myEndpoint[0]."), |
| TestStep(16, "CR1 reads ProvisionedEndpoints attribute.", |
| "DUT replies with a list of TLSEndpointStruct with one entry. The entry should correspond to (myEndpoint[1], myHostname, myPort[0], myCaid[0], myCcdid[0], myReferenceCount) where myReferenceCount is 0"), |
| TestStep(17, "CR2 reads ProvisionedEndpoints attribute.", "DUT replies with an empty list of TLSEndpointStruct."), |
| TestStep(18, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[1], CAID myCaid[1], CCDID myCcdid[1] and null EndpointID.", |
| "DUT replies with a TLSEndpointID value equal to myEndpoint[0]."), |
| TestStep(19, "CR1 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[2], CAID myCaid[0], CCDID myCcdid[0] and EndpointID myEndpoint[0].", |
| "DUT replies with a TLSEndpointID value equal to myEndpoint[0]."), |
| TestStep(20, "CR1 reads ProvisionedEndpoints attribute.", |
| "DUT replies with a list of TLSEndpointStruct with one entry. The entry should correspond to (myEndpoint[0], myHostname, myPort[2], myCaid[0], myCcdid[0], myReferenceCount) where myReferenceCount is 0"), |
| TestStep(21, "CR2 reads ProvisionedEndpoints attribute.", |
| "DUT replies with a list of TLSEndpointStruct with one entry. The entry should correspond to (myEndpoint[1], myHostname, myPort[1], myCaid[0], myCcdid[1], myReferenceCount) where myReferenceCount is 0"), |
| TestStep( |
| 22, "CR1 sends RemoveEndpoint command with EndpointID set to myEndpoint[0].", test_plan_support.verify_success()), |
| TestStep(23, test_plan_support.remove_fabric('CR2', 'CR1'), test_plan_support.verify_success()), |
| ] |
| |
| @run_if_endpoint_matches(has_cluster(Clusters.TlsClientManagement)) |
| async def test_TC_TLSCLIENT_3_1(self): |
| setup_data = await self.common_setup() |
| cr1_cmd = setup_data.cr1_cmd |
| cr2_cmd = setup_data.cr2_cmd |
| attributes = Clusters.TlsClientManagement.Attributes |
| |
| self.step(4) |
| my_hostname = b"myhostname.matter.com" |
| |
| self.step(5) |
| my_port = [1234, 5678, 9012] |
| |
| self.step(6) |
| my_root_cert = [cr1_cmd.gen_cert(), cr1_cmd.gen_cert()] |
| my_caid = [None, None] |
| |
| self.step(7) |
| res = await cr1_cmd.send_provision_root_command(certificate=my_root_cert[0]) |
| my_caid[0] = res.caid |
| |
| self.step(8) |
| res = await cr2_cmd.send_provision_root_command(certificate=my_root_cert[1]) |
| my_caid[1] = res.caid |
| |
| self.step(9) |
| my_nonce = [random.randbytes(32), random.randbytes(32)] |
| my_ccdid = [None, None] |
| my_csr = [None, None] |
| |
| self.step(10) |
| response = await cr1_cmd.send_csr_command(nonce=my_nonce[0]) |
| cr1_cmd.assert_valid_ccdid(response.ccdid) |
| my_ccdid[0] = response.ccdid |
| my_csr[0] = cr2_cmd.assert_valid_csr(response, my_nonce[0]) |
| |
| self.step(11) |
| response = await cr2_cmd.send_csr_command(nonce=my_nonce[1]) |
| cr2_cmd.assert_valid_ccdid(response.ccdid) |
| my_ccdid[1] = response.ccdid |
| my_csr[1] = cr2_cmd.assert_valid_csr(response, my_nonce[1]) |
| |
| self.step(12) |
| my_client_cert = [None, None] |
| root = cr1_cmd.get_key() |
| for i in range(2): |
| my_client_cert[i] = cr1_cmd.gen_cert_with_key(root, public_key=my_csr[i].public_key(), subject=my_csr[i].subject) |
| |
| self.step(13) |
| await cr1_cmd.send_provision_client_command(ccdid=my_ccdid[0], certificate=my_client_cert[0]) |
| |
| self.step(14) |
| await cr2_cmd.send_provision_client_command(ccdid=my_ccdid[1], certificate=my_client_cert[1]) |
| |
| self.step(15) |
| my_endpoint = [None, None] |
| res = await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[0], caid=my_caid[0], ccdid=my_ccdid[0]) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint[0] = res.endpointID |
| |
| self.step(16) |
| endpoints = await cr1_cmd.read_tls_client_attribute(attributes.ProvisionedEndpoints) |
| asserts.assert_equal(len(endpoints), 1) |
| asserts.assert_equal(endpoints[0].endpointID, my_endpoint[0]) |
| asserts.assert_equal(endpoints[0].hostname, my_hostname) |
| asserts.assert_equal(endpoints[0].port, my_port[0]) |
| asserts.assert_equal(endpoints[0].caid, my_caid[0]) |
| asserts.assert_equal(endpoints[0].ccdid, my_ccdid[0]) |
| asserts.assert_equal(endpoints[0].referenceCount, 0) |
| |
| self.step(17) |
| endpoints = await cr2_cmd.read_tls_client_attribute(attributes.ProvisionedEndpoints) |
| asserts.assert_equal(len(endpoints), 0) |
| |
| self.step(18) |
| res = await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[1], caid=my_caid[1], ccdid=my_ccdid[1]) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint[1] = res.endpointID |
| |
| self.step(19) |
| res = await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[2], caid=my_caid[0], ccdid=my_ccdid[0], endpoint_id=my_endpoint[0]) |
| asserts.assert_equal(res.endpointID, my_endpoint[0]) |
| |
| self.step(20) |
| endpoints = await cr1_cmd.read_tls_client_attribute(attributes.ProvisionedEndpoints) |
| asserts.assert_equal(len(endpoints), 1) |
| asserts.assert_equal(endpoints[0].endpointID, my_endpoint[0]) |
| asserts.assert_equal(endpoints[0].hostname, my_hostname) |
| asserts.assert_equal(endpoints[0].port, my_port[2]) |
| asserts.assert_equal(endpoints[0].caid, my_caid[0]) |
| asserts.assert_equal(endpoints[0].ccdid, my_ccdid[0]) |
| asserts.assert_equal(endpoints[0].referenceCount, 0) |
| |
| self.step(21) |
| endpoints = await cr2_cmd.read_tls_client_attribute(attributes.ProvisionedEndpoints) |
| asserts.assert_equal(len(endpoints), 1) |
| asserts.assert_equal(endpoints[0].endpointID, my_endpoint[1]) |
| asserts.assert_equal(endpoints[0].hostname, my_hostname) |
| asserts.assert_equal(endpoints[0].port, my_port[1]) |
| asserts.assert_equal(endpoints[0].caid, my_caid[1]) |
| asserts.assert_equal(endpoints[0].ccdid, my_ccdid[1]) |
| asserts.assert_equal(endpoints[0].referenceCount, 0) |
| |
| self.step(22) |
| await cr1_cmd.send_remove_tls_endpoint_command(endpoint_id=my_endpoint[0]) |
| |
| self.step(23) |
| await cr1_cmd.send_remove_fabric_command(cr2_cmd.fabric_index) |
| |
| def pics_TC_TLSCLIENT_3_2(self): |
| return ["TLSCLIENT.S", "TLSCERT.S"] |
| |
| def desc_TC_TLSCLIENT_3_2(self) -> str: |
| return "[TC-TLSCLIENT-3.2] ProvisionEndpoint command verification with several entries" |
| |
| def steps_TC_TLSCLIENT_3_2(self) -> list[TestStep]: |
| return [ |
| *self.get_common_steps(), |
| TestStep(4, "Set myHostname to a valid value."), |
| TestStep(5, "Populate myPort[] with myMaxProvisioned + 3 distinct valid values."), |
| TestStep(6, "Populate myRootCert[] with 2 distinct, valid, self-signed, DER-encoded x509 certificates"), |
| TestStep( |
| 7, "CR1 sends ProvisionRootCertificate command to the TLSCertificateManagementCluster with null CAID and Certificate set to myRootCert[0].", "DUT replies with a TLSCAID value. Store the returned value as myCaid[0]."), |
| TestStep( |
| 8, "CR2 sends ProvisionRootCertificate command to the TLSCertificateManagementCluster with null CAID and Certificate set to myRootCert[1].", "DUT replies with a TLSCAID value. Store the returned value as myCaid[1]."), |
| TestStep(9, "Populate myNonce[] with 2 distinct, random 32-octet values"), |
| TestStep(10, "CR1 sends ClientCSR command with Nonce set to myNonce[0]", |
| "DUT replies with CCDID, CSR and Nonce. Store TLSCCDID in myCcdid[0] and CSR in myCsr[0]."), |
| TestStep(11, "CR2 sends ClientCSR command with Nonce set to myNonce[1].", |
| "DUT replies with CCDID, CSR and Nonce. Store TLSCCDID in myCcdid[1] and CSR in myCsr[1]."), |
| TestStep(12, |
| "Populate myClientCert[] with 2 distinct, valid, self-signed, DER-encoded x509 certificates using each respective public key from myCsr[i]."), |
| TestStep( |
| 13, "CR1 sends ProvisionClientCertificate command to the TLSCertificateManagementCluster with CCDID set to myCcdid[0] and ClientCertificateDetails set to myClientCert[0].", test_plan_support.verify_success()), |
| TestStep( |
| 14, "CR2 sends ProvisionClientCertificate command to the TLSCertificateManagementCluster with CCDID set to myCcdid[1] and ClientCertificateDetails set to myClientCert[1].", test_plan_support.verify_success()), |
| TestStep(15, "CR1 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[0], CAID myCaid[0], CCDID myCcdid[0] and null EndpointID.", |
| "DUT replies with a TLSEndpointID value. Store the returned value as myEndpoint[0]."), |
| TestStep(16, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[i], CAID myCaid[1], CCDID myCcdid[1] and null EndpointID, for i in (1..myMaxProvisioned+1).", |
| "DUT replies with a TLSEndpointID value. Store the returned value as myEndpoint[i]."), |
| TestStep(17, "CR1 reads ProvisionedEndpoints attribute.", |
| "DUT replies with a list of TLSEndpointStruct with 2 entries. The entries should correspond to (myEndpoint[i], myHostname, myPort[i], myCaid[0], myCcdid[0], myReferenceCount) where myReferenceCount is 0, and i is [0..1]."), |
| TestStep(18, "CR2 reads ProvisionedEndpoints attribute.", |
| "DUT replies with a list of TLSEndpointStruct with myMaxProvisioned entries. The entries should correspond to (myEndpoint[i], myHostname, myPort[i], myCaid[1], myCcdid[1], myReferenceCount) where myReferenceCount is 0, and i is [2..myMaxProvisioned-1]."), |
| TestStep(19, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[myMaxProvisioned], CAID myCaid[1], CCDID myCcdid[1] and null EndpointID.", test_plan_support.verify_status( |
| Status.ResourceExhausted)), |
| TestStep(20, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[myMaxProvisioned+1], CAID myCaid[1], CCDID myCcdid[1] and EndpointID set to myEndpoint[1]", |
| "DUT replies with a TLSEndpointID value equal to myEndpoint[1]"), |
| TestStep( |
| 21, "CR2 sends RemoveEndpoint command with EndpointID set to myEndpoint[2]", test_plan_support.verify_success()), |
| TestStep(22, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[myMaxProvisioned+2], CAID myCaid[1], CCDID myCcdid[1] and null EndpointID, for i in (1..myMaxProvisioned+1).", |
| "DUT replies with a TLSEndpointID value. Store the returned value as myEndpoint[myMaxProvisioned+1]."), |
| TestStep(23, "CR2 reads ProvisionedEndpoints attribute.", |
| "DUT replies with a list of TLSEndpointStruct with myMaxProvisioned entries. The entries should correspond to (myEndpoint[i], myHostname, myPort[i], myCaid[1], myCcdid[1], myReferenceCount) where myReferenceCount is 0, and i is [3..myMaxProvisioned+1]."), |
| TestStep( |
| 24, "CR1 sends RemoveEndpoint command with EndpointID set to myEndpoint[i] for i in (0..2).", test_plan_support.verify_success()), |
| TestStep(25, test_plan_support.remove_fabric('CR2', 'CR1'), test_plan_support.verify_success()), |
| ] |
| |
| @run_if_endpoint_matches(has_cluster(Clusters.TlsClientManagement)) |
| async def test_TC_TLSCLIENT_3_2(self): |
| setup_data = await self.common_setup() |
| cr1_cmd = setup_data.cr1_cmd |
| cr2_cmd = setup_data.cr2_cmd |
| attributes = Clusters.TlsClientManagement.Attributes |
| |
| max_provisioned = await cr1_cmd.read_tls_client_attribute(attributes.MaxProvisioned) |
| |
| self.step(4) |
| my_hostname = b"myhostname.matter.com" |
| |
| self.step(5) |
| my_port = [1000 + i for i in range(max_provisioned + 3)] |
| |
| self.step(6) |
| my_root_cert = [cr1_cmd.gen_cert(), cr1_cmd.gen_cert()] |
| my_caid = [None, None] |
| |
| self.step(7) |
| res = await cr1_cmd.send_provision_root_command(certificate=my_root_cert[0]) |
| my_caid[0] = res.caid |
| |
| self.step(8) |
| res = await cr2_cmd.send_provision_root_command(certificate=my_root_cert[1]) |
| my_caid[1] = res.caid |
| |
| self.step(9) |
| my_nonce = [random.randbytes(32), random.randbytes(32)] |
| my_ccdid = [None, None] |
| my_csr = [None, None] |
| |
| self.step(10) |
| response = await cr1_cmd.send_csr_command(nonce=my_nonce[0]) |
| cr1_cmd.assert_valid_ccdid(response.ccdid) |
| my_ccdid[0] = response.ccdid |
| my_csr[0] = cr2_cmd.assert_valid_csr(response, my_nonce[0]) |
| |
| self.step(11) |
| response = await cr2_cmd.send_csr_command(nonce=my_nonce[1]) |
| cr2_cmd.assert_valid_ccdid(response.ccdid) |
| my_ccdid[1] = response.ccdid |
| my_csr[1] = cr2_cmd.assert_valid_csr(response, my_nonce[1]) |
| |
| self.step(12) |
| my_client_cert = [None, None] |
| root = cr1_cmd.get_key() |
| for i in range(2): |
| my_client_cert[i] = cr1_cmd.gen_cert_with_key(root, public_key=my_csr[i].public_key(), subject=my_csr[i].subject) |
| |
| self.step(13) |
| await cr1_cmd.send_provision_client_command(ccdid=my_ccdid[0], certificate=my_client_cert[0]) |
| |
| self.step(14) |
| await cr2_cmd.send_provision_client_command(ccdid=my_ccdid[1], certificate=my_client_cert[1]) |
| my_endpoint = [None] * (max_provisioned + 2) |
| |
| self.step(15) |
| res = await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[0], caid=my_caid[0], ccdid=my_ccdid[0]) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint[0] = res.endpointID |
| |
| self.step(16) |
| for i in range(1, max_provisioned+1): |
| res = await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[i], caid=my_caid[1], ccdid=my_ccdid[1]) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint[i] = res.endpointID |
| |
| self.step(17) |
| endpoints1 = await cr1_cmd.read_tls_client_attribute(attributes.ProvisionedEndpoints) |
| asserts.assert_equal(len(endpoints1), 1) |
| |
| self.step(18) |
| endpoints2 = await cr2_cmd.read_tls_client_attribute(attributes.ProvisionedEndpoints) |
| asserts.assert_equal(len(endpoints2), max_provisioned) |
| |
| self.step(19) |
| await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[max_provisioned+1], caid=my_caid[1], ccdid=my_ccdid[1], expected_status=Status.ResourceExhausted) |
| |
| self.step(20) |
| res = await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[max_provisioned+1], caid=my_caid[1], ccdid=my_ccdid[1], endpoint_id=my_endpoint[1]) |
| asserts.assert_equal(res.endpointID, my_endpoint[1]) |
| |
| self.step(21) |
| await cr2_cmd.send_remove_tls_endpoint_command(endpoint_id=my_endpoint[1]) |
| |
| self.step(22) |
| res = await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[max_provisioned+2], caid=my_caid[1], ccdid=my_ccdid[1]) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint[max_provisioned+1] = res.endpointID |
| |
| self.step(23) |
| endpoints2 = await cr2_cmd.read_tls_client_attribute(attributes.ProvisionedEndpoints) |
| asserts.assert_equal(len(endpoints2), max_provisioned) |
| found_endpoints = dict() |
| for ep in endpoints2: |
| found_endpoints[ep.endpointID] = ep |
| for i in range(2, max_provisioned+2): |
| ep = found_endpoints[my_endpoint[i]] |
| asserts.assert_equal(ep.hostname, my_hostname) |
| if i == max_provisioned+1: |
| port = my_port[max_provisioned+2] |
| else: |
| port = my_port[i] |
| asserts.assert_equal(ep.port, port) |
| asserts.assert_equal(ep.caid, my_caid[1]) |
| asserts.assert_equal(ep.ccdid, my_ccdid[1]) |
| asserts.assert_equal(ep.referenceCount, 0) |
| |
| self.step(24) |
| await cr1_cmd.send_remove_tls_endpoint_command(endpoint_id=my_endpoint[0]) |
| |
| self.step(25) |
| await cr1_cmd.send_remove_fabric_command(cr2_cmd.fabric_index) |
| |
| def pics_TC_TLSCLIENT_3_3(self): |
| return ["TLSCLIENT.S", "TLSCERT.S"] |
| |
| def desc_TC_TLSCLIENT_3_3(self) -> str: |
| return "[TC-TLSCLIENT-3.3] ProvisionEndpoint command verification with bad arguments" |
| |
| def steps_TC_TLSCLIENT_3_3(self) -> list[TestStep]: |
| return [ |
| *self.get_common_steps(), |
| TestStep(4, "Set myShortHostname to a value shorter than 4."), |
| TestStep(5, "Set myLongHostname to a value longer than 253."), |
| TestStep(6, "Set myHostname to a valid value."), |
| TestStep(7, "Populate myPort[] with 2 distinct valid values."), |
| TestStep(8, "Populate myRootCert[] with 2 distinct valid values."), |
| TestStep(9, "CR1 sends ProvisionRootCertificate command to the TLSCertificateManagementCluster with null CAID and Certificate set to myRootCert[0].", |
| "DUT replies with a TLSCAID value. Store the returned value as myCaid[0]."), |
| TestStep(10, "CR2 sends ProvisionRootCertificate command to the TLSCertificateManagementCluster with null CAID and Certificate set to myRootCert[1].", |
| "DUT replies with a TLSCAID value. Store the returned value as myCaid[1]."), |
| TestStep(11, "Populate myNonce[] with 2 distinct, random 32-octet values"), |
| TestStep(12, "CR1 sends ClientCSR command with Nonce set to myNonce[0]", |
| "DUT replies with CCDID, CSR and Nonce. Store TLSCCDID in myCcdid[0] and CSR in myCsr[0]."), |
| TestStep(13, "CR2 sends ClientCSR command with Nonce set to myNonce[1].", |
| "DUT replies with CCDID, CSR and Nonce. Store TLSCCDID in myCcdid[1] and CSR in myCsr[1]."), |
| TestStep(14, |
| "Populate myClientCert[] with 2 distinct, valid, self-signed, DER-encoded x509 certificates using each respective public key from myCsr[i]."), |
| TestStep(15, "CR1 sends ProvisionClientCertificate command to the TLSCertificateManagementCluster with CCDID set to myCcdid[0] and ClientCertificateDetails set to myClientCert[0].", |
| test_plan_support.verify_success()), |
| TestStep(16, "CR2 sends ProvisionClientCertificate command to the TLSCertificateManagementCluster with CCDID set to myCcdid[1] and ClientCertificateDetails set to myClientCert[1].", |
| test_plan_support.verify_success()), |
| TestStep(17, "CR1 sends ProvisionEndpoint command with valid Hostname myShortHostname, Port myPort[0], CAID myCaid[0], CCDID myCcdid[0] and null EndpointID.", |
| test_plan_support.verify_status(Status.ConstraintError)), |
| TestStep(18, "CR1 sends ProvisionEndpoint command with valid Hostname myLongHostname, Port myPort[0], CAID myCaid[0], CCDID myCcdid[0] and null EndpointID.", |
| test_plan_support.verify_status(Status.ConstraintError)), |
| TestStep(19, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[0], CAID myCaid[0], CCDID myCcdid[1] and null EndpointID.", |
| "DUT replies with the cluster-specific error RootCertificateNotFound."), |
| TestStep(20, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[0], CAID myCaid[1], CCDID myCcdid[0] and null EndpointID.", |
| "DUT replies with the cluster-specific error ClientCertificateNotFound."), |
| TestStep(21, "CR1 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[0], CAID myCaid[0], CCDID myCcdid[0] and null EndpointID.", |
| "DUT replies with a TLSEndpointID value. Store the returned value as myEndpoint[0]."), |
| TestStep(22, "CR1 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[0], CAID myCaid[0], CCDID myCcdid[0] and null EndpointID.", |
| "DUT replies with the cluster-specific error EndpointAlreadyInstalled."), |
| TestStep(23, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[0], CAID myCaid[1], CCDID myCcdid[1] and null EndpointID.", |
| "DUT replies with a TLSEndpointID value"), |
| TestStep(24, "CR1 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[1], CAID myCaid[0], CCDID myCcdid[0] and EndpointID set to myEndpoint + 1.", |
| test_plan_support.verify_status(Status.NotFound)), |
| TestStep(25, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[1], CAID myCaid[1], CCDID myCcdid[1] and EndpointID set to myEndpoint.", |
| test_plan_support.verify_status(Status.NotFound)), |
| TestStep(26, test_plan_support.remove_fabric('CR2', 'CR1'), test_plan_support.verify_success()), |
| ] |
| |
| @run_if_endpoint_matches(has_cluster(Clusters.TlsClientManagement)) |
| async def test_TC_TLSCLIENT_3_3(self): |
| setup_data = await self.common_setup() |
| cr1_cmd = setup_data.cr1_cmd |
| cr2_cmd = setup_data.cr2_cmd |
| |
| self.step(4) |
| my_short_hostname = b"abc" |
| |
| self.step(5) |
| my_long_hostname = ''.join(random.choices( |
| string.ascii_letters + string.digits, k=254)).encode('utf-8') |
| |
| self.step(6) |
| my_hostname = b"myhostname.matter.com" |
| |
| self.step(7) |
| my_port = [1234, 5678] |
| |
| self.step(8) |
| my_root_cert = [cr1_cmd.gen_cert(), cr2_cmd.gen_cert()] |
| my_caid = [None, None] |
| |
| self.step(9) |
| res = await cr1_cmd.send_provision_root_command(certificate=my_root_cert[0]) |
| my_caid[0] = res.caid |
| |
| self.step(10) |
| res = await cr2_cmd.send_provision_root_command(certificate=my_root_cert[1]) |
| my_caid[1] = res.caid |
| |
| self.step(11) |
| my_nonce = [random.randbytes(32), random.randbytes(32)] |
| my_ccdid = [None, None] |
| my_csr = [None, None] |
| |
| self.step(12) |
| response = await cr1_cmd.send_csr_command(nonce=my_nonce[0]) |
| my_ccdid[0] = response.ccdid |
| my_csr[0] = cr1_cmd.assert_valid_csr(response, my_nonce[0]) |
| |
| self.step(13) |
| response = await cr2_cmd.send_csr_command(nonce=my_nonce[1]) |
| my_ccdid[1] = response.ccdid |
| my_csr[1] = cr2_cmd.assert_valid_csr(response, my_nonce[1]) |
| |
| self.step(14) |
| my_client_cert = [None, None] |
| root = cr1_cmd.get_key() |
| for i in range(2): |
| my_client_cert[i] = cr1_cmd.gen_cert_with_key(root, public_key=my_csr[i].public_key(), subject=my_csr[i].subject) |
| |
| self.step(15) |
| await cr1_cmd.send_provision_client_command(ccdid=my_ccdid[0], certificate=my_client_cert[0]) |
| |
| self.step(16) |
| await cr2_cmd.send_provision_client_command(ccdid=my_ccdid[1], certificate=my_client_cert[1]) |
| |
| self.step(17) |
| await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_short_hostname, port=my_port[0], caid=my_caid[0], ccdid=my_ccdid[0], expected_status=Status.ConstraintError) |
| |
| self.step(18) |
| await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_long_hostname, port=my_port[0], caid=my_caid[0], ccdid=my_ccdid[0], expected_status=Status.ConstraintError) |
| |
| self.step(19) |
| await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[0], caid=my_caid[0], ccdid=my_ccdid[1], expected_status=Clusters.TlsClientManagement.Enums.StatusCodeEnum.kRootCertificateNotFound) |
| |
| self.step(20) |
| await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[0], caid=my_caid[1], ccdid=my_ccdid[0], expected_status=Clusters.TlsClientManagement.Enums.StatusCodeEnum.kClientCertificateNotFound) |
| |
| self.step(21) |
| res = await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[0], caid=my_caid[0], ccdid=my_ccdid[0]) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint_id = res.endpointID |
| |
| self.step(22) |
| await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[0], caid=my_caid[0], ccdid=my_ccdid[0], expected_status=Clusters.TlsClientManagement.Enums.StatusCodeEnum.kEndpointAlreadyInstalled) |
| |
| self.step(23) |
| await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[0], caid=my_caid[1], ccdid=my_ccdid[1]) |
| |
| self.step(24) |
| await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[1], caid=my_caid[0], ccdid=my_ccdid[0], endpoint_id=my_endpoint_id + 1, expected_status=Status.NotFound) |
| |
| self.step(25) |
| await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[1], caid=my_caid[1], ccdid=my_ccdid[1], endpoint_id=my_endpoint_id, expected_status=Status.NotFound) |
| |
| self.step(26) |
| await cr1_cmd.send_remove_fabric_command(cr2_cmd.fabric_index) |
| |
| def pics_TC_TLSCLIENT_3_10(self): |
| return ["TLSCLIENT.S", "TLSCERT.S"] |
| |
| def desc_TC_TLSCLIENT_3_10(self) -> str: |
| return "[TC-TLSCLIENT-3.10] FindEndpoint command verification" |
| |
| def steps_TC_TLSCLIENT_3_10(self) -> list[TestStep]: |
| return [ |
| *self.get_common_steps(), |
| TestStep(4, "Set myHostname to a valid value."), |
| TestStep(5, "Set myPort to a valid port value."), |
| TestStep(6, "Set myRootCert to a valid value."), |
| TestStep(7, "CR1 sends ProvisionRootCertificate command to the TLSCertificateManagementCluster with null CAID and Certificate set to myRootCert.", |
| "DUT replies with a TLSCAID value. Store the returned value as myCaid."), |
| TestStep(8, "Populate myNonce with a distinct, random 32-octet values"), |
| TestStep(9, "CR1 sends ClientCSR command with Nonce set to myNonce", |
| "DUT replies with CCDID, CSR and Nonce. Store TLSCCDID in myCcdid and CSR in myCsr."), |
| TestStep(10, |
| "Populate myClientCert with a distinct, valid, self-signed, DER-encoded x509 certificates using each respective public key from myCsr."), |
| TestStep(11, "CR1 sends ProvisionClientCertificate command to the TLSCertificateManagementCluster with CCDID set to myCcdid and ClientCertificateDetails set to myClientCert.", |
| test_plan_support.verify_success()), |
| TestStep(12, "CR1 sends FindEndpoint command with EndpointID 0.", |
| test_plan_support.verify_status(Status.NotFound)), |
| TestStep(13, "CR1 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort, CAID myCaid, CCDID myCcdid and null EndpointID.", |
| "DUT replies with a TLSEndpointID value. Store the returned value as myEndpoint."), |
| TestStep(14, "CR1 sends FindEndpoint command with EndpointID set to myEndpoint.", |
| "DUT replies with a TLSEndpointStruct with value (myEndpoint, myHostname, myPort, myCaid, myCcdid, myReferenceCount), where myReferenceCount is 0."), |
| TestStep(15, "CR1 sends FindEndpoint command with EndpointID set to myEndpoint + 1.", |
| test_plan_support.verify_status(Status.NotFound)), |
| TestStep(16, "CR2 sends FindEndpoint command with EndpointID set to myEndpoint.", |
| test_plan_support.verify_status(Status.NotFound)), |
| TestStep( |
| 17, "CR1 sends RemoveEndpoint command with EndpointID set to myEndpoint[0].", test_plan_support.verify_success()), |
| TestStep(18, test_plan_support.remove_fabric('CR2', 'CR1'), test_plan_support.verify_success()), |
| ] |
| |
| @run_if_endpoint_matches(has_cluster(Clusters.TlsClientManagement)) |
| async def test_TC_TLSCLIENT_3_10(self): |
| setup_data = await self.common_setup() |
| cr1_cmd = setup_data.cr1_cmd |
| cr2_cmd = setup_data.cr2_cmd |
| |
| self.step(4) |
| my_hostname = b"myhostname.matter.com" |
| |
| self.step(5) |
| my_port = 1234 |
| |
| self.step(6) |
| my_root_cert = cr1_cmd.gen_cert() |
| |
| self.step(7) |
| res = await cr1_cmd.send_provision_root_command(certificate=my_root_cert) |
| my_caid = res.caid |
| |
| self.step(8) |
| my_nonce = random.randbytes(32) |
| |
| self.step(9) |
| res = await cr1_cmd.send_csr_command(nonce=my_nonce) |
| my_ccdid = res.ccdid |
| csr = cr1_cmd.assert_valid_csr(res, my_nonce) |
| |
| self.step(10) |
| my_client_cert = cr1_cmd.gen_cert_with_key( |
| cr1_cmd.get_key(), public_key=csr.public_key(), subject=csr.subject) |
| |
| self.step(11) |
| await cr1_cmd.send_provision_client_command(ccdid=my_ccdid, certificate=my_client_cert) |
| |
| self.step(12) |
| await cr1_cmd.send_find_tls_endpoint_command(endpoint_id=0, expected_status=Status.NotFound) |
| |
| self.step(13) |
| res = await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port, caid=my_caid, ccdid=my_ccdid) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint = res.endpointID |
| |
| self.step(14) |
| res = await cr1_cmd.send_find_tls_endpoint_command(endpoint_id=my_endpoint) |
| asserts.assert_equal(res.endpoint.endpointID, my_endpoint) |
| asserts.assert_equal(res.endpoint.hostname, my_hostname) |
| asserts.assert_equal(res.endpoint.port, my_port) |
| asserts.assert_equal(res.endpoint.caid, my_caid) |
| asserts.assert_equal(res.endpoint.ccdid, my_ccdid) |
| |
| self.step(15) |
| await cr1_cmd.send_find_tls_endpoint_command(endpoint_id=my_endpoint + 1, expected_status=Status.NotFound) |
| |
| self.step(16) |
| await cr2_cmd.send_find_tls_endpoint_command(endpoint_id=my_endpoint, expected_status=Status.NotFound) |
| |
| self.step(17) |
| await cr1_cmd.send_remove_tls_endpoint_command(endpoint_id=my_endpoint) |
| |
| self.step(18) |
| await cr1_cmd.send_remove_fabric_command(cr2_cmd.fabric_index) |
| |
| def pics_TC_TLSCLIENT_3_20(self): |
| return ["TLSCLIENT.S", "TLSCERT.S"] |
| |
| def desc_TC_TLSCLIENT_3_20(self) -> str: |
| return "[TC-TLSCLIENT-3.20] RemoveEndpoint command verification" |
| |
| def steps_TC_TLSCLIENT_3_20(self) -> list[TestStep]: |
| return [ |
| *self.get_common_steps(), |
| TestStep(4, "Set myHostname to a valid value."), |
| TestStep(5, "Populate myPort[] with 3 distinct valid port values."), |
| TestStep(6, "Populate myRootCert[] with 2 distinct valid values."), |
| TestStep(7, "CR1 sends ProvisionRootCertificate command to the TLSCertificateManagementCluster with null CAID and Certificate set to myRootCert[0].", |
| "DUT replies with a TLSCAID value. Store the returned value as myCaid[0]."), |
| TestStep(8, "CR2 sends ProvisionRootCertificate command to the TLSCertificateManagementCluster with null CAID and Certificate set to myRootCert[1].", |
| "DUT replies with a TLSCAID value. Store the returned value as myCaid[1]."), |
| TestStep(9, "Populate myNonce[] with 2 distinct, random 32-octet values"), |
| TestStep(10, "CR1 sends ClientCSR command with Nonce set to myNonce[0]", |
| "DUT replies with CCDID, CSR and Nonce. Store TLSCCDID in myCcdid[0] and CSR in myCsr[0]."), |
| TestStep(11, "CR2 sends ClientCSR command with Nonce set to myNonce[1].", |
| "DUT replies with CCDID, CSR and Nonce. Store TLSCCDID in myCcdid[1] and CSR in myCsr[1]."), |
| TestStep(12, |
| "Populate myClientCert[] with 2 distinct, valid, self-signed, DER-encoded x509 certificates using each respective public key from myCsr[i]."), |
| TestStep(13, "CR1 sends ProvisionClientCertificate command to the TLSCertificateManagementCluster with CCDID set to myCcdid[0] and ClientCertificateDetails set to myClientCert[0].", |
| test_plan_support.verify_success()), |
| TestStep(14, "CR2 sends ProvisionClientCertificate command to the TLSCertificateManagementCluster with CCDID set to myCcdid[1] and ClientCertificateDetails set to myClientCert[1].", |
| test_plan_support.verify_success()), |
| TestStep(15, "CR1 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[0], CAID myCaid[0], CCDID myCcdid[0] and null EndpointID.", |
| "DUT replies with a TLSEndpointID value. Store the returned value as myEndpoint[0]."), |
| TestStep(16, "CR1 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[1], CAID myCaid[0], CCDID myCcdid[0] and null EndpointID.", |
| "DUT replies with a TLSEndpointID value. Store the returned value as myEndpoint[1]."), |
| TestStep(17, "CR2 sends ProvisionEndpoint command with valid Hostname myHostname, Port myPort[2], CAID myCaid[1], CCDID myCcdid[1] and null EndpointID.", |
| "DUT replies with a TLSEndpointID value. Store the returned value as myEndpoint[2]."), |
| TestStep(18, "CR1 sends RemoveEndpoint command with EndpointID set to myEndpoint[0].", |
| test_plan_support.verify_success()), |
| TestStep(19, "CR1 sends FindEndpoint command with EndpointID set to myEndpoint[0].", |
| test_plan_support.verify_status(Status.NotFound)), |
| TestStep(20, "CR1 sends RemoveEndpoint command with EndpointID set to myEndpoint[0].", |
| test_plan_support.verify_status(Status.NotFound)), |
| TestStep(21, "CR2 sends RemoveEndpoint command with EndpointID set to myEndpoint[1].", |
| test_plan_support.verify_status(Status.NotFound)), |
| TestStep(22, "CR1 sends RemoveEndpoint command with EndpointID set to myEndpoint[1].", |
| test_plan_support.verify_success()), |
| TestStep(23, "CR2 sends RemoveRootCertificate command to the TLSCertificateManagementCluster with CAID myCaid[1].", |
| test_plan_support.verify_status(Status.InvalidInState)), |
| TestStep(24, "CR2 sends RemoveClientCertificate command to the TLSCertificateManagementCluster with CAID myCcdid[1].", |
| test_plan_support.verify_status(Status.InvalidInState)), |
| TestStep(25, "CR2 sends RemoveEndpoint command with EndpointID set to myEndpoint[2].", |
| test_plan_support.verify_success()), |
| TestStep(26, "CR2 sends RemoveRootCertificate command to the TLSCertificateManagementCluster with CAID myCaid[1].", |
| test_plan_support.verify_success()), |
| TestStep(27, "CR2 sends RemoveClientCertificate command to the TLSCertificateManagementCluster with CAID myCcdid[1].", |
| test_plan_support.verify_success()), |
| TestStep(28, test_plan_support.remove_fabric('CR2', 'CR1'), test_plan_support.verify_success()), |
| ] |
| |
| @run_if_endpoint_matches(has_cluster(Clusters.TlsClientManagement)) |
| async def test_TC_TLSCLIENT_3_20(self): |
| setup_data = await self.common_setup() |
| cr1_cmd = setup_data.cr1_cmd |
| cr2_cmd = setup_data.cr2_cmd |
| |
| self.step(4) |
| my_hostname = b"myhostname.matter.com" |
| |
| self.step(5) |
| my_port = [1000 + i for i in range(3)] |
| |
| self.step(6) |
| my_root_cert = [cr1_cmd.gen_cert(), cr2_cmd.gen_cert()] |
| my_caid = [None, None] |
| |
| self.step(7) |
| res = await cr1_cmd.send_provision_root_command(certificate=my_root_cert[0]) |
| my_caid[0] = res.caid |
| |
| self.step(8) |
| res = await cr2_cmd.send_provision_root_command(certificate=my_root_cert[1]) |
| my_caid[1] = res.caid |
| |
| self.step(9) |
| my_nonce = [random.randbytes(32), random.randbytes(32)] |
| my_ccdid = [None, None] |
| my_csr = [None, None] |
| |
| self.step(10) |
| res = await cr1_cmd.send_csr_command(nonce=my_nonce[0]) |
| my_ccdid[0] = res.ccdid |
| my_csr[0] = cr1_cmd.assert_valid_csr(res, my_nonce[0]) |
| |
| self.step(11) |
| res = await cr2_cmd.send_csr_command(nonce=my_nonce[1]) |
| my_ccdid[1] = res.ccdid |
| my_csr[1] = cr2_cmd.assert_valid_csr(res, my_nonce[1]) |
| |
| self.step(12) |
| my_client_cert = [None, None] |
| root = cr1_cmd.get_key() |
| for i in range(2): |
| my_client_cert[i] = cr1_cmd.gen_cert_with_key(root, public_key=my_csr[i].public_key(), subject=my_csr[i].subject) |
| |
| self.step(13) |
| await cr1_cmd.send_provision_client_command(ccdid=my_ccdid[0], certificate=my_client_cert[0]) |
| |
| self.step(14) |
| await cr2_cmd.send_provision_client_command(ccdid=my_ccdid[1], certificate=my_client_cert[1]) |
| |
| my_endpoint = [None] * 3 |
| |
| self.step(15) |
| res = await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[0], caid=my_caid[0], ccdid=my_ccdid[0]) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint[0] = res.endpointID |
| |
| self.step(16) |
| res = await cr1_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[1], caid=my_caid[0], ccdid=my_ccdid[0]) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint[1] = res.endpointID |
| |
| self.step(17) |
| res = await cr2_cmd.send_provision_tls_endpoint_command(hostname=my_hostname, port=my_port[2], caid=my_caid[1], ccdid=my_ccdid[1]) |
| asserts.assert_is_not_none(res.endpointID) |
| my_endpoint[2] = res.endpointID |
| |
| self.step(18) |
| await cr1_cmd.send_remove_tls_endpoint_command(endpoint_id=my_endpoint[0]) |
| |
| self.step(19) |
| await cr1_cmd.send_find_tls_endpoint_command(endpoint_id=my_endpoint[0], expected_status=Status.NotFound) |
| |
| self.step(20) |
| await cr1_cmd.send_remove_tls_endpoint_command(endpoint_id=my_endpoint[0], expected_status=Status.NotFound) |
| |
| self.step(21) |
| await cr2_cmd.send_remove_tls_endpoint_command(endpoint_id=my_endpoint[1], expected_status=Status.NotFound) |
| |
| self.step(22) |
| await cr1_cmd.send_remove_tls_endpoint_command(endpoint_id=my_endpoint[1]) |
| |
| self.step(23) |
| await cr2_cmd.send_remove_root_command(caid=my_caid[1], expected_status=Status.InvalidInState) |
| |
| self.step(24) |
| await cr2_cmd.send_remove_client_command(ccdid=my_ccdid[1], expected_status=Status.InvalidInState) |
| |
| self.step(25) |
| await cr2_cmd.send_remove_tls_endpoint_command(endpoint_id=my_endpoint[2]) |
| |
| self.step(26) |
| await cr2_cmd.send_remove_root_command(caid=my_caid[1]) |
| |
| self.step(27) |
| await cr2_cmd.send_remove_client_command(ccdid=my_ccdid[1]) |
| |
| self.step(28) |
| await cr1_cmd.send_remove_fabric_command(cr2_cmd.fabric_index) |
| |
| |
| if __name__ == "__main__": |
| default_matter_test_main() |