blob: 567d757760428822108d89222f23faa197421196 [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import random
import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from chip.interaction_model import InteractionModelError, Status
from chip.tlv import TLVReader
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, utils
from ecdsa.curves import curve_by_name
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, hex_from_bytes, type_matches
from mobly import asserts
from pyasn1.codec.der.decoder import decode as der_decoder
from pyasn1.error import PyAsn1Error
from pyasn1_modules import rfc2986, rfc3279, rfc5480
class TC_DA_1_5(MatterBaseTest):
@async_test_body
async def test_TC_DA_1_5(self):
opcreds = Clusters.Objects.OperationalCredentials
gcomm = Clusters.Objects.GeneralCommissioning
self.print_step(1, "Commissioning, already done")
self.print_step(2, "Save attestation challenge")
proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, False)
attestation_challenge = proxy.attestationChallenge
self.print_step(3, "Send CertificateChainRequest for DAC")
certtype = opcreds.Enums.CertificateChainTypeEnum.kDACCertificate
dac_resp = await self.send_single_cmd(cmd=opcreds.Commands.CertificateChainRequest(certificateType=certtype))
asserts.assert_true(type_matches(dac_resp, opcreds.Commands.CertificateChainResponse),
"Certificate request returned incorrect type")
der_dac = dac_resp.certificate
# This throws an exception for a non-x509 cert
try:
dac = x509.load_der_x509_certificate(der_dac)
except ValueError:
asserts.assert_true(False, "Unable to parse certificate from CertificateChainResponse")
self.print_step(4, "Send ArmFailSafe")
await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=900, breadcrumb=1))
self.print_step(5, "Send CSRRequest")
csr_nonce = random.randbytes(32)
csr_resp = await self.send_single_cmd(cmd=opcreds.Commands.CSRRequest(CSRNonce=csr_nonce, isForUpdateNOC=False))
nocsr_elements = csr_resp.NOCSRElements
nocsr_attestation_signature_raw = csr_resp.attestationSignature
self.print_step(6, "Extract TLV")
decoded = TLVReader(nocsr_elements).get()["Any"]
# CSR is field 1, nonce is field 2
asserts.assert_in(1, decoded.keys(), "CSR is not present in the NOSCRElements")
asserts.assert_in(2, decoded.keys(), "Nonce is not present in the NOSCRElements")
csr_raw = decoded[1]
csr_nonce_returned = decoded[2]
vendor1 = None
vendor2 = None
vendor3 = None
if 3 in decoded.keys():
vendor1 = decoded[3]
if 4 in decoded.keys():
vendor2 = decoded[4]
if 5 in decoded.keys():
vendor3 = decoded[5]
# Verify that length of nocsr_elements is <= 900
asserts.assert_less_equal(len(nocsr_elements), 900, "NOCSRElements is more than 900 bytes")
# Verify der encoded and PKCS #10 (rfc2986 is PKCS #10) - next two requirements
try:
temp, _ = der_decoder(csr_raw, asn1Spec=rfc2986.CertificationRequest())
except PyAsn1Error:
asserts.fail("Unable to decode CSR - improperly formatted DER file")
layer1 = dict(temp)
info = dict(layer1['certificationRequestInfo'])
# Verify public key is id-ecPublicKey with prime256v1
requested_pk_algo = dict(dict(info['subjectPKInfo'])['algorithm'])
asserts.assert_equal(requested_pk_algo['algorithm'], rfc5480.id_ecPublicKey, "Incorrect public key algorithm")
der_parameters = requested_pk_algo['parameters']
temp, _ = der_decoder(bytes(der_parameters), asn1Spec=rfc3279.EcpkParameters())
parameters = dict(temp)
asserts.assert_in('namedCurve', parameters.keys(), "Unable to find namedCurve in EcpkParameters")
asserts.assert_equal(parameters['namedCurve'], rfc3279.prime256v1, "Incorrect curve specified for public key algorithm")
# Verify public key is 256 bytes
csr = x509.load_der_x509_csr(csr_raw)
csr_pubkey = csr.public_key()
asserts.assert_equal(csr_pubkey.key_size, 256, "Incorrect key size")
# Verify signature algorithm is ecdsa-with-SHA156
signature_algorithm = dict(layer1['signatureAlgorithm'])['algorithm']
asserts.assert_equal(signature_algorithm, rfc5480.ecdsa_with_SHA256, "CSR specifies incorrect signature key algorithm")
# Verify signature is valid
asserts.assert_true(csr.is_signature_valid, "Signature is invalid")
# Verify csr_nonce_returned is octet string of length 32
try:
# csr_nonce_returned is an octet string if it can be converted to an int
int(hex_from_bytes(csr_nonce_returned), 16)
except ValueError:
asserts.fail("Returned CSR nonce is not an octet string")
# Verify returned nonce matches sent nonce
asserts.assert_equal(csr_nonce_returned, csr_nonce, "Returned nonce is incorrect")
nocsr_tbs = nocsr_elements + attestation_challenge
self.print_step(7, "Verify signature")
baselen = curve_by_name("NIST256p").baselen
attestation_raw_r = int(hex_from_bytes(nocsr_attestation_signature_raw[:baselen]), 16)
attestation_raw_s = int(hex_from_bytes(nocsr_attestation_signature_raw[baselen:]), 16)
nocsr_attestation = utils.encode_dss_signature(attestation_raw_r, attestation_raw_s)
dac.public_key().verify(signature=nocsr_attestation, data=nocsr_tbs, signature_algorithm=ec.ECDSA(hashes.SHA256()))
self.print_step(8, "Verify that attestation challenge does not appear in the vendor fields")
if vendor1:
asserts.assert_not_in(attestation_challenge, vendor1, "Attestation challenge appears in vendor 1")
if vendor2:
asserts.assert_not_in(attestation_challenge, vendor2, "Attestation challenge appears in vendor 2")
if vendor3:
asserts.assert_not_in(attestation_challenge, vendor3, "Attestation challenge appears in vendor 3")
self.print_step(9, "Disarm failsafe")
await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1))
self.print_step(10, "Arm failsafe to 900s")
await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=900, breadcrumb=1))
self.print_step(11, "Send CSRRequest wtih 31-byte nonce")
bad_nonce = random.randbytes(32)
try:
await self.send_single_cmd(cmd=opcreds.Commands.CSRRequest(CSRNonce=bad_nonce, isForUpdateNOC=False))
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.InvalidCommand, "Received incorrect error from CSRRequest command with bad nonce")
self.print_step(12, "Disarm failsafe")
await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1))
self.print_step(13, "Open commissioning window")
params = self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1)
self.print_step(14, "Commission to TH2")
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
TH2 = new_fabric_admin.NewController(nodeId=112233)
errcode = TH2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)
asserts.assert_true(errcode.is_success, 'Commissioning on TH2 did not complete successfully')
self.print_step(15, "Read NOCs list for TH1")
temp = await self.read_single_attribute_check_success(
cluster=Clusters.OperationalCredentials,
attribute=Clusters.OperationalCredentials.Attributes.NOCs)
asserts.assert_equal(len(temp), 1, "Returned NOC list does not contain one entry")
th1_noc = temp[0].noc
self.print_step(16, "Read NOCs list for TH2")
temp = await self.read_single_attribute_check_success(
cluster=Clusters.OperationalCredentials,
attribute=Clusters.OperationalCredentials.Attributes.NOCs, dev_ctrl=TH2)
asserts.assert_equal(len(temp), 1, "Returned NOC list does not contain one entry")
th2_noc = temp[0].noc
self.print_step(17, "Extract the public keys")
# NOCs are TLV encoded, public key is field 9
th1_decoded = TLVReader(th1_noc).get()["Any"]
th2_decoded = TLVReader(th2_noc).get()["Any"]
th1_pk = th1_decoded[9]
th2_pk = th2_decoded[9]
asserts.assert_not_equal(th1_pk, th2_pk, "Publc keys are the same")
self.print_step(17, "Read the fabric index for TH2")
th2_idx = await self.read_single_attribute_check_success(
Clusters.OperationalCredentials,
attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex, dev_ctrl=TH2)
self.print_step(18, "Remove TH2")
await self.send_single_cmd(cmd=Clusters.OperationalCredentials.Commands.RemoveFabric(fabricIndex=th2_idx))
if __name__ == "__main__":
default_matter_test_main()