blob: 1d23883237eeeb7b55f04460dfe1dbbb408a6f05 [file] [log] [blame]
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
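# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.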
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from glob import glob
from pathlib import Path
from typing import List, Optional

import chip.clusters as Clusters
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.x509 import AuthorityKeyIdentifier, Certificate, SubjectKeyIdentifier, load_der_x509_certificate
from matter_testing_support import (MatterBaseTest, TestStep, async_test_body, bytes_from_hex, default_matter_test_main,
                                    hex_from_bytes)
from mobly import asserts

# Those are SDK samples that are known to be non-production.
# List of certificate names that are known to have some issues, but not yet
# updated in DCL. They will fail the test at runtime if seen, but not in CI.
def load_all_paa(paa_path: Path) -> dict:
    """Load every ``*.der`` certificate in ``paa_path``, indexed by subject key identifier.

    Args:
        paa_path: Directory that is searched (non-recursively) for ``*.der`` files.

    Returns:
        A dict mapping each certificate's Subject Key Identifier bytes to a
        ``(file name, parsed certificate)`` tuple. Certificates that carry no
        SKID extension are parsed but not recorded. If two files share a SKID,
        the one visited last wins.

    Raises:
        ValueError, IOError: Re-raised when a file cannot be read or parsed and
            its name is not listed in the module-level ``ALLOWED_SKIPPED_FILENAMES``.
    """
    logging.info("Loading all PAAs in %s" % paa_path)

    paa_by_skid = {}
    for filename in glob(str(paa_path.joinpath("*.der"))):
        with open(filename, "rb") as derfile:
            logging.info(f"Loading PAA: {filename}")
            try:
                # Load cert
                paa_der = derfile.read()
                paa_cert = load_der_x509_certificate(paa_der)

                # Find the subject key identifier (if present), and record it
                for extension in paa_cert.extensions:
                    if extension.oid == SubjectKeyIdentifier.oid:
                        skid = extension.value.key_identifier
                        paa_by_skid[skid] = (Path(filename).name, paa_cert)
            except (ValueError, IOError) as e:
                logging.error(f"Failed to load {filename}: {str(e)}")
                # Known-bad files are tolerated (logged and skipped); anything
                # else is a new invalid PAA and must fail the run.
                if Path(filename).name not in ALLOWED_SKIPPED_FILENAMES:
                    logging.error(f"Re-raising error and failing: found new invalid PAA: {filename}")
                    raise

    return paa_by_skid
def extract_akid(cert: Certificate) -> Optional[bytes]:
    """Return the key identifier of the first Authority Key Identifier extension of ``cert``.

    Returns ``None`` when the certificate carries no such extension.
    """
    akid_values = (
        ext.value.key_identifier
        for ext in cert.extensions
        if ext.oid == AuthorityKeyIdentifier.oid
    )
    # The generator is lazy, so only the first match (if any) is evaluated.
    return next(akid_values, None)
class TC_DA_1_7(MatterBaseTest):
    ''' TC-DA-1.7

        This test requires two instances of the DUT with the same PID/VID to confirm that the individual
        devices are provisioned with different device attestation keys even in the same product line.

        In order to fully test this locally, it requires two separate instances of the example app
        with different keys, different discriminators, different ports, and different KVSs.
        The example apps use the SDK authority key, so the script needs the allow_sdk_dac arg set.

        NOTE(review): expected_number_of_DUTs() returns 1 whenever allow_sdk_dac (or post_cert_test)
        is set, so the two-discriminator invocation suggested below is rejected by the DUT-count
        check in test_TC_DA_1_7. Confirm which of the two is the intended behavior.

        One suggested method of testing:

        Terminal 1: (first DUT - default port 5540)
        rm kvs1
        ${PATH_TO_ALL_CLUSTERS}/chip-all-clusters-app --dac_provider \
            credentials/development/commissioner_dut/struct_cd_authorized_paa_list_count1_valid/test_case_vector.json \
            --product-id 32768 --KVS kvs1 --discriminator 12

        Terminal 2: (second DUT - port 5541)
        rm kvs2
        ${PATH_TO_ALL_CLUSTERS}/chip-all-clusters-app --dac_provider \
            credentials/development/commissioner_dut/struct_cd_authorized_paa_list_count2_valid/test_case_vector.json \
            --product-id 32768 --KVS kvs2 --discriminator 34 --secured-device-port 5541

        Terminal 3: (test application)
        ./scripts/tests/<test runner script> --script "src/python_testing/<this test script>" \
            --script-args "--storage-path admin_storage.json --commissioning-method on-network \
            --discriminator 12 34 --passcode 20202021 20202021 --bool-arg allow_sdk_dac:true"
    '''

    def setup_class(self):
        """Read the two optional boolean user parameters that relax the test.

        NOTE(review): the base-class setup_class is not invoked here; confirm
        whether MatterBaseTest requires it.
        """
        # Allows SDK test attestation certificates (skips the AKID denylist check).
        self.allow_sdk_dac = self.user_params.get("allow_sdk_dac", False)
        self.post_cert_test = self.user_params.get("post_cert_test", False)

    def expected_number_of_DUTs(self) -> int:
        """Return 1 when either relaxing flag is set, otherwise 2."""
        return 1 if (self.allow_sdk_dac or self.post_cert_test) else 2

    def steps_one_dut(self, DUT: int) -> List[TestStep]:
        """Build the per-device test steps, numbered ``{DUT}``, ``{DUT}.1`` ... ``{DUT}.6``."""
        return [TestStep(f'{DUT}', f'Test DUT{DUT} DAC chain as follows:'),
                TestStep(f'{DUT}.1', f'TH sends CertificateChainRequest Command to DUT{DUT} with the CertificateType set to PAICertificate',
                         'Verify that the DUT returns a CertificateChainResponse. Save the returned Certificate as `pai_cert`.'),
                # Uses the DUT index like step .1 does; this description previously
                # always said "DUT1", even for the second device.
                TestStep(f'{DUT}.2', f'TH sends CertificateChainRequest Command to DUT{DUT} with the CertificateType set to DACCertificate',
                         'Verify that the DUT returns a CertificateChainResponse. Save the returned Certificate as `dac_cert`.'),
                TestStep(f'{DUT}.3', 'TH extracts the Authority Key Identifier from the PAI certificate',
                         ('* Verify that the Authority Key Identifier is signed by a PAA in the DCL. (Ensure that it is not SDK’s test PAAs)\n'
                          '* Verify that PAI authority key ID must not be one of:\n'
                          ' - 78: 5C: E7: 05: B8: 6B: 8F: 4E: 6F: C7: 93: AA: 60: CB: 43: EA: 69: 68:82: D5\n'
                          ' - 6A: FD: 22: 77: 1F: 51: 1F: EC: BF: 16: 41: 97: 67: 10: DC: DC: 31: A1: 71: 7E\n'
                          '* Save the selected PAA certificate as `paa_cert`\n')),
                TestStep(f'{DUT}.4', 'TH extracts ASN.1 DER bytes for the entire issuer field of `dac_cert` and subject field of `pai_cert`.',
                         'Verify that the `dac_cert` `issuer field is byte-forbyte equivalent to the `pai_cert`subject field.'),
                TestStep(f'{DUT}.5', 'TH extracts ASN.1 DER bytes for the entire issuer field of `pai_cert` and subject field of `paa_cert`.',
                         'Verify that the `pai_cert` issuer field is byte-forbyte equivalent to the `paa_cert` subject field.'),
                TestStep(f'{DUT}.6', f'TH extracts the public key from the DAC and saves as `pk_{DUT}`.')]

    def steps_TC_DA_1_7(self):
        """Return the full step list: commissioning, one group per DUT, and the key comparison."""
        steps = [TestStep(0, "Commission DUT if not already done", is_commissioning=True)]
        steps += self.steps_one_dut(1)
        if self.expected_number_of_DUTs() == 2:
            steps += self.steps_one_dut(2)
            # NOTE(review): step 3 compares pk_1 with pk_2, so it is placed in the
            # two-DUT branch; confirm against the original layout.
            steps += [TestStep(3, "Verify that `pk_1` is not equal to `pk_2",
                               "Public keys do not match")]
        return steps

    # NOTE(review): decorator restored from the otherwise-unused async_test_body import; confirm.
    @async_test_body
    async def test_TC_DA_1_7(self):
        """Check the DUT count, run the per-DUT checks, and assert all DAC public keys differ."""
        # post_cert_tests (or sdk) can use the qr or manual code
        # We don't currently support this in cert because the base doesn't support multiple QR/manual
        num = 0
        if self.matter_test_config.discriminators:
            num += len(self.matter_test_config.discriminators)
        if self.matter_test_config.qr_code_content:
            num += 1
        if self.matter_test_config.manual_code:
            num += 1

        if num != self.expected_number_of_DUTs():
            if self.allow_sdk_dac:
                msg = "The allow_sdk_dac flag is only for use in CI. When using this test in CI, please specify a single discriminator, manual-code or qr-code-content"
            elif self.post_cert_test:
                msg = "The post_cert_test flag is only for use post-certification. When using this flag, please specify a single discriminator, manual-code or qr-code-content"
            else:
                msg = "This test requires two devices for use at certification. Please specify two device discriminators ex. --discriminator 1234 5678"
            # NOTE(review): msg is otherwise unused; failing the test here is the
            # reconstructed intent.
            asserts.fail(msg)

        pk = []
        # Commissioning - already done.
        self.step(0)

        # DUT indices are 1-based to match the step numbering.
        for dut_index, dut_node_id in enumerate(self.matter_test_config.dut_node_ids, start=1):
            pk.append(await self.single_DUT(dut_index, dut_node_id))

        if self.expected_number_of_DUTs() == 2:
            # Step 3 is only declared by steps_TC_DA_1_7 when two DUTs are expected.
            self.step(3)
        asserts.assert_equal(len(pk), len(set(pk)), "Found matching public keys in different DUTs")

    async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes:
        """Run steps ``{dut_index}`` .. ``{dut_index}.6`` against one device.

        Args:
            dut_index: 1-based index used for step numbers and recorded-data keys.
            dut_node_id: Node id the attestation commands are sent to.

        Returns:
            The DAC subject public key as an X9.62 uncompressed point.

        NOTE(review): the ``self.step(...)`` calls other than ``.3``, the command
        arguments of both ``SendCommand`` calls, and the ``signature_algorithm``
        argument were reconstructed from the step descriptions and the module
        imports; confirm against the upstream file.
        """
        logging.info("Pre-condition: load all PAAs SKIDs")
        conf = self.matter_test_config
        paa_by_skid = load_all_paa(conf.paa_trust_store_path)
        logging.info("Found %d PAAs" % len(paa_by_skid))

        # Test plan step introducing test for each DUT
        self.step(f'{dut_index}')

        dev_ctrl = self.default_controller

        self.step(f'{dut_index}.1')
        # CertificateType 2 == PAICertificate
        result = await dev_ctrl.SendCommand(dut_node_id, 0,
                                            Clusters.OperationalCredentials.Commands.CertificateChainRequest(2))
        pai = result.certificate
        asserts.assert_less_equal(len(pai), 600, "PAI cert must be at most 600 bytes")
        key = 'pai_{}'.format(dut_index)
        self.record_data({key: hex_from_bytes(pai)})

        self.step(f'{dut_index}.2')
        # CertificateType 1 == DACCertificate
        result = await dev_ctrl.SendCommand(dut_node_id, 0,
                                            Clusters.OperationalCredentials.Commands.CertificateChainRequest(1))
        dac = result.certificate
        asserts.assert_less_equal(len(dac), 600, "DAC cert must be at most 600 bytes")
        key = 'dac_{}'.format(dut_index)
        self.record_data({key: hex_from_bytes(dac)})

        self.step(f'{dut_index}.3')
        logging.info("DUT {} Step 3 check 1: Ensure PAI's AKID matches a PAA and signature is valid".format(dut_index))
        pai_cert = load_der_x509_certificate(pai)
        pai_akid = extract_akid(pai_cert)
        if pai_akid is None:
            # Fail cleanly instead of passing None to hex_from_bytes below.
            asserts.fail("DUT %d PAI has no Authority Key Identifier extension" % dut_index)
        if pai_akid not in paa_by_skid:
            asserts.fail("DUT %d PAI (%s) not matched in PAA trust store" % (dut_index, hex_from_bytes(pai_akid)))

        filename, paa_cert = paa_by_skid[pai_akid]
        logging.info("Matched PAA file %s, subject: %s" % (filename, paa_cert.subject))
        public_key = paa_cert.public_key()

        try:
            public_key.verify(signature=pai_cert.signature, data=pai_cert.tbs_certificate_bytes,
                              signature_algorithm=ec.ECDSA(hashes.SHA256()))
        except InvalidSignature as e:
            asserts.fail("DUT %d: Failed to verify PAI signature against PAA public key: %s" % (dut_index, str(e)))
        logging.info("Validated PAI signature against PAA")

        logging.info("DUT {} Step 3 check 2: Verify PAI AKID not in denylist of SDK PAIs".format(dut_index))
        if self.allow_sdk_dac:
            logging.warning("===> TEST STEP SKIPPED: Allowing SDK DACs!")
        else:
            for candidate in FORBIDDEN_AKID:
                asserts.assert_not_equal(hex_from_bytes(pai_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist")

        self.step(f'{dut_index}.4')
        # dac issuer == pai subject
        dac_cert = load_der_x509_certificate(dac)
        asserts.assert_equal(dac_cert.issuer, pai_cert.subject, "DAC issuer does not match PAI subject")

        self.step(f'{dut_index}.5')
        # pai issuer == paa subject
        asserts.assert_equal(pai_cert.issuer, paa_cert.subject, "PAI issuer does not match PAA subject")

        self.step(f'{dut_index}.6')
        pk = dac_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
        logging.info("Subject public key pk: %s" % hex_from_bytes(pk))
        key = 'pk_{}'.format(dut_index)
        self.record_data({key: hex_from_bytes(pk)})
        return pk
if __name__ == "__main__":