|  | #!/usr/bin/env python3 | 
|  | # | 
|  | # Copyright (c) 2023-2024 Project CHIP Authors | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | # | 
|  | # Note: py cryptography supports indirect CRLs trom version 44.0.0 and above. | 
|  | #       You may need to update your cryptography version. | 
|  | # | 
|  | # Generates a basic RevocationSet from TestNet | 
|  | # Usage: | 
|  | #     python ./credentials/generate-revocation-set.py --help | 
|  |  | 
|  | import base64 | 
|  | import dataclasses | 
|  | import json | 
|  | import logging | 
|  | import os | 
|  | import subprocess | 
|  | import sys | 
|  | import unittest | 
|  | from enum import Enum | 
|  | from typing import Optional | 
|  |  | 
|  | import click | 
|  | import requests | 
|  | from click_option_group import AllOptionGroup, RequiredMutuallyExclusiveOptionGroup, optgroup | 
|  | from cryptography import x509 | 
|  | from cryptography.hazmat.primitives import serialization | 
|  | from cryptography.hazmat.primitives.asymmetric import ec | 
|  | from cryptography.x509.extensions import ExtensionNotFound | 
|  | from cryptography.x509.oid import NameOID | 
|  |  | 
|  | # Supported log levels, mapping string values required for argument | 
|  | # parsing into logging constants | 
|  | __LOG_LEVELS__ = { | 
|  | 'debug': logging.DEBUG, | 
|  | 'info': logging.INFO, | 
|  | 'warn': logging.WARN, | 
|  | 'fatal': logging.FATAL, | 
|  | } | 
|  |  | 
|  |  | 
|  | class RevocationType(Enum): | 
|  | CRL = 1 | 
|  |  | 
|  |  | 
|  | class CertVerificationResult(Enum): | 
|  | SUCCESS = 1 | 
|  | SKID_NOT_FOUND = 2 | 
|  | AKID_NOT_FOUND = 3 | 
|  | SIGNATURE_VERIFICATION_FAILED = 4 | 
|  | ISSUER_MISMATCH = 5 | 
|  | AKID_MISMATCH = 6 | 
|  |  | 
|  |  | 
|  | @dataclasses.dataclass | 
|  | class RevocationPoint: | 
|  | vid: int | 
|  | label: str | 
|  | issuerSubjectKeyID: str | 
|  | pid: int | 
|  | isPAA: bool | 
|  | crlSignerCertificate: str | 
|  | dataURL: str | 
|  | dataFileSize: str | 
|  | dataDigest: str | 
|  | dataDigestType: int | 
|  | revocationType: int | 
|  | schemaVersion: int | 
|  | crlSignerDelegator: str | 
|  |  | 
|  |  | 
|  | @dataclasses.dataclass | 
|  | class RevocationSet: | 
|  | type: str | 
|  | issuer_subject_key_id: str | 
|  | issuer_name: str | 
|  | revoked_serial_numbers: [str] | 
|  | crl_signer_cert: str | 
|  | crl_signer_delegator: str = None | 
|  |  | 
|  | def asDict(self): | 
|  | return dataclasses.asdict(self) | 
|  |  | 
|  |  | 
|  | OID_VENDOR_ID = x509.ObjectIdentifier("1.3.6.1.4.1.37244.2.1") | 
|  | OID_PRODUCT_ID = x509.ObjectIdentifier("1.3.6.1.4.1.37244.2.2") | 
|  |  | 
|  | PRODUCTION_NODE_URL = "https://on.dcl.csa-iot.org:26657" | 
|  | PRODUCTION_NODE_URL_REST = "https://on.dcl.csa-iot.org" | 
|  | TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org" | 
|  |  | 
|  |  | 
|  | def extract_single_integer_attribute(subject, oid): | 
|  | attribute_list = subject.get_attributes_for_oid(oid) | 
|  |  | 
|  | if len(attribute_list) == 1: | 
|  | return int(attribute_list[0].value, 16) | 
|  |  | 
|  | return None | 
|  |  | 
|  |  | 
|  | def extract_fallback_tag_from_common_name(cn, marker): | 
|  | val_len = 4 | 
|  | start_idx = cn.find(marker) | 
|  |  | 
|  | if start_idx != -1: | 
|  | val_start_idx = start_idx + len(marker) | 
|  | val = cn[val_start_idx:val_start_idx + val_len] | 
|  | return int(val, 16) if len(val) == 4 else None | 
|  |  | 
|  | return None | 
|  |  | 
|  |  | 
|  | def parse_vid_pid_from_distinguished_name(distinguished_name): | 
|  | # VID/PID encoded using Matter specific RDNs | 
|  | vid = extract_single_integer_attribute(distinguished_name, OID_VENDOR_ID) | 
|  | pid = extract_single_integer_attribute(distinguished_name, OID_PRODUCT_ID) | 
|  |  | 
|  | # Fallback method to get the VID/PID, encoded in CN as "Mvid:FFFF Mpid:1234" | 
|  | if vid is None and pid is None: | 
|  | cn = distinguished_name.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value | 
|  | vid = extract_fallback_tag_from_common_name(cn, 'Mvid:') | 
|  | pid = extract_fallback_tag_from_common_name(cn, 'Mpid:') | 
|  |  | 
|  | return vid, pid | 
|  |  | 
|  |  | 
|  | def get_akid(cert: x509.Certificate) -> str: | 
|  | return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier.hex().upper() | 
|  |  | 
|  |  | 
|  | def get_skid(cert: x509.Certificate) -> str: | 
|  | return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier.hex().upper() | 
|  |  | 
|  |  | 
|  | def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> CertVerificationResult: | 
|  | ''' | 
|  | Verifies if the cert is signed by root. | 
|  | ''' | 
|  | try: | 
|  | cert_akid = get_akid(cert) | 
|  | except ExtensionNotFound: | 
|  | return CertVerificationResult.AKID_NOT_FOUND | 
|  | try: | 
|  | root_skid = get_skid(root) | 
|  | except ExtensionNotFound: | 
|  | return CertVerificationResult.SKID_NOT_FOUND | 
|  |  | 
|  | if cert_akid != root_skid: | 
|  | return CertVerificationResult.AKID_MISMATCH | 
|  |  | 
|  | if cert.issuer != root.subject: | 
|  | return CertVerificationResult.ISSUER_MISMATCH | 
|  |  | 
|  | # public_key().verify() do not return anything if signature is valid, | 
|  | # will raise an exception if signature is invalid | 
|  | try: | 
|  | root.public_key().verify(cert.signature, cert.tbs_certificate_bytes, ec.ECDSA(cert.signature_hash_algorithm)) | 
|  | except Exception: | 
|  | return CertVerificationResult.SIGNATURE_VERIFICATION_FAILED | 
|  |  | 
|  | return CertVerificationResult.SUCCESS | 
|  |  | 
|  |  | 
|  | def is_self_signed_certificate(cert: x509.Certificate) -> bool: | 
|  | result = verify_cert(cert, cert) | 
|  | if result == CertVerificationResult.SUCCESS: | 
|  | return True | 
|  | else: | 
|  | logging.debug( | 
|  | f"Certificate with subject: {cert.subject.rfc4514_string()} is not a valid self-signed certificate. Result: {result.name}") | 
|  | return False | 
|  |  | 
|  |  | 
|  | # delegator is optional so can be None, but crl_signer and paa has to be present | 
|  | def validate_cert_chain(crl_signer: x509.Certificate, crl_signer_delegator: x509.Certificate, paa: x509.Certificate): | 
|  | ''' | 
|  | There could be four scenarios: | 
|  | 1. CRL Signer is PAA itself, hence its self-signed certificate | 
|  | 2. CRL Signer is PAI certificate, and we can validate (crl_signer -> paa) chain | 
|  | 3. CRL Signer delegator is PAA, and we can validate (crl_signer -> crl_signer_delegator(paa) -> paa) chain | 
|  | 4. CRL Signer delegator is PAI, and we can validate (crl_signer -> crl_signer_delegator -> paa) chain | 
|  | ''' | 
|  |  | 
|  | if crl_signer_delegator: | 
|  | result_signer = verify_cert(crl_signer, crl_signer_delegator) | 
|  | if not result_signer == CertVerificationResult.SUCCESS: | 
|  | logging.debug( | 
|  | f"Cannot verify certificate subject: {crl_signer.subject.rfc4514_string()} issued by certificate subject: {crl_signer_delegator.subject.rfc4514_string()}. Result: {result_signer.name}") | 
|  | return False | 
|  |  | 
|  | result_delegator = verify_cert(crl_signer_delegator, paa) | 
|  | if not result_delegator == CertVerificationResult.SUCCESS: | 
|  | logging.debug( | 
|  | f"Cannot verify certificate subject: {crl_signer_delegator.subject.rfc4514_string()} issued by certificate subject: {paa.subject.rfc4514_string()}. Result: {result_delegator.name}") | 
|  | return False | 
|  | return True | 
|  | else: | 
|  | result = verify_cert(crl_signer, paa) | 
|  | if not result == CertVerificationResult.SUCCESS: | 
|  | logging.debug( | 
|  | f"Cannot verify certificate subject: {crl_signer.subject.rfc4514_string()} issued by certificate subject: {paa.subject.rfc4514_string()}. Result: {result.name}") | 
|  | return False | 
|  | return True | 
|  |  | 
|  |  | 
|  | def validate_vid_pid(revocation_point: RevocationPoint, crl_signer_certificate: x509.Certificate, crl_signer_delegator_certificate: x509.Certificate) -> bool: | 
|  | crl_signer_vid, crl_signer_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject) | 
|  | logging.debug(f"vid: {revocation_point.vid})") | 
|  | logging.debug(f"crl_signer_vid: {crl_signer_vid})") | 
|  | if revocation_point.isPAA: | 
|  | if crl_signer_vid is not None: | 
|  | if revocation_point.vid != crl_signer_vid: | 
|  | logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...") | 
|  | return False | 
|  | else: | 
|  | vid_to_match = crl_signer_vid | 
|  | pid_to_match = crl_signer_pid | 
|  |  | 
|  | # if the CRL Signer is delegated then match the VID and PID of the CRL Signer Delegator | 
|  | if crl_signer_delegator_certificate: | 
|  | vid_to_match, pid_to_match = parse_vid_pid_from_distinguished_name(crl_signer_delegator_certificate.subject) | 
|  | logging.debug(f"vid_to_match: {vid_to_match})") | 
|  | logging.debug(f"pid_to_match: {pid_to_match})") | 
|  | if vid_to_match is None or revocation_point.vid != vid_to_match: | 
|  | logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...") | 
|  | return False | 
|  |  | 
|  | if pid_to_match is not None: | 
|  | if revocation_point.pid != pid_to_match: | 
|  | logging.warning("PID in CRL Signer Certificate does not match with PID in revocation point, continue...") | 
|  | return False | 
|  |  | 
|  | return True | 
|  |  | 
|  |  | 
|  | def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList, | 
|  | crl_signer_certificate: x509.Certificate, | 
|  | certificate_authority_name: x509.Name, | 
|  | certificate_akid_hex: str, | 
|  | crl_signer_delegator_cert: x509.Certificate) -> RevocationSet: | 
|  | """Generate a revocation set from a CRL file. | 
|  |  | 
|  | Args: | 
|  | crl_file: The CRL object containing revoked certificates | 
|  | crl_signer_certificate: The certificate object used to sign the CRL | 
|  | certificate_authority_name: x509.Name of the issuer | 
|  | certificate_akid_hex: Hex encoded Authority Key Identifier | 
|  | crl_signer_delegator_cert: crl signer delegator certificate object | 
|  |  | 
|  | Returns: | 
|  | RevocationSet containing the revocation set data with fields: | 
|  | - type: "revocation_set" | 
|  | - issuer_subject_key_id: Authority Key Identifier (hex) | 
|  | - issuer_name: Issuer name (base64) | 
|  | - revoked_serial_numbers: List of revoked serial numbers | 
|  | - crl_signer_cert: CRL signer certificate (base64 DER) | 
|  | - crl_signer_delegator: Optional delegator certificate (base64 DER) | 
|  | """ | 
|  | serialnumber_list = [] | 
|  |  | 
|  | for revoked_cert in crl_file: | 
|  | try: | 
|  | cert_issuer_entry_ext = revoked_cert.extensions.get_extension_for_oid(x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER) | 
|  | revoked_cert_issuer = cert_issuer_entry_ext.value.get_values_for_type(x509.DirectoryName)[0] | 
|  | if revoked_cert_issuer is not None: | 
|  | # check if this really are the same thing | 
|  | if revoked_cert_issuer != x509.DirectoryName(certificate_authority_name).value: | 
|  | logging.warning("CRL entry issuer is not CRL File Issuer, continue...") | 
|  | continue | 
|  | except Exception: | 
|  | pass | 
|  |  | 
|  | # Ensure the serial number is always a 2-byte aligned hex string. | 
|  | # TestDACRevocationDelegateImpl encodes the serial number as an even-length hex string | 
|  | # using BytesToHex in src/lib/support/BytesToHex.cpp. | 
|  | # As the primary consumer of this data, we should use the same here. | 
|  | serialnumber = '{:02X}'.format(revoked_cert.serial_number) | 
|  | serialnumber = serialnumber if len(serialnumber) % 2 == 0 else '0' + serialnumber | 
|  | serialnumber_list.append(serialnumber) | 
|  |  | 
|  | entry = RevocationSet( | 
|  | type='revocation_set', | 
|  | issuer_subject_key_id=certificate_akid_hex, | 
|  | issuer_name=get_b64_name(certificate_authority_name), | 
|  | revoked_serial_numbers=serialnumber_list, | 
|  | crl_signer_cert=base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'), | 
|  | ) | 
|  |  | 
|  | if crl_signer_delegator_cert: | 
|  | entry.crl_signer_delegator = base64.b64encode( | 
|  | crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8') | 
|  |  | 
|  | return entry | 
|  |  | 
|  |  | 
|  | # This is implemented as per point (9) in 6.2.4.1. Conceptual algorithm for revocation set construction | 
|  | def get_certificate_authority_details(crl_signer_certificate: x509.Certificate, | 
|  | crl_signer_delegator_cert: x509.Certificate, | 
|  | paa_certificate_object: x509.Certificate, | 
|  | is_paa: bool) -> tuple[x509.Name, str]: | 
|  | """Get certificate authority name and AKID based on certificate hierarchy. | 
|  |  | 
|  | Args: | 
|  | crl_signer_certificate: The CRL signer certificate | 
|  | crl_signer_delegator_cert: Optional delegator certificate | 
|  | paa_certificate_object: Optional PAA certificate | 
|  | is_paa: Whether this is a PAA certificate | 
|  |  | 
|  | Returns: | 
|  | tuple[str, str]: (certificate_authority_name, certificate_akid_hex) | 
|  | """ | 
|  | if is_paa and not is_self_signed_certificate(crl_signer_certificate): | 
|  | cert_for_details = paa_certificate_object | 
|  | logging.debug("Using PAA certificate for details") | 
|  | elif crl_signer_delegator_cert: | 
|  | cert_for_details = crl_signer_delegator_cert | 
|  | logging.debug("Using CRL Signer Delegator certificate for details") | 
|  | else: | 
|  | cert_for_details = crl_signer_certificate | 
|  | logging.debug("Using CRL Signer certificate for details") | 
|  |  | 
|  | certificate_authority_name = cert_for_details.subject | 
|  | try: | 
|  | certificate_akid = get_skid(cert_for_details) | 
|  | logging.debug(f"Certificate Authority Name: {certificate_authority_name}") | 
|  | logging.debug(f"Certificate AKID: {certificate_akid}") | 
|  |  | 
|  | return certificate_authority_name, certificate_akid | 
|  | except ExtensionNotFound: | 
|  | logging.warning("Certificate SKID not found in authoarity certificate.") | 
|  |  | 
|  |  | 
|  | def get_b64_name(name: x509.name.Name) -> str: | 
|  | ''' | 
|  | Get base64 encoded name | 
|  | ''' | 
|  | return base64.b64encode(name.public_bytes()).decode('utf-8') | 
|  |  | 
|  |  | 
|  | def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList: | 
|  | logging.debug(f"Fetching CRL from {url}") | 
|  |  | 
|  | try: | 
|  | r = requests.get(url, timeout=timeout) | 
|  | logging.debug(f"Fetched CRL: {r.content}") | 
|  | return x509.load_der_x509_crl(r.content) | 
|  | except Exception as e: | 
|  | logging.error('Failed to fetch a valid CRL', e) | 
|  |  | 
|  |  | 
|  | class DclClientInterface: | 
|  | ''' | 
|  | An interface for interacting with DCLD. | 
|  | ''' | 
|  |  | 
|  | def send_get_request(self, url: str) -> dict: | 
|  | ''' | 
|  | Send a GET request for a json object. | 
|  | ''' | 
|  | try: | 
|  | response = requests.get(url).json() | 
|  | return response | 
|  | except Exception as e: | 
|  | logging.error(f"Failed to fetch {url}: {e}") | 
|  | return None | 
|  |  | 
|  | def get_revocation_points(self) -> list[RevocationPoint]: | 
|  | ''' | 
|  | Get revocation points from DCL | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[RevocationPoint] | 
|  | List of revocation points | 
|  | ''' | 
|  | raise NotImplementedError | 
|  |  | 
|  | def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]: | 
|  | ''' | 
|  | Get revocation points by subject key ID | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | issuer_subject_key_id: str | 
|  | Subject key ID | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[RevocationPoint] | 
|  | List of revocation points | 
|  | ''' | 
|  | raise NotImplementedError | 
|  |  | 
|  | def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: | 
|  | ''' | 
|  | Get certificate from DCL. | 
|  | ''' | 
|  | raise NotImplementedError | 
|  |  | 
|  | def get_only_approved_certificate(self, response: dict, skid_hex: str) -> tuple[bool, Optional[x509.Certificate]]: | 
|  | ''' | 
|  | Get only approved certificate from DCL resposne. | 
|  | ''' | 
|  | if response is None or not response.get("approvedCertificates", {}).get("certs", []): | 
|  | raise requests.exception.NotFound(f"No certificate found for {skid_hex}") | 
|  | if len(response["approvedCertificates"]["certs"]) > 1: | 
|  | raise ValueError(f"Multiple certificates found for {skid_hex}") | 
|  | issuer_certificate = x509.load_pem_x509_certificate(bytes(response["approvedCertificates"]["certs"][0]["pemCert"], "utf-8")) | 
|  | return response["approvedCertificates"]["certs"][0]["isRoot"], issuer_certificate | 
|  |  | 
|  | def get_paa_cert(self, initial_cert: x509.Certificate) -> Optional[x509.Certificate]: | 
|  | ''' | 
|  | Get the PAA certificate for the CRL Signer Certificate. | 
|  | ''' | 
|  | issuer_name = initial_cert.issuer | 
|  | try: | 
|  | akid = get_akid(initial_cert) | 
|  | except ExtensionNotFound: | 
|  | logging.warning('Certificate AKID not found.') | 
|  | return | 
|  | paa_certificate = None | 
|  | while not paa_certificate: | 
|  | try: | 
|  | is_root, issuer_certificate = self.get_approved_certificate(issuer_name, akid) | 
|  | if is_root: | 
|  | paa_certificate = issuer_certificate | 
|  | break | 
|  |  | 
|  | except Exception as e: | 
|  | logging.error('Failed to get PAA certificate', e) | 
|  | return | 
|  | logging.debug(f"issuer_name: {issuer_certificate.subject.rfc4514_string()}") | 
|  | issuer_name = issuer_certificate.issuer | 
|  | try: | 
|  | akid = get_akid(issuer_certificate) | 
|  | except ExtensionNotFound: | 
|  | logging.warning('Issuer Certificate AKID not found.') | 
|  | logging.debug(f"akid: {akid}") | 
|  | if paa_certificate is None: | 
|  | logging.warning("PAA Certificate not found, continue...") | 
|  | return paa_certificate | 
|  |  | 
|  | def get_crl_file(self, | 
|  | revocation_point: RevocationPoint, | 
|  | crl_signer_certificate: x509.Certificate) -> x509.CertificateRevocationList: | 
|  | """Obtain the CRL.""" | 
|  | try: | 
|  | r = requests.get(revocation_point.dataURL, timeout=5) | 
|  | logging.debug(f"Fetched CRL: {r.content}") | 
|  | return x509.load_der_x509_crl(r.content) | 
|  | except Exception: | 
|  | logging.warning(f"Failed to fetch a valid CRL for': {crl_signer_certificate.subject.rfc4514_string()}") | 
|  |  | 
|  | def get_formatted_hex_skid(self, skid_hex: str) -> str: | 
|  | return ':'.join([skid_hex[i:i+2] for i in range(0, len(skid_hex), 2)]) | 
|  |  | 
|  |  | 
|  | class NodeDclClient(DclClientInterface): | 
|  | ''' | 
|  | A client for interacting with DCLD using command line interface (CLI). | 
|  | ''' | 
|  |  | 
|  | def __init__(self, dcld_exe: str, use_test_net: bool): | 
|  | ''' | 
|  | Initialize the client. | 
|  |  | 
|  | dcld_exe: str | 
|  | Path to `dcld` executable. | 
|  | use_test_net: bool | 
|  | Indicates if the client should use TestNet or MainNet URL with dcld executable. | 
|  | ''' | 
|  |  | 
|  | self.dcld_exe = dcld_exe | 
|  | self.use_test_net = use_test_net | 
|  |  | 
|  | def build_dcld_command_line(self, cmdlist: list[str]) -> list[str]: | 
|  | ''' | 
|  | Build command line for `dcld` executable. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | cmdlist: list[str] | 
|  | List of command line arguments to append to some predefined arguments. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[str] | 
|  | The complete command list including the DCLD executable and node option if in production. | 
|  | ''' | 
|  |  | 
|  | return [self.dcld_exe] + cmdlist + ([] if self.use_test_net else ['--node', PRODUCTION_NODE_URL]) | 
|  |  | 
|  | def get_dcld_cmd_output_json(self, cmdlist: list[str]) -> dict: | 
|  | ''' | 
|  | Executes a DCLD CLI command and returns the JSON output. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | cmdlist: list[str] | 
|  | List of command line arguments to append to some predefined arguments. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | dict | 
|  | The JSON output from the command. | 
|  | ''' | 
|  |  | 
|  | # Set the output as JSON | 
|  | subprocess.Popen([self.dcld_exe, 'config', 'output', 'json']) | 
|  |  | 
|  | cmdpipe = subprocess.Popen(self.build_dcld_command_line(cmdlist), | 
|  | stdout=subprocess.PIPE, stderr=subprocess.PIPE) | 
|  | return json.loads(cmdpipe.stdout.read()) | 
|  |  | 
|  | def get_revocation_points(self) -> list[RevocationPoint]: | 
|  | ''' | 
|  | Get revocation points from DCL. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[RevocationPoint] | 
|  | List of revocation points. | 
|  | ''' | 
|  |  | 
|  | response = self.get_dcld_cmd_output_json(['query', 'pki', 'all-revocation-points']) | 
|  | return [RevocationPoint(**r) for r in response["PkiRevocationDistributionPoint"]] | 
|  |  | 
|  | def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]: | 
|  | ''' | 
|  | Get revocation points by subject key ID. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | issuer_subject_key_id: str | 
|  | Subject key ID. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[RevocationPoint] | 
|  | List of revocation points. | 
|  | ''' | 
|  |  | 
|  | response = self.get_dcld_cmd_output_json(['query', 'pki', 'revocation-points', | 
|  | '--issuer-subject-key-id', issuer_subject_key_id]) | 
|  | logging.debug(f"Response revocation points: {response}") | 
|  | return [RevocationPoint(**r) for r in response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"]] | 
|  |  | 
|  | def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: | 
|  | ''' | 
|  | Get certificate from DCL. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | subject_name: x509.name.Name | 
|  | Subject Name object. | 
|  | skid_hex: str | 
|  | Subject Key ID in hex format. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | tuple[bool, x509.Certificate] | 
|  | Tuple of is_paa and the certificate from the DCL. | 
|  | ''' | 
|  | subject_name_b64 = get_b64_name(subject_name) | 
|  | query_cmd_list = ['query', 'pki', 'x509-cert', '-u', subject_name_b64, '-k', skid_hex] | 
|  | logging.debug( | 
|  | f"Fetching issuer from dcl query{' '.join(query_cmd_list)}") | 
|  | response = self.get_dcld_cmd_output_json(query_cmd_list) | 
|  | return self.get_only_approved_certificate(response, skid_hex) | 
|  |  | 
|  |  | 
|  | class RestDclClient(DclClientInterface): | 
|  | ''' | 
|  | A client for interacting with DCLD using the REST API. | 
|  | ''' | 
|  |  | 
|  | def __init__(self, use_test_net: bool): | 
|  | ''' | 
|  | Initialize the client. | 
|  |  | 
|  | use_test_net: bool | 
|  | Indicates if the client should use TestNet or MainNet REST API URL. | 
|  | ''' | 
|  | self.rest_node_url = TEST_NODE_URL_REST if use_test_net else PRODUCTION_NODE_URL_REST | 
|  |  | 
|  | def get_revocation_points(self) -> list[RevocationPoint]: | 
|  | ''' | 
|  | Get revocation points from DCL. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[RevocationPoint] | 
|  | List of revocation points. | 
|  | ''' | 
|  |  | 
|  | response = self.send_get_request(f"{self.rest_node_url}/dcl/pki/revocation-points") | 
|  | return [RevocationPoint(**r) for r in response["PkiRevocationDistributionPoint"]] | 
|  |  | 
|  | def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]: | 
|  | ''' | 
|  | Get revocation points by subject key ID. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | issuer_subject_key_id: str | 
|  | Subject key ID. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[RevocationPoint] | 
|  | List of revocation points. | 
|  | ''' | 
|  |  | 
|  | response = self.send_get_request(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}") | 
|  | return [RevocationPoint(**r) for r in response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"]] | 
|  |  | 
|  | def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: | 
|  | ''' | 
|  | Get certificate from DCL. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | subject_name: x509.name.Name | 
|  | Subject Name object. | 
|  | skid_hex: str | 
|  | Subject Key ID in hex format. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | tuple[bool, x509.Certificate] | 
|  | Tuple of is_paa and the certificate from the DCL. | 
|  | ''' | 
|  | logging.debug( | 
|  | f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{get_b64_name(subject_name)}/{self.get_formatted_hex_skid(skid_hex)}") | 
|  | response = self.send_get_request( | 
|  | f"{self.rest_node_url}/dcl/pki/certificates/{get_b64_name(subject_name)}/{self.get_formatted_hex_skid(skid_hex)}") | 
|  | logging.debug(f"Response certificate: {response}") | 
|  | return self.get_only_approved_certificate(response, skid_hex) | 
|  |  | 
|  |  | 
|  | class LocalFilesDclClient(DclClientInterface): | 
|  | ''' | 
|  | A client for interacting with local DLCD response data. | 
|  | ''' | 
|  |  | 
|  | def __init__(self, crls: [], dcl_certificates: [], revocation_points_response_file: str): | 
|  | ''' | 
|  | Initialize the client. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | crls: list | 
|  | List of CRL files. | 
|  | dcl_certificates: list | 
|  | List of certificate files. | 
|  | revocation_points_response_file: str | 
|  | Path to the get-revocation-points response json file. | 
|  | ''' | 
|  |  | 
|  | logging.debug(f"Loading certificates from {dcl_certificates}") | 
|  | logging.debug(f"Loading crls from {crls}") | 
|  | logging.debug(f"Loading revocation points response from {revocation_points_response_file}") | 
|  | self.crls = self.get_crls(crls) | 
|  | self.revocation_points = [RevocationPoint(**r) | 
|  | for r in json.load(revocation_points_response_file)["PkiRevocationDistributionPoint"]] | 
|  | self.authoritative_certs = self.get_authoritative_certificates(dcl_certificates) | 
|  |  | 
|  | def get_lookup_key(self, certificate: x509.Certificate) -> str: | 
|  | ''' | 
|  | Get key used in this class to lookup certificates. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | certificate: x509.Certificate | 
|  | Certificate object. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | str: | 
|  | lookup key derived from the certificate. | 
|  | ''' | 
|  | base64_name = get_b64_name(certificate.subject) | 
|  | try: | 
|  | skid = get_skid(certificate) | 
|  | return self.format_lookup_key(base64_name, skid) | 
|  | except ExtensionNotFound: | 
|  | logging.warning("CertificateSKID not found, continue...") | 
|  |  | 
|  | def format_lookup_key(self, base64_name: str, skid_hex: str) -> str: | 
|  | ''' | 
|  | Get formatted key used in this class to lookup certificates. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | base64_name: str | 
|  | Base64 encoded subject name. | 
|  | skid_hex: str | 
|  | Subject Key ID in hex format. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | str: | 
|  | Key used in this class to lookup certificates. | 
|  | ''' | 
|  | delimiter = '/' | 
|  | skid_hex_formatted = self.get_formatted_hex_skid(skid_hex) | 
|  | return delimiter.join([base64_name, skid_hex_formatted]) | 
|  |  | 
|  | def get_crls(self, unread_crls: []) -> list[x509.CertificateRevocationList]: | 
|  | ''' | 
|  | Get CRLs from list of files. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | unread_crls: list | 
|  | List of CRL files. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[x509.CertificateRevocationList] | 
|  | List of CRLs. | 
|  | ''' | 
|  | crls = [] | 
|  | for file in unread_crls: | 
|  | crl_content = file.read() | 
|  | crl_file = x509.load_der_x509_crl(crl_content) | 
|  | crls.append(crl_file) | 
|  | return crls | 
|  |  | 
|  | def get_authoritative_certificates(self, dcl_certificates: []) -> dict[str, x509.Certificate]: | 
|  | ''' | 
|  | Get certificates from revocation points response file and list of provided dcl certificates. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | dcl_certificates: list | 
|  | List of certificate files. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | dict[str, x509.CertificateRevocationList] | 
|  | Dictionary of certificates, keyed by lookup key. | 
|  | ''' | 
|  | certificates = {} | 
|  | logging.debug(f"Loading certificates from {dcl_certificates}") | 
|  | if dcl_certificates: | 
|  | for file in dcl_certificates: | 
|  | logging.debug(f"Loading certificate from {file}") | 
|  | # with open(file, "r") as f: | 
|  | certificate = x509.load_pem_x509_certificate(file.read()) | 
|  | certificates[self.get_lookup_key(certificate)] = certificate | 
|  |  | 
|  | logging.debug("Loading certificates from revocation_points_response file.") | 
|  | for point in self.revocation_points: | 
|  | if point.crlSignerDelegator: | 
|  | certificate = x509.load_pem_x509_certificate(bytes(point.crlSignerDelegator, 'utf-8')) | 
|  | certificates[self.get_lookup_key(certificate)] = certificate | 
|  | elif point.crlSignerCertificate: | 
|  | certificate = x509.load_pem_x509_certificate(bytes(point.crlSignerCertificate, 'utf-8')) | 
|  | certificates[self.get_lookup_key(certificate)] = certificate | 
|  | return certificates | 
|  |  | 
|  | def get_revocation_points(self) -> list[RevocationPoint]: | 
|  | ''' | 
|  | Get revocation points from DCL. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[RevocationPoint] | 
|  | List of revocation points. | 
|  | ''' | 
|  | return self.revocation_points | 
|  |  | 
|  | def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]: | 
|  | ''' | 
|  | Get revocation points by subject key ID | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | issuer_subject_key_id: str | 
|  | Subject key ID. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | list[RevocationPoint] | 
|  | List of revocation points with the same issuer subject key ID. | 
|  | ''' | 
|  | same_issuer_points = [] | 
|  | for point in self.revocation_points: | 
|  | if point.issuerSubjectKeyID == issuer_subject_key_id: | 
|  | same_issuer_points.append(point) | 
|  | return same_issuer_points | 
|  |  | 
|  | def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: | 
|  | ''' | 
|  | Get certificate from DCL | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | subject_name: x509.name.Name | 
|  | Subject name object. | 
|  |  | 
|  | skid_hex: str | 
|  | Subject Key ID in hex format. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | tuple[bool, x509.Certificate] | 
|  | Tuple of is_paa and the certificate from the DCL. | 
|  | ''' | 
|  | lookup_key = self.format_lookup_key(get_b64_name(subject_name), skid_hex) | 
|  | if lookup_key in self.authoritative_certs: | 
|  | return is_self_signed_certificate(self.authoritative_certs[lookup_key]), self.authoritative_certs[lookup_key] | 
|  | return False, None | 
|  |  | 
|  | def get_crl_file(self, | 
|  | unused_revocation_point: RevocationPoint, | 
|  | crl_signer_certificate: x509.Certificate) -> x509.CertificateRevocationList: | 
|  | ''' | 
|  | Obtain the CRL. | 
|  |  | 
|  | Parameters | 
|  | ---------- | 
|  | unused_revocation_point: RevocationPoint | 
|  | Revocation point. Not used. | 
|  |  | 
|  | crl_signer_certificate: x509.Certificate | 
|  | Crl signer certificate. | 
|  |  | 
|  | Returns | 
|  | ------- | 
|  | x509.CertificateRevocationList | 
|  | CRL signed by the CRL signer certificate. | 
|  | ''' | 
|  | for crl in self.crls: | 
|  | if crl.issuer.public_bytes() == crl_signer_certificate.subject.public_bytes(): | 
|  | logging.debug(f"Found CRL for issuer: {crl.issuer.rfc4514_string()}") | 
|  | return crl | 
|  | return None | 
|  |  | 
|  |  | 
|  | @click.group() | 
|  | def cli(): | 
|  | pass | 
|  |  | 
|  |  | 
|  | @cli.command('from-dcl') | 
|  | @click.help_option('-h', '--help') | 
|  | @optgroup.group('Input data sources', cls=RequiredMutuallyExclusiveOptionGroup) | 
|  | @optgroup.option('--use-main-net-dcld', type=str, default='', metavar='PATH', help="Location of `dcld` binary, to use `dcld` for mirroring MainNet.") | 
|  | @optgroup.option('--use-test-net-dcld', type=str, default='', metavar='PATH', help="Location of `dcld` binary, to use `dcld` for mirroring TestNet.") | 
|  | @optgroup.option('--use-main-net-http', is_flag=True, type=str, help="Use RESTful API with HTTPS against public MainNet observer.") | 
|  | @optgroup.option('--use-test-net-http', is_flag=True, type=str, help="Use RESTful API with HTTPS against public TestNet observer.") | 
|  | @optgroup.option('--use-local-data', is_flag=True, type=bool, help="Fake response directory: see \" DATA_DIR/",) | 
|  | @optgroup.group('Required arguments if use-local-data is used', cls=AllOptionGroup) | 
|  | @optgroup.option('--certificates', type=click.File('rb'), multiple=True, help='Paths to PEM formated certificates (i.e. PAA) in DCL but missing from the revocation-points-response file.') | 
|  | @optgroup.option('--crls', type=click.File('rb'), multiple=True, help='Paths to the crl der files') | 
|  | @optgroup.option('--revocation-points-response', type=click.File('rb'), help='Path to the get-revocation-points response json file.') | 
|  | @optgroup.group('Optional output arguments') | 
|  | @optgroup.option('--output', default='sample_revocation_set_list.json', type=str, metavar='FILEPATH', | 
|  | help="Output filename (default: sample_revocation_set_list.json)") | 
|  | @optgroup.option('--log-level', default='INFO', show_default=True, type=click.Choice(__LOG_LEVELS__.keys(), | 
|  | case_sensitive=False), callback=lambda c, p, v: __LOG_LEVELS__[v], | 
|  | help='Determines the verbosity of script output') | 
|  | def from_dcl(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool, use_test_net_http: bool, use_local_data: bool, revocation_points_response: str, crls: [], certificates: [], output: str, log_level: str): | 
|  | """Generate revocation set from DCL using generation algorithm from Matter Spec section 6.2.4.1.""" | 
|  | logging.basicConfig( | 
|  | level=log_level, | 
|  | format='%(asctime)s %(name)s %(levelname)-7s %(message)s', | 
|  | datefmt='%Y-%m-%d %H:%M:%S' | 
|  | ) | 
|  |  | 
|  | if use_local_data: | 
|  | dcld_client = LocalFilesDclClient(crls, certificates, revocation_points_response) | 
|  | elif use_main_net_http or use_test_net_http: | 
|  | dcld_client = RestDclClient(True if use_test_net_http else False) | 
|  | else: | 
|  | dcld_client = NodeDclClient(use_main_net_dcld or use_test_net_dcld, True if use_test_net_dcld else False) | 
|  |  | 
|  | revocation_point_list = dcld_client.get_revocation_points() | 
|  | revocation_set = [] | 
|  | for revocation_point in revocation_point_list: | 
|  | # 1. Validate Revocation Type | 
|  | if revocation_point.revocationType != RevocationType.CRL.value: | 
|  | logging.warning("Revocation Type is not CRL, continue...") | 
|  | continue | 
|  |  | 
|  | # 2. Parse the certificate | 
|  | try: | 
|  | crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point.crlSignerCertificate, 'utf-8')) | 
|  | except Exception: | 
|  | logging.warning("CRL Signer Certificate is not valid, continue...") | 
|  | continue | 
|  |  | 
|  | # Parse the crl signer delegator | 
|  | crl_signer_delegator_cert = None | 
|  | if revocation_point.crlSignerDelegator: | 
|  | crl_signer_delegator_cert_pem = revocation_point.crlSignerDelegator | 
|  | logging.debug(f"CRLSignerDelegator: {crl_signer_delegator_cert_pem}") | 
|  | try: | 
|  | crl_signer_delegator_cert = x509.load_pem_x509_certificate(bytes(crl_signer_delegator_cert_pem, 'utf-8')) | 
|  | except Exception: | 
|  | logging.warning("CRL Signer Delegator Certificate not found...") | 
|  |  | 
|  | # 3. and 4. Validate VID/PID | 
|  | if not validate_vid_pid(revocation_point, crl_signer_certificate, crl_signer_delegator_cert): | 
|  | logging.warning("Failed to validate VID/PID, continue...") | 
|  | continue | 
|  |  | 
|  | # 5. Validate the certification path containing CRLSignerCertificate. | 
|  | paa_certificate_object = dcld_client.get_paa_cert(crl_signer_certificate) | 
|  | if paa_certificate_object is None: | 
|  | logging.warning("PAA Certificate not found, continue...") | 
|  | continue | 
|  |  | 
|  | if validate_cert_chain(crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object) is False: | 
|  | logging.warning("Failed to validate CRL Signer Certificate chain, continue...") | 
|  | continue | 
|  |  | 
|  | # 6. Obtain the CRL | 
|  | crl_file = dcld_client.get_crl_file(revocation_point, crl_signer_certificate) | 
|  | if crl_file is None: | 
|  | logging.warning("CRL file not found for revocation point, continue...") | 
|  | continue | 
|  |  | 
|  | # 7. Perform CRL File Validation | 
|  | # a. | 
|  | try: | 
|  | crl_signer_skid = get_skid(crl_signer_certificate) | 
|  | except ExtensionNotFound: | 
|  | logging.warning("CRL Signer SKID not found, continue...") | 
|  | continue | 
|  | try: | 
|  | crl_akid = get_akid(crl_file) | 
|  | except ExtensionNotFound: | 
|  | logging.warning("CRL AKID is not found, continue...") | 
|  | continue | 
|  | if crl_akid != crl_signer_skid: | 
|  | logging.warning("CRL AKID is not CRL Signer SKID, continue...") | 
|  | continue | 
|  |  | 
|  | # b. | 
|  | same_issuer_points = dcld_client.get_revocation_points_by_skid(crl_akid) | 
|  | count_with_matching_vid_issuer_skid = sum(item.vid == revocation_point.vid for item in same_issuer_points) | 
|  |  | 
|  | if count_with_matching_vid_issuer_skid > 1: | 
|  | try: | 
|  | issuing_distribution_point = crl_file.extensions.get_extension_for_oid( | 
|  | x509.oid.ExtensionOID.ISSUING_DISTRIBUTION_POINT).value | 
|  | except Exception: | 
|  | logging.warning("CRL Issuing Distribution Point not found, continue...") | 
|  | continue | 
|  |  | 
|  | uri_list = issuing_distribution_point.full_name | 
|  | if len(uri_list) == 1 and isinstance(uri_list[0], x509.UniformResourceIdentifier): | 
|  | if uri_list[0].value != revocation_point.dataURL: | 
|  | logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...") | 
|  | continue | 
|  | else: | 
|  | logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...") | 
|  | continue | 
|  |  | 
|  | # TODO: 8. Validate CRL as per Section 6.3 of RFC 5280 | 
|  |  | 
|  | # 9. Decide on certificate authority name and AKID | 
|  | certificate_authority_name, certificate_akid_hex = get_certificate_authority_details( | 
|  | crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object, revocation_point.isPAA) | 
|  |  | 
|  | # validate issuer skid matchces with the one in revocation points | 
|  | logging.debug(f"revocation_point.issuerSubjectKeyID: {revocation_point.issuerSubjectKeyID}") | 
|  |  | 
|  | if revocation_point.issuerSubjectKeyID != certificate_akid_hex: | 
|  | logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...") | 
|  | continue | 
|  |  | 
|  | # 10. Iterate through the Revoked Certificates List | 
|  | entry = generate_revocation_set_from_crl(crl_file, crl_signer_certificate, | 
|  | certificate_authority_name, certificate_akid_hex, crl_signer_delegator_cert) | 
|  | logging.debug(f"Entry to append: {entry}") | 
|  | revocation_set.append(entry) | 
|  |  | 
|  | with open(output, 'w+') as outfile: | 
|  | json.dump([revocation.asDict() for revocation in revocation_set], outfile, indent=4) | 
|  |  | 
|  |  | 
|  | class TestRevocationSetGeneration(unittest.TestCase): | 
|  | """Test class for revocation set generation""" | 
|  |  | 
|  | def setUp(self): | 
|  | # Get the directory containing this file | 
|  | self.test_base_dir = os.path.dirname(os.path.abspath(__file__)) | 
|  |  | 
|  | def get_test_file_path(self, filename): | 
|  | return os.path.join(self.test_base_dir, 'test', filename) | 
|  |  | 
|  | def get_expected_revocation_set(self, idx): | 
|  | with open(os.path.join(self.test_base_dir, 'test/revoked-attestation-certificates/revocation-sets/revocation-set.json'), 'r') as f: | 
|  | return RevocationSet(**json.load(f)[idx]) | 
|  |  | 
|  | def compare_revocation_sets(self, generated_set, expected): | 
|  | # Compare required fields | 
|  | self.assertEqual(generated_set.type, expected.type) | 
|  | self.assertEqual(generated_set.issuer_subject_key_id, expected.issuer_subject_key_id) | 
|  | self.assertEqual(generated_set.issuer_name, expected.issuer_name) | 
|  | self.assertEqual(set(generated_set.revoked_serial_numbers), set(expected.revoked_serial_numbers)) | 
|  | self.assertEqual(generated_set.crl_signer_cert, expected.crl_signer_cert) | 
|  |  | 
|  | # Compare optional fields if present in either set | 
|  | if generated_set.crl_signer_delegator or expected.crl_signer_delegator: | 
|  | self.assertEqual(generated_set.crl_signer_delegator, expected.crl_signer_delegator, | 
|  | f'CRL signer delegator certificates do not match, expected: {expected.crl_signer_delegator}, actual: {generated_set.crl_signer_delegator}') | 
|  |  | 
|  | def test_paa_revocation_set(self): | 
|  | """Test generation of PAA revocation set""" | 
|  | with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-CRL.pem'), 'rb') as f: | 
|  | crl = x509.load_pem_x509_crl(f.read()) | 
|  | with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem'), 'rb') as f: | 
|  | crl_signer = x509.load_pem_x509_certificate(f.read()) | 
|  |  | 
|  | ca_name_b64, ca_akid_hex = get_certificate_authority_details( | 
|  | crl_signer, None, None, True) | 
|  | revocation_set = generate_revocation_set_from_crl( | 
|  | crl, crl_signer, ca_name_b64, ca_akid_hex, None) | 
|  |  | 
|  | self.compare_revocation_sets(revocation_set, self.get_expected_revocation_set(0)) | 
|  |  | 
|  | def test_pai_revocation_set(self): | 
|  | """Test generation of PAI revocation set""" | 
|  | with open(self.get_test_file_path('revoked-attestation-certificates/Matter-Development-PAI-FFF1-noPID-CRL.pem'), 'rb') as f: | 
|  | crl = x509.load_pem_x509_crl(f.read()) | 
|  | with open(self.get_test_file_path('revoked-attestation-certificates/Matter-Development-PAI-FFF1-noPID-Cert.pem'), 'rb') as f: | 
|  | crl_signer = x509.load_pem_x509_certificate(f.read()) | 
|  | with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem'), 'rb') as f: | 
|  | paa = x509.load_pem_x509_certificate(f.read()) | 
|  |  | 
|  | ca_name_b64, ca_akid_hex = get_certificate_authority_details( | 
|  | crl_signer, None, paa, False) | 
|  | revocation_set = generate_revocation_set_from_crl( | 
|  | crl, crl_signer, ca_name_b64, ca_akid_hex, None) | 
|  |  | 
|  | self.compare_revocation_sets(revocation_set, self.get_expected_revocation_set(1)) | 
|  |  | 
|  | def test_revoked_pai_revocation_set(self): | 
|  | """Test generation of revocation set of revoked PAI""" | 
|  | with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAI-FFF1-noPID-Revoked-CRL.pem'), 'rb') as f: | 
|  | crl = x509.load_pem_x509_crl(f.read()) | 
|  | with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAI-FFF1-noPID-Revoked-Cert.pem'), 'rb') as f: | 
|  | crl_signer = x509.load_pem_x509_certificate(f.read()) | 
|  | with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem'), 'rb') as f: | 
|  | paa = x509.load_pem_x509_certificate(f.read()) | 
|  |  | 
|  | ca_name_b64, ca_akid_hex = get_certificate_authority_details( | 
|  | crl_signer, None, paa, False) | 
|  | revocation_set = generate_revocation_set_from_crl( | 
|  | crl, crl_signer, ca_name_b64, ca_akid_hex, None) | 
|  |  | 
|  | self.compare_revocation_sets(revocation_set, self.get_expected_revocation_set(2)) | 
|  |  | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | if len(sys.argv) > 1 and sys.argv[1] == 'test': | 
|  | # Remove the 'test' argument and run tests | 
|  | sys.argv.pop(1) | 
|  | unittest.main() | 
|  | elif len(sys.argv) == 1: | 
|  | cli.main(['--help']) | 
|  | else: | 
|  | cli() |