blob: c3776772798989bff19489e10091cca8add124b5 [file] [log] [blame]
#
# Copyright (c) 2025 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs:
# run1:
# app: ${ALL_CLUSTERS_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --PICS src/app/tests/suites/certification/ci-pics-values
# --endpoint 1
# --app-pipe_prefix /tmp/chip_all_clusters_fifo_
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: true
# quiet: true
# === END CI TEST ARGUMENTS ===
import datetime
import random
import string
from typing import Optional, Union
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa, utils
from cryptography.x509 import CertificateBuilder, random_serial_number
from cryptography.x509.oid import NameOID
from ecdsa.curves import curve_by_name
from mobly import asserts
from pyasn1.codec.der.decoder import decode as der_decoder
from pyasn1.error import PyAsn1Error
from pyasn1_modules import rfc2986, rfc5480
import matter.clusters as Clusters
from matter import ChipDeviceCtrl
from matter.clusters.Types import Nullable, NullValue
from matter.interaction_model import InteractionModelError, Status
from matter.testing.conversions import hex_from_bytes
from matter.testing.matter_testing import (MatterBaseTest, TestStep, default_matter_test_main, has_cluster, run_if_endpoint_matches,
type_matches)
from matter.tlv import TLVWriter
from matter.utils import CommissioningBuildingBlocks
class TC_TLSCERT(MatterBaseTest):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# See https://github.com/project-chip/connectedhomeip/issues/39690
self.skip_cr2 = True
def gen_cert(self) -> bytes:
# Start generating the root certificate
root_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048
)
rand_suffix = "".join(
random.choices(string.ascii_letters + string.digits, k=16)
)
root_cert_subject = x509.Name(
[
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "CSA"),
x509.NameAttribute(
NameOID.COMMON_NAME, "TC_PAVS root " + rand_suffix
),
]
)
cert = (
CertificateBuilder()
.subject_name(root_cert_subject)
.issuer_name(root_cert_subject)
.public_key(root_key.public_key())
.serial_number(random_serial_number())
.not_valid_before(datetime.datetime.now(datetime.timezone.utc))
.not_valid_after(
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365.25*20)
)
.add_extension(
# We make it so that our root can only issue leaf certificates, no intermediate here.
x509.BasicConstraints(ca=True, path_length=0), critical=True
)
.add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.SubjectKeyIdentifier.from_public_key(
root_key.public_key()
),
critical=False,
)
.sign(root_key, hashes.SHA256())
)
return cert.public_bytes(serialization.Encoding.DER)
def assert_valid_caid(self, caid):
asserts.assert_greater_equal(caid, 0, "Invalid CAID returned")
asserts.assert_less_equal(caid, 65534, "Invalid CAID returned")
def assert_valid_ccdid(self, caid):
asserts.assert_greater_equal(caid, 0, "Invalid CCDID returned")
asserts.assert_less_equal(caid, 65534, "Invalid CCDID returned")
def assert_valid_csr(self, response: Clusters.TlsCertificateManagement.Commands.TLSClientCSRResponse, nonce: bytes):
# Verify der encoded and PKCS #10 (rfc2986 is PKCS #10) - next two requirements
try:
temp, _ = der_decoder(response.csr, asn1Spec=rfc2986.CertificationRequest())
except PyAsn1Error:
asserts.fail("Unable to decode CSR - improperly formatted DER")
layer1 = dict(temp)
# Verify public key is 256 bytes
csr = x509.load_der_x509_csr(response.csr)
csr_pubkey = csr.public_key()
asserts.assert_equal(csr_pubkey.key_size, 256, "Incorrect key size")
# Verify signature algorithm is ecdsa-with-SHA156
signature_algorithm = dict(layer1['signatureAlgorithm'])['algorithm']
asserts.assert_equal(signature_algorithm, rfc5480.ecdsa_with_SHA256, "CSR specifies incorrect signature key algorithm")
# Verify signature is valid
asserts.assert_true(csr.is_signature_valid, "Signature is invalid")
# Verify response.nonce is octet string of length 32
try:
# response.nonce is an octet string if it can be converted to an int
int(hex_from_bytes(response.nonce), 16)
except ValueError:
asserts.fail("Returned CSR nonce is not an octet string")
# Verify response.nonce is valid signature
nocsr_elements = dict(
[
(1, response.csr),
(2, nonce),
]
)
writer = TLVWriter()
writer.put(None, nocsr_elements)
baselen = curve_by_name("NIST256p").baselen
signature_raw_r = int(hex_from_bytes(response.nonce[:baselen]), 16)
signature_raw_s = int(hex_from_bytes(response.nonce[baselen:]), 16)
nocsr_signature = utils.encode_dss_signature(signature_raw_r, signature_raw_s)
csr_pubkey.verify(signature=nocsr_signature, data=writer.encoding, signature_algorithm=ec.ECDSA(hashes.SHA256()))
class TargetedFabric:
def __init__(self, matter_test: MatterBaseTest, endpoint: Optional[int] = None,
dev_ctrl: Optional[ChipDeviceCtrl.ChipDeviceController] = None,
node_id: Optional[int] = None):
self.test = matter_test
self.endpoint = endpoint
self.dev_ctrl = dev_ctrl
self.node_id = node_id
async def send_provision_root_command(
self, certificate: bytes,
expected_status: Status = Status.Success) -> Union[Clusters.TlsCertificateManagement.Commands.ProvisionRootCertificateResponse, InteractionModelError]:
try:
result = await self.test.send_single_cmd(cmd=Clusters.TlsCertificateManagement.Commands.ProvisionRootCertificate(certificate=certificate),
endpoint=self.endpoint, dev_ctrl=self.dev_ctrl, node_id=self.node_id, payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD)
asserts.assert_true(type_matches(result, Clusters.TlsCertificateManagement.Commands.ProvisionRootCertificateResponse),
"Unexpected return type for ProvisionRootCertificate")
return result
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
return e
async def send_find_root_command(
self, caid: Union[Nullable, int] = NullValue,
expected_status: Status = Status.Success) -> Union[Clusters.TlsCertificateManagement.Commands.FindRootCertificateResponse, InteractionModelError]:
try:
result = await self.test.send_single_cmd(cmd=Clusters.TlsCertificateManagement.Commands.FindRootCertificate(caid=caid),
endpoint=self.endpoint, dev_ctrl=self.dev_ctrl, node_id=self.node_id, payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD)
asserts.assert_true(type_matches(result, Clusters.TlsCertificateManagement.Commands.FindRootCertificateResponse),
"Unexpected return type for FindRootCertificate")
return result
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
return e
async def send_remove_root_command(
self, caid: int,
expected_status: Status = Status.Success) -> InteractionModelError:
try:
result = await self.test.send_single_cmd(cmd=Clusters.TlsCertificateManagement.Commands.RemoveRootCertificate(caid=caid),
endpoint=self.endpoint, dev_ctrl=self.dev_ctrl, node_id=self.node_id, payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD)
return result
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
return e
async def send_csr_command(
self, nonce: bytes,
expected_status: Status = Status.Success) -> Union[Clusters.TlsCertificateManagement.Commands.TLSClientCSRResponse, InteractionModelError]:
try:
result = await self.test.send_single_cmd(cmd=Clusters.TlsCertificateManagement.Commands.TLSClientCSR(nonce=nonce),
endpoint=self.endpoint, dev_ctrl=self.dev_ctrl, node_id=self.node_id, payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD)
asserts.assert_true(type_matches(result, Clusters.TlsCertificateManagement.Commands.TLSClientCSRResponse),
"Unexpected return type for TLSClientCSR")
return result
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
return e
async def send_provision_client_command(
self, certificate: bytes, ccdid: int,
expected_status: Status = Status.Success) -> InteractionModelError:
try:
result = await self.test.send_single_cmd(cmd=Clusters.TlsCertificateManagement.Commands.ProvisionClientCertificate(ccdid=ccdid, clientCertificateDetails=Clusters.TlsCertificateManagement.Structs.TLSClientCertificateDetailStruct(clientCertificate=certificate)),
endpoint=self.endpoint, dev_ctrl=self.dev_ctrl, node_id=self.node_id, payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD)
return result
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
return e
async def send_find_client_command(
self, ccdid: Union[Nullable, int] = NullValue,
expected_status: Status = Status.Success) -> Union[Clusters.TlsCertificateManagement.Commands.FindClientCertificateResponse, InteractionModelError]:
try:
result = await self.test.send_single_cmd(cmd=Clusters.TlsCertificateManagement.Commands.FindClientCertificate(ccdid=ccdid),
endpoint=self.endpoint, dev_ctrl=self.dev_ctrl, node_id=self.node_id, payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD)
asserts.assert_true(type_matches(result, Clusters.TlsCertificateManagement.Commands.FindClientCertificateResponse),
"Unexpected return type for FindClientCertificate")
return result
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
return e
async def send_remove_client_command(
self, ccdid: int,
expected_status: Status = Status.Success) -> InteractionModelError:
try:
result = await self.test.send_single_cmd(cmd=Clusters.TlsCertificateManagement.Commands.RemoveClientCertificate(ccdid=ccdid),
endpoint=self.endpoint, dev_ctrl=self.dev_ctrl, node_id=self.node_id, payloadCapability=ChipDeviceCtrl.TransportPayloadCapability.LARGE_PAYLOAD)
return result
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
return e
async def read_tls_cert_attribute(self, attribute):
cluster = Clusters.TlsCertificateManagement
return await self.test.read_single_attribute_check_success(endpoint=self.endpoint, dev_ctrl=self.dev_ctrl, node_id=self.node_id, cluster=cluster, attribute=attribute)
def desc_TC_TLSCERT_3_1(self) -> str:
return "[TC-TLSCERT-3.1] ProvisionRootCertificate command basic insertion and modification"
def steps_TC_TLSCERT_3_1(self) -> list[TestStep]:
steps = [
TestStep(1, "Commissioning, already done", is_commissioning=True),
TestStep(2, "CR1 opens commissioning window on DUT",
"Commissioning window should open"),
TestStep(3, "CR2 fully commissions DUT_CE", "DUT should fully commission"),
TestStep(4, "Create two distinct, valid, self-signed, DER-encoded x509 certificates"),
TestStep(5, "Read ProvisionedRootCertificates"),
TestStep(6, "Sends the RemoveRootCertificate command for any certificates found in step 4"),
TestStep(7, "Sends the ProvisionRootCertificate command to the TlsCertificateManagement cluster",
"Verify that the DUT sends ProvisionRootCertificateResponse."),
TestStep(8, "Sends another ProvisionRootCertificate command to the TlsCertificateManagement cluster",
"Verify a new ID is generated."),
TestStep(9, "Read ProvisionedRootCertificates to from CR1"),
TestStep(10, "Read ProvisionedRootCertificates to from CR2"),
TestStep(11, "Read ProvisionedRootCertificates to make sure certificates from step 2 and 3 are present"),
TestStep(12, "Sends the FindRootCertificate command specifying a CAID from step 2",
"Verify the DUT sends FindRootCertificateResponse with both certificates"),
TestStep(13, "Sends the FindRootCertificate command specifying a CAID from step 3",
"Verify the DUT sends FindRootCertificateResponse with both certificates"),
TestStep(14, "Sends the FindRootCertificate command specifying null for CAID",
"Verify the DUT sends FindRootCertificateResponse with both certificates"),
TestStep(15, "Sends the RemoveRootCertificate for both certificates added",
"Verify the DUT sends success status"),
]
return steps
@run_if_endpoint_matches(has_cluster(Clusters.TlsCertificateManagement))
async def test_TC_TLSCERT_3_1(self):
endpoint = self.get_endpoint(default=1)
attributes = Clusters.TlsCertificateManagement.Attributes
self.step(1)
# Establishing CR1 controller
cr1 = self.default_controller
cr1_cmd = self.TargetedFabric(self, endpoint=endpoint)
# Establishing CR2 controller
cr2_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
cr2_fabric_admin = cr2_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=cr1.fabricId + 1)
cr2 = cr2_fabric_admin.NewController(nodeId=cr1.nodeId + 1)
cr2_dut_node_id = self.dut_node_id+1
cr2_cmd = self.TargetedFabric(self, endpoint=endpoint, dev_ctrl=cr2, node_id=cr2_dut_node_id)
self.step(2)
_, noc_resp, _ = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(
commissionerDevCtrl=cr1, newFabricDevCtrl=cr2,
existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id
)
fabric_index_cr2 = noc_resp.fabricIndex
self.step(3)
self.step(4)
first_cert = self.gen_cert()
second_cert = self.gen_cert()
self.step(5)
attribute_certs = await cr1_cmd.read_tls_cert_attribute(attributes.ProvisionedRootCertificates)
self.step(6)
for cert in attribute_certs:
await cr1_cmd.send_remove_root_command(caid=cert.caid)
self.step(7)
response = await cr1_cmd.send_provision_root_command(certificate=first_cert)
self.assert_valid_caid(response.caid)
self.step(8)
if not self.skip_cr2:
response2 = await cr2_cmd.send_provision_root_command(certificate=second_cert)
self.assert_valid_caid(response2.caid)
asserts.assert_not_equal(response.caid, response2.caid, "CAID should be unique")
self.step(9)
attribute_certs = await cr1_cmd.read_tls_cert_attribute(attributes.ProvisionedRootCertificates)
asserts.assert_greater_equal(len(attribute_certs), 1, "Expected at least 1 certificate")
found_certs = dict()
# Python controller is using TCP, which supports large payloads; need a way to test
# over MRP that certificate is not present
for cert in attribute_certs:
found_certs[cert.caid] = cert.certificate
asserts.assert_in(response.caid, found_certs, "ProvisionedRootCertificates should contain provisioned root cert")
asserts.assert_equal(found_certs[response.caid], first_cert, "Expected matching certificate detail")
self.step(10)
if not self.skip_cr2:
attribute_certs = await cr2_cmd.read_tls_cert_attribute(attribute=attributes.ProvisionedRootCertificates)
asserts.assert_greater_equal(len(attribute_certs), 1, "Expected at least 1 certificate")
found_certs = dict()
# Python controller is using TCP, which supports large payloads; need a way to test
# over MRP that certificate is not present
for cert in attribute_certs:
found_certs[cert.caid] = cert.certificate
asserts.assert_in(response2.caid, found_certs, "ProvisionedRootCertificates should contain provisioned root cert")
asserts.assert_equal(found_certs[response2.caid], second_cert, "Expected matching certificate detail")
self.step(11)
find_response = await cr1_cmd.send_find_root_command(caid=response.caid)
asserts.assert_equal(len(find_response.certificateDetails), 1, "Expected single certificate")
asserts.assert_equal(find_response.certificateDetails[0].certificate, first_cert)
self.step(12)
if not self.skip_cr2:
find_response2 = await cr2_cmd.send_find_root_command(caid=response2.caid)
asserts.assert_equal(len(find_response2.certificateDetails), 1, "Expected single certificate")
asserts.assert_equal(find_response2.certificateDetails[0].certificate,
second_cert, "Expected matching certificate detail")
self.step(13)
find_all_response = await cr1_cmd.send_find_root_command()
asserts.assert_greater_equal(len(find_all_response.certificateDetails), 1, "Expected at least 1 certificate")
found_certs = dict()
for cert in find_all_response.certificateDetails:
found_certs[cert.caid] = cert.certificate
asserts.assert_in(response.caid, found_certs, "FindRootCertificate should contain provisioned root cert")
asserts.assert_equal(found_certs[response.caid], first_cert, "Expected matching certificate detail")
self.step(14)
if not self.skip_cr2:
find_all_response = await cr2_cmd.send_find_root_command()
asserts.assert_greater_equal(len(find_all_response.certificateDetails), 1, "Expected at least 1 certificate")
found_certs = dict()
for cert in find_all_response.certificateDetails:
found_certs[cert.caid] = cert.certificate
asserts.assert_in(response2.caid, found_certs, "FindRootCertificate should contain provisioned root cert")
asserts.assert_equal(found_certs[response2.caid], second_cert, "Expected matching certificate detail")
self.step(15)
await cr1_cmd.send_remove_root_command(caid=response.caid)
if not self.skip_cr2:
await cr2_cmd.send_remove_root_command(caid=response2.caid)
resp = await self.send_single_cmd(cmd=Clusters.OperationalCredentials.Commands.RemoveFabric(fabric_index_cr2), endpoint=0)
asserts.assert_equal(
resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk)
def desc_TC_TLSCERT_3_1_8(self) -> str:
return "[TC-TLSCERT-3.1.8] ProvisionClientCertificate command verification"
def steps_TC_TLSCERT_3_1_8(self) -> list[TestStep]:
steps = [
TestStep(1, "Commissioning, already done", is_commissioning=True),
TestStep(2, "CR1 opens commissioning window on DUT",
"Commissioning window should open"),
TestStep(3, "Create two distinct, valid, self-signed, DER-encoded x509 certificates"),
TestStep(4, "Read ProvisionedClientCertificates"),
TestStep(5, "Sends the RemoveClientCertificate command for any certificates found in step 4"),
TestStep(6, "Sends the TLSClientCSR command"),
TestStep(7, "Validates the CSR and nonce signature"),
TestStep(8, "Sends the ProvisionClientCertificate command to the TlsCertificateManagement cluster",
"Verify that the DUT sends ProvisionClientCertificateResponse."),
TestStep(9, "Sends another ProvisionClientCertificate command to the TlsCertificateManagement cluster",
"Verify a new ID is generated."),
TestStep(10, "Read ProvisionedClientCertificates to from CR1"),
TestStep(11, "Read ProvisionedClientCertificates to from CR2"),
TestStep(12, "Read ProvisionedClientCertificates to make sure certificates from step 2 and 3 are present"),
TestStep(13, "Sends the FindClientCertificate command specifying a CCDID from step 2",
"Verify the DUT sends FindClientCertificateResponse with both certificates"),
TestStep(14, "Sends the FindClientCertificate command specifying a CCDID from step 3",
"Verify the DUT sends FindClientCertificateResponse with both certificates"),
TestStep(15, "Sends the FindClientCertificate command specifying null for CCDID",
"Verify the DUT sends FindClientCertificateResponse with both certificates"),
TestStep(16, "Sends the RemoveClientCertificate for both certificates added",
"Verify the DUT sends success status"),
]
return steps
@run_if_endpoint_matches(has_cluster(Clusters.TlsCertificateManagement))
async def test_TC_TLSCERT_3_1_8(self):
endpoint = self.get_endpoint(default=1)
attributes = Clusters.TlsCertificateManagement.Attributes
self.step(1)
# Establishing CR1 controller
cr1 = self.default_controller
cr1_cmd = self.TargetedFabric(self, endpoint=endpoint)
# Establishing CR2 controller
cr2_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
cr2_fabric_admin = cr2_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=cr1.fabricId + 1)
cr2 = cr2_fabric_admin.NewController(nodeId=cr1.nodeId + 1)
cr2_dut_node_id = self.dut_node_id+1
cr2_cmd = self.TargetedFabric(self, endpoint=endpoint, dev_ctrl=cr2, node_id=cr2_dut_node_id)
self.step(2)
_, noc_resp, _ = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(
commissionerDevCtrl=cr1, newFabricDevCtrl=cr2,
existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id
)
fabric_index_cr2 = noc_resp.fabricIndex
self.step(3)
first_nonce = random.randbytes(32)
first_cert = self.gen_cert()
second_nonce = random.randbytes(32)
second_cert = self.gen_cert()
self.step(4)
attribute_certs = await cr1_cmd.read_tls_cert_attribute(attributes.ProvisionedClientCertificates)
self.step(5)
for cert in attribute_certs:
await cr1_cmd.send_remove_client_command(ccdid=cert.ccdid)
self.step(6)
response = await cr1_cmd.send_csr_command(nonce=first_nonce)
self.assert_valid_ccdid(response.ccdid)
self.step(7)
self.assert_valid_csr(response, first_nonce)
self.step(8)
await cr1_cmd.send_provision_client_command(ccdid=response.ccdid, certificate=first_cert)
self.step(9)
if not self.skip_cr2:
response2 = await cr2_cmd.send_csr_command(nonce=second_nonce)
self.assert_valid_ccdid(response2.ccdid)
self.assert_valid_csr(response2, second_nonce)
asserts.assert_not_equal(response.ccdid, response2.ccdid, "CCDID should be unique")
await cr2_cmd.send_provision_client_command(ccdid=response2.ccdid, certificate=second_cert)
self.step(10)
attribute_certs = await cr1_cmd.read_tls_cert_attribute(attributes.ProvisionedClientCertificates)
asserts.assert_greater_equal(len(attribute_certs), 1, "Expected at least 1 certificate")
found_certs = dict()
# Python controller is using TCP, which supports large payloads; need a way to test
# over MRP that certificate is not present
for cert in attribute_certs:
found_certs[cert.ccdid] = cert.clientCertificate
asserts.assert_in(response.ccdid, found_certs, "ProvisionedClientCertificates should contain provisioned client cert")
asserts.assert_equal(found_certs[response.ccdid], first_cert, "Expected matching certificate detail")
self.step(11)
if not self.skip_cr2:
attribute_certs = await cr2_cmd.read_tls_cert_attribute(attribute=attributes.ProvisionedClientCertificates)
asserts.assert_greater_equal(len(attribute_certs), 1, "Expected at least 1 certificate")
found_certs = dict()
# Python controller is using TCP, which supports large payloads; need a way to test
# over MRP that certificate is not present
for cert in attribute_certs:
found_certs[cert.ccdid] = cert.clientCertificate
asserts.assert_in(response2.ccdid, found_certs, "ProvisionedClientCertificates should contain provisioned client cert")
asserts.assert_equal(found_certs[response2.ccdid], second_cert, "Expected matching certificate detail")
self.step(12)
find_response = await cr1_cmd.send_find_client_command(ccdid=response.ccdid)
asserts.assert_equal(len(find_response.certificateDetails), 1, "Expected single certificate")
asserts.assert_equal(find_response.certificateDetails[0].clientCertificate, first_cert)
self.step(13)
if not self.skip_cr2:
find_response2 = await cr2_cmd.send_find_client_command(ccdid=response.ccdid)
asserts.assert_equal(len(find_response2.certificateDetails), 1, "Expected single certificate")
asserts.assert_equal(find_response2.certificateDetails[0].clientCertificate,
second_cert, "Expected matching certificate detail")
self.step(14)
find_all_response = await cr1_cmd.send_find_client_command()
asserts.assert_greater_equal(len(find_all_response.certificateDetails), 1, "Expected at least 1 certificate")
found_certs = dict()
for cert in find_all_response.certificateDetails:
found_certs[cert.ccdid] = cert.clientCertificate
asserts.assert_in(response.ccdid, found_certs, "FindClientCertificate should contain provisioned client cert")
asserts.assert_equal(found_certs[response.ccdid], first_cert, "Expected matching certificate detail")
self.step(15)
if not self.skip_cr2:
find_all_response = await cr2_cmd.send_find_client_command()
asserts.assert_greater_equal(len(find_all_response.certificateDetails), 1, "Expected at least 1 certificate")
found_certs = dict()
for cert in find_all_response.certificateDetails:
found_certs[cert.ccdid] = cert.clientCertificate
asserts.assert_in(response2.ccdid, found_certs, "FindClientCertificate should contain provisioned client cert")
asserts.assert_equal(found_certs[response2.ccdid], second_cert, "Expected matching certificate detail")
self.step(16)
await cr1_cmd.send_remove_client_command(ccdid=response.ccdid)
if not self.skip_cr2:
await cr2_cmd.send_remove_client_command(ccdid=response2.ccdid)
resp = await self.send_single_cmd(cmd=Clusters.OperationalCredentials.Commands.RemoveFabric(fabric_index_cr2), endpoint=0)
asserts.assert_equal(
resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk)
if __name__ == "__main__":
default_matter_test_main()