blob: bb2f5b6e9a8d33d622c5250b937e6f0d95d2eea0 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2023-2024 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Note: py cryptography supports indirect CRLs trom version 44.0.0 and above.
# You may need to update your cryptography version.
#
# Generates a basic RevocationSet from TestNet
# Usage:
# python ./credentials/generate-revocation-set.py --help
import base64
import dataclasses
import json
import logging
import os
import subprocess
import sys
import unittest
from enum import Enum
from typing import Optional
import click
import requests
from click_option_group import AllOptionGroup, RequiredMutuallyExclusiveOptionGroup, optgroup
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.extensions import ExtensionNotFound
from cryptography.x509.oid import NameOID
# Supported log levels, mapping string values required for argument
# parsing into logging constants
__LOG_LEVELS__ = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warn': logging.WARN,
'fatal': logging.FATAL,
}
class RevocationType(Enum):
CRL = 1
class CertVerificationResult(Enum):
SUCCESS = 1
SKID_NOT_FOUND = 2
AKID_NOT_FOUND = 3
SIGNATURE_VERIFICATION_FAILED = 4
ISSUER_MISMATCH = 5
AKID_MISMATCH = 6
@dataclasses.dataclass
class RevocationPoint:
vid: int
label: str
issuerSubjectKeyID: str
pid: int
isPAA: bool
crlSignerCertificate: str
dataURL: str
dataFileSize: str
dataDigest: str
dataDigestType: int
revocationType: int
schemaVersion: int
crlSignerDelegator: str
@dataclasses.dataclass
class RevocationSet:
type: str
issuer_subject_key_id: str
issuer_name: str
revoked_serial_numbers: [str]
crl_signer_cert: str
crl_signer_delegator: str = None
def asDict(self):
return dataclasses.asdict(self)
OID_VENDOR_ID = x509.ObjectIdentifier("1.3.6.1.4.1.37244.2.1")
OID_PRODUCT_ID = x509.ObjectIdentifier("1.3.6.1.4.1.37244.2.2")
PRODUCTION_NODE_URL = "https://on.dcl.csa-iot.org:26657"
PRODUCTION_NODE_URL_REST = "https://on.dcl.csa-iot.org"
TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org"
def extract_single_integer_attribute(subject, oid):
attribute_list = subject.get_attributes_for_oid(oid)
if len(attribute_list) == 1:
return int(attribute_list[0].value, 16)
return None
def extract_fallback_tag_from_common_name(cn, marker):
val_len = 4
start_idx = cn.find(marker)
if start_idx != -1:
val_start_idx = start_idx + len(marker)
val = cn[val_start_idx:val_start_idx + val_len]
return int(val, 16) if len(val) == 4 else None
return None
def parse_vid_pid_from_distinguished_name(distinguished_name):
# VID/PID encoded using Matter specific RDNs
vid = extract_single_integer_attribute(distinguished_name, OID_VENDOR_ID)
pid = extract_single_integer_attribute(distinguished_name, OID_PRODUCT_ID)
# Fallback method to get the VID/PID, encoded in CN as "Mvid:FFFF Mpid:1234"
if vid is None and pid is None:
cn = distinguished_name.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
vid = extract_fallback_tag_from_common_name(cn, 'Mvid:')
pid = extract_fallback_tag_from_common_name(cn, 'Mpid:')
return vid, pid
def get_akid(cert: x509.Certificate) -> str:
return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier.hex().upper()
def get_skid(cert: x509.Certificate) -> str:
return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier.hex().upper()
def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> CertVerificationResult:
'''
Verifies if the cert is signed by root.
'''
try:
cert_akid = get_akid(cert)
except ExtensionNotFound:
return CertVerificationResult.AKID_NOT_FOUND
try:
root_skid = get_skid(root)
except ExtensionNotFound:
return CertVerificationResult.SKID_NOT_FOUND
if cert_akid != root_skid:
return CertVerificationResult.AKID_MISMATCH
if cert.issuer != root.subject:
return CertVerificationResult.ISSUER_MISMATCH
# public_key().verify() do not return anything if signature is valid,
# will raise an exception if signature is invalid
try:
root.public_key().verify(cert.signature, cert.tbs_certificate_bytes, ec.ECDSA(cert.signature_hash_algorithm))
except Exception:
return CertVerificationResult.SIGNATURE_VERIFICATION_FAILED
return CertVerificationResult.SUCCESS
def is_self_signed_certificate(cert: x509.Certificate) -> bool:
result = verify_cert(cert, cert)
if result == CertVerificationResult.SUCCESS:
return True
else:
logging.debug(
f"Certificate with subject: {cert.subject.rfc4514_string()} is not a valid self-signed certificate. Result: {result.name}")
return False
# delegator is optional so can be None, but crl_signer and paa has to be present
def validate_cert_chain(crl_signer: x509.Certificate, crl_signer_delegator: x509.Certificate, paa: x509.Certificate):
'''
There could be four scenarios:
1. CRL Signer is PAA itself, hence its self-signed certificate
2. CRL Signer is PAI certificate, and we can validate (crl_signer -> paa) chain
3. CRL Signer delegator is PAA, and we can validate (crl_signer -> crl_signer_delegator(paa) -> paa) chain
4. CRL Signer delegator is PAI, and we can validate (crl_signer -> crl_signer_delegator -> paa) chain
'''
if crl_signer_delegator:
result_signer = verify_cert(crl_signer, crl_signer_delegator)
if not result_signer == CertVerificationResult.SUCCESS:
logging.debug(
f"Cannot verify certificate subject: {crl_signer.subject.rfc4514_string()} issued by certificate subject: {crl_signer_delegator.subject.rfc4514_string()}. Result: {result_signer.name}")
return False
result_delegator = verify_cert(crl_signer_delegator, paa)
if not result_delegator == CertVerificationResult.SUCCESS:
logging.debug(
f"Cannot verify certificate subject: {crl_signer_delegator.subject.rfc4514_string()} issued by certificate subject: {paa.subject.rfc4514_string()}. Result: {result_delegator.name}")
return False
return True
else:
result = verify_cert(crl_signer, paa)
if not result == CertVerificationResult.SUCCESS:
logging.debug(
f"Cannot verify certificate subject: {crl_signer.subject.rfc4514_string()} issued by certificate subject: {paa.subject.rfc4514_string()}. Result: {result.name}")
return False
return True
def validate_vid_pid(revocation_point: RevocationPoint, crl_signer_certificate: x509.Certificate, crl_signer_delegator_certificate: x509.Certificate) -> bool:
crl_signer_vid, crl_signer_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject)
logging.debug(f"vid: {revocation_point.vid})")
logging.debug(f"crl_signer_vid: {crl_signer_vid})")
if revocation_point.isPAA:
if crl_signer_vid is not None:
if revocation_point.vid != crl_signer_vid:
logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...")
return False
else:
vid_to_match = crl_signer_vid
pid_to_match = crl_signer_pid
# if the CRL Signer is delegated then match the VID and PID of the CRL Signer Delegator
if crl_signer_delegator_certificate:
vid_to_match, pid_to_match = parse_vid_pid_from_distinguished_name(crl_signer_delegator_certificate.subject)
logging.debug(f"vid_to_match: {vid_to_match})")
logging.debug(f"pid_to_match: {pid_to_match})")
if vid_to_match is None or revocation_point.vid != vid_to_match:
logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...")
return False
if pid_to_match is not None:
if revocation_point.pid != pid_to_match:
logging.warning("PID in CRL Signer Certificate does not match with PID in revocation point, continue...")
return False
return True
def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList,
crl_signer_certificate: x509.Certificate,
certificate_authority_name: x509.Name,
certificate_akid_hex: str,
crl_signer_delegator_cert: x509.Certificate) -> RevocationSet:
"""Generate a revocation set from a CRL file.
Args:
crl_file: The CRL object containing revoked certificates
crl_signer_certificate: The certificate object used to sign the CRL
certificate_authority_name: x509.Name of the issuer
certificate_akid_hex: Hex encoded Authority Key Identifier
crl_signer_delegator_cert: crl signer delegator certificate object
Returns:
RevocationSet containing the revocation set data with fields:
- type: "revocation_set"
- issuer_subject_key_id: Authority Key Identifier (hex)
- issuer_name: Issuer name (base64)
- revoked_serial_numbers: List of revoked serial numbers
- crl_signer_cert: CRL signer certificate (base64 DER)
- crl_signer_delegator: Optional delegator certificate (base64 DER)
"""
serialnumber_list = []
for revoked_cert in crl_file:
try:
cert_issuer_entry_ext = revoked_cert.extensions.get_extension_for_oid(x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER)
revoked_cert_issuer = cert_issuer_entry_ext.value.get_values_for_type(x509.DirectoryName)[0]
if revoked_cert_issuer is not None:
# check if this really are the same thing
if revoked_cert_issuer != x509.DirectoryName(certificate_authority_name).value:
logging.warning("CRL entry issuer is not CRL File Issuer, continue...")
continue
except Exception:
pass
# Ensure the serial number is always a 2-byte aligned hex string.
# TestDACRevocationDelegateImpl encodes the serial number as an even-length hex string
# using BytesToHex in src/lib/support/BytesToHex.cpp.
# As the primary consumer of this data, we should use the same here.
serialnumber = '{:02X}'.format(revoked_cert.serial_number)
serialnumber = serialnumber if len(serialnumber) % 2 == 0 else '0' + serialnumber
serialnumber_list.append(serialnumber)
entry = RevocationSet(
type='revocation_set',
issuer_subject_key_id=certificate_akid_hex,
issuer_name=get_b64_name(certificate_authority_name),
revoked_serial_numbers=serialnumber_list,
crl_signer_cert=base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
)
if crl_signer_delegator_cert:
entry.crl_signer_delegator = base64.b64encode(
crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8')
return entry
# This is implemented as per point (9) in 6.2.4.1. Conceptual algorithm for revocation set construction
def get_certificate_authority_details(crl_signer_certificate: x509.Certificate,
crl_signer_delegator_cert: x509.Certificate,
paa_certificate_object: x509.Certificate,
is_paa: bool) -> tuple[x509.Name, str]:
"""Get certificate authority name and AKID based on certificate hierarchy.
Args:
crl_signer_certificate: The CRL signer certificate
crl_signer_delegator_cert: Optional delegator certificate
paa_certificate_object: Optional PAA certificate
is_paa: Whether this is a PAA certificate
Returns:
tuple[str, str]: (certificate_authority_name, certificate_akid_hex)
"""
if is_paa and not is_self_signed_certificate(crl_signer_certificate):
cert_for_details = paa_certificate_object
logging.debug("Using PAA certificate for details")
elif crl_signer_delegator_cert:
cert_for_details = crl_signer_delegator_cert
logging.debug("Using CRL Signer Delegator certificate for details")
else:
cert_for_details = crl_signer_certificate
logging.debug("Using CRL Signer certificate for details")
certificate_authority_name = cert_for_details.subject
try:
certificate_akid = get_skid(cert_for_details)
logging.debug(f"Certificate Authority Name: {certificate_authority_name}")
logging.debug(f"Certificate AKID: {certificate_akid}")
return certificate_authority_name, certificate_akid
except ExtensionNotFound:
logging.warning("Certificate SKID not found in authoarity certificate.")
def get_b64_name(name: x509.name.Name) -> str:
'''
Get base64 encoded name
'''
return base64.b64encode(name.public_bytes()).decode('utf-8')
def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList:
logging.debug(f"Fetching CRL from {url}")
try:
r = requests.get(url, timeout=timeout)
logging.debug(f"Fetched CRL: {r.content}")
return x509.load_der_x509_crl(r.content)
except Exception as e:
logging.error('Failed to fetch a valid CRL', e)
class DclClientInterface:
'''
An interface for interacting with DCLD.
'''
def send_get_request(self, url: str) -> dict:
'''
Send a GET request for a json object.
'''
try:
response = requests.get(url).json()
return response
except Exception as e:
logging.error(f"Failed to fetch {url}: {e}")
return None
def get_revocation_points(self) -> list[RevocationPoint]:
'''
Get revocation points from DCL
Returns
-------
list[RevocationPoint]
List of revocation points
'''
raise NotImplementedError
def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]:
'''
Get revocation points by subject key ID
Parameters
----------
issuer_subject_key_id: str
Subject key ID
Returns
-------
list[RevocationPoint]
List of revocation points
'''
raise NotImplementedError
def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]:
'''
Get certificate from DCL.
'''
raise NotImplementedError
def get_only_approved_certificate(self, response: dict, skid_hex: str) -> tuple[bool, Optional[x509.Certificate]]:
'''
Get only approved certificate from DCL resposne.
'''
if response is None or not response.get("approvedCertificates", {}).get("certs", []):
raise requests.exception.NotFound(f"No certificate found for {skid_hex}")
if len(response["approvedCertificates"]["certs"]) > 1:
raise ValueError(f"Multiple certificates found for {skid_hex}")
issuer_certificate = x509.load_pem_x509_certificate(bytes(response["approvedCertificates"]["certs"][0]["pemCert"], "utf-8"))
return response["approvedCertificates"]["certs"][0]["isRoot"], issuer_certificate
def get_paa_cert(self, initial_cert: x509.Certificate) -> Optional[x509.Certificate]:
'''
Get the PAA certificate for the CRL Signer Certificate.
'''
issuer_name = initial_cert.issuer
try:
akid = get_akid(initial_cert)
except ExtensionNotFound:
logging.warning('Certificate AKID not found.')
return
paa_certificate = None
while not paa_certificate:
try:
is_root, issuer_certificate = self.get_approved_certificate(issuer_name, akid)
if is_root:
paa_certificate = issuer_certificate
break
except Exception as e:
logging.error('Failed to get PAA certificate', e)
return
logging.debug(f"issuer_name: {issuer_certificate.subject.rfc4514_string()}")
issuer_name = issuer_certificate.issuer
try:
akid = get_akid(issuer_certificate)
except ExtensionNotFound:
logging.warning('Issuer Certificate AKID not found.')
logging.debug(f"akid: {akid}")
if paa_certificate is None:
logging.warning("PAA Certificate not found, continue...")
return paa_certificate
def get_crl_file(self,
revocation_point: RevocationPoint,
crl_signer_certificate: x509.Certificate) -> x509.CertificateRevocationList:
"""Obtain the CRL."""
try:
r = requests.get(revocation_point.dataURL, timeout=5)
logging.debug(f"Fetched CRL: {r.content}")
return x509.load_der_x509_crl(r.content)
except Exception:
logging.warning(f"Failed to fetch a valid CRL for': {crl_signer_certificate.subject.rfc4514_string()}")
def get_formatted_hex_skid(self, skid_hex: str) -> str:
return ':'.join([skid_hex[i:i+2] for i in range(0, len(skid_hex), 2)])
class NodeDclClient(DclClientInterface):
'''
A client for interacting with DCLD using command line interface (CLI).
'''
def __init__(self, dcld_exe: str, use_test_net: bool):
'''
Initialize the client.
dcld_exe: str
Path to `dcld` executable.
use_test_net: bool
Indicates if the client should use TestNet or MainNet URL with dcld executable.
'''
self.dcld_exe = dcld_exe
self.use_test_net = use_test_net
def build_dcld_command_line(self, cmdlist: list[str]) -> list[str]:
'''
Build command line for `dcld` executable.
Parameters
----------
cmdlist: list[str]
List of command line arguments to append to some predefined arguments.
Returns
-------
list[str]
The complete command list including the DCLD executable and node option if in production.
'''
return [self.dcld_exe] + cmdlist + ([] if self.use_test_net else ['--node', PRODUCTION_NODE_URL])
def get_dcld_cmd_output_json(self, cmdlist: list[str]) -> dict:
'''
Executes a DCLD CLI command and returns the JSON output.
Parameters
----------
cmdlist: list[str]
List of command line arguments to append to some predefined arguments.
Returns
-------
dict
The JSON output from the command.
'''
# Set the output as JSON
subprocess.Popen([self.dcld_exe, 'config', 'output', 'json'])
cmdpipe = subprocess.Popen(self.build_dcld_command_line(cmdlist),
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return json.loads(cmdpipe.stdout.read())
def get_revocation_points(self) -> list[RevocationPoint]:
'''
Get revocation points from DCL.
Returns
-------
list[RevocationPoint]
List of revocation points.
'''
response = self.get_dcld_cmd_output_json(['query', 'pki', 'all-revocation-points'])
return [RevocationPoint(**r) for r in response["PkiRevocationDistributionPoint"]]
def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]:
'''
Get revocation points by subject key ID.
Parameters
----------
issuer_subject_key_id: str
Subject key ID.
Returns
-------
list[RevocationPoint]
List of revocation points.
'''
response = self.get_dcld_cmd_output_json(['query', 'pki', 'revocation-points',
'--issuer-subject-key-id', issuer_subject_key_id])
logging.debug(f"Response revocation points: {response}")
return [RevocationPoint(**r) for r in response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"]]
def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]:
'''
Get certificate from DCL.
Parameters
----------
subject_name: x509.name.Name
Subject Name object.
skid_hex: str
Subject Key ID in hex format.
Returns
-------
tuple[bool, x509.Certificate]
Tuple of is_paa and the certificate from the DCL.
'''
subject_name_b64 = get_b64_name(subject_name)
query_cmd_list = ['query', 'pki', 'x509-cert', '-u', subject_name_b64, '-k', skid_hex]
logging.debug(
f"Fetching issuer from dcl query{' '.join(query_cmd_list)}")
response = self.get_dcld_cmd_output_json(query_cmd_list)
return self.get_only_approved_certificate(response, skid_hex)
class RestDclClient(DclClientInterface):
'''
A client for interacting with DCLD using the REST API.
'''
def __init__(self, use_test_net: bool):
'''
Initialize the client.
use_test_net: bool
Indicates if the client should use TestNet or MainNet REST API URL.
'''
self.rest_node_url = TEST_NODE_URL_REST if use_test_net else PRODUCTION_NODE_URL_REST
def get_revocation_points(self) -> list[RevocationPoint]:
'''
Get revocation points from DCL.
Returns
-------
list[RevocationPoint]
List of revocation points.
'''
response = self.send_get_request(f"{self.rest_node_url}/dcl/pki/revocation-points")
return [RevocationPoint(**r) for r in response["PkiRevocationDistributionPoint"]]
def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]:
'''
Get revocation points by subject key ID.
Parameters
----------
issuer_subject_key_id: str
Subject key ID.
Returns
-------
list[RevocationPoint]
List of revocation points.
'''
response = self.send_get_request(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}")
return [RevocationPoint(**r) for r in response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"]]
def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]:
'''
Get certificate from DCL.
Parameters
----------
subject_name: x509.name.Name
Subject Name object.
skid_hex: str
Subject Key ID in hex format.
Returns
-------
tuple[bool, x509.Certificate]
Tuple of is_paa and the certificate from the DCL.
'''
logging.debug(
f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{get_b64_name(subject_name)}/{self.get_formatted_hex_skid(skid_hex)}")
response = self.send_get_request(
f"{self.rest_node_url}/dcl/pki/certificates/{get_b64_name(subject_name)}/{self.get_formatted_hex_skid(skid_hex)}")
logging.debug(f"Response certificate: {response}")
return self.get_only_approved_certificate(response, skid_hex)
class LocalFilesDclClient(DclClientInterface):
'''
A client for interacting with local DLCD response data.
'''
def __init__(self, crls: [], dcl_certificates: [], revocation_points_response_file: str):
'''
Initialize the client.
Parameters
----------
crls: list
List of CRL files.
dcl_certificates: list
List of certificate files.
revocation_points_response_file: str
Path to the get-revocation-points response json file.
'''
logging.debug(f"Loading certificates from {dcl_certificates}")
logging.debug(f"Loading crls from {crls}")
logging.debug(f"Loading revocation points response from {revocation_points_response_file}")
self.crls = self.get_crls(crls)
self.revocation_points = [RevocationPoint(**r)
for r in json.load(revocation_points_response_file)["PkiRevocationDistributionPoint"]]
self.authoritative_certs = self.get_authoritative_certificates(dcl_certificates)
def get_lookup_key(self, certificate: x509.Certificate) -> str:
'''
Get key used in this class to lookup certificates.
Parameters
----------
certificate: x509.Certificate
Certificate object.
Returns
-------
str:
lookup key derived from the certificate.
'''
base64_name = get_b64_name(certificate.subject)
try:
skid = get_skid(certificate)
return self.format_lookup_key(base64_name, skid)
except ExtensionNotFound:
logging.warning("CertificateSKID not found, continue...")
def format_lookup_key(self, base64_name: str, skid_hex: str) -> str:
'''
Get formatted key used in this class to lookup certificates.
Parameters
----------
base64_name: str
Base64 encoded subject name.
skid_hex: str
Subject Key ID in hex format.
Returns
-------
str:
Key used in this class to lookup certificates.
'''
delimiter = '/'
skid_hex_formatted = self.get_formatted_hex_skid(skid_hex)
return delimiter.join([base64_name, skid_hex_formatted])
def get_crls(self, unread_crls: []) -> list[x509.CertificateRevocationList]:
'''
Get CRLs from list of files.
Parameters
----------
unread_crls: list
List of CRL files.
Returns
-------
list[x509.CertificateRevocationList]
List of CRLs.
'''
crls = []
for file in unread_crls:
crl_content = file.read()
crl_file = x509.load_der_x509_crl(crl_content)
crls.append(crl_file)
return crls
def get_authoritative_certificates(self, dcl_certificates: []) -> dict[str, x509.Certificate]:
'''
Get certificates from revocation points response file and list of provided dcl certificates.
Parameters
----------
dcl_certificates: list
List of certificate files.
Returns
-------
dict[str, x509.CertificateRevocationList]
Dictionary of certificates, keyed by lookup key.
'''
certificates = {}
logging.debug(f"Loading certificates from {dcl_certificates}")
if dcl_certificates:
for file in dcl_certificates:
logging.debug(f"Loading certificate from {file}")
# with open(file, "r") as f:
certificate = x509.load_pem_x509_certificate(file.read())
certificates[self.get_lookup_key(certificate)] = certificate
logging.debug("Loading certificates from revocation_points_response file.")
for point in self.revocation_points:
if point.crlSignerDelegator:
certificate = x509.load_pem_x509_certificate(bytes(point.crlSignerDelegator, 'utf-8'))
certificates[self.get_lookup_key(certificate)] = certificate
elif point.crlSignerCertificate:
certificate = x509.load_pem_x509_certificate(bytes(point.crlSignerCertificate, 'utf-8'))
certificates[self.get_lookup_key(certificate)] = certificate
return certificates
def get_revocation_points(self) -> list[RevocationPoint]:
'''
Get revocation points from DCL.
Returns
-------
list[RevocationPoint]
List of revocation points.
'''
return self.revocation_points
def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]:
'''
Get revocation points by subject key ID
Parameters
----------
issuer_subject_key_id: str
Subject key ID.
Returns
-------
list[RevocationPoint]
List of revocation points with the same issuer subject key ID.
'''
same_issuer_points = []
for point in self.revocation_points:
if point.issuerSubjectKeyID == issuer_subject_key_id:
same_issuer_points.append(point)
return same_issuer_points
def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]:
'''
Get certificate from DCL
Parameters
----------
subject_name: x509.name.Name
Subject name object.
skid_hex: str
Subject Key ID in hex format.
Returns
-------
tuple[bool, x509.Certificate]
Tuple of is_paa and the certificate from the DCL.
'''
lookup_key = self.format_lookup_key(get_b64_name(subject_name), skid_hex)
if lookup_key in self.authoritative_certs:
return is_self_signed_certificate(self.authoritative_certs[lookup_key]), self.authoritative_certs[lookup_key]
return False, None
def get_crl_file(self,
unused_revocation_point: RevocationPoint,
crl_signer_certificate: x509.Certificate) -> x509.CertificateRevocationList:
'''
Obtain the CRL.
Parameters
----------
unused_revocation_point: RevocationPoint
Revocation point. Not used.
crl_signer_certificate: x509.Certificate
Crl signer certificate.
Returns
-------
x509.CertificateRevocationList
CRL signed by the CRL signer certificate.
'''
for crl in self.crls:
if crl.issuer.public_bytes() == crl_signer_certificate.subject.public_bytes():
logging.debug(f"Found CRL for issuer: {crl.issuer.rfc4514_string()}")
return crl
return None
@click.group()
def cli():
pass
@cli.command('from-dcl')
@click.help_option('-h', '--help')
@optgroup.group('Input data sources', cls=RequiredMutuallyExclusiveOptionGroup)
@optgroup.option('--use-main-net-dcld', type=str, default='', metavar='PATH', help="Location of `dcld` binary, to use `dcld` for mirroring MainNet.")
@optgroup.option('--use-test-net-dcld', type=str, default='', metavar='PATH', help="Location of `dcld` binary, to use `dcld` for mirroring TestNet.")
@optgroup.option('--use-main-net-http', is_flag=True, type=str, help="Use RESTful API with HTTPS against public MainNet observer.")
@optgroup.option('--use-test-net-http', is_flag=True, type=str, help="Use RESTful API with HTTPS against public TestNet observer.")
@optgroup.option('--use-local-data', is_flag=True, type=bool, help="Fake response directory: see \" DATA_DIR/",)
@optgroup.group('Required arguments if use-local-data is used', cls=AllOptionGroup)
@optgroup.option('--certificates', type=click.File('rb'), multiple=True, help='Paths to PEM formated certificates (i.e. PAA) in DCL but missing from the revocation-points-response file.')
@optgroup.option('--crls', type=click.File('rb'), multiple=True, help='Paths to the crl der files')
@optgroup.option('--revocation-points-response', type=click.File('rb'), help='Path to the get-revocation-points response json file.')
@optgroup.group('Optional output arguments')
@optgroup.option('--output', default='sample_revocation_set_list.json', type=str, metavar='FILEPATH',
help="Output filename (default: sample_revocation_set_list.json)")
@optgroup.option('--log-level', default='INFO', show_default=True, type=click.Choice(__LOG_LEVELS__.keys(),
case_sensitive=False), callback=lambda c, p, v: __LOG_LEVELS__[v],
help='Determines the verbosity of script output')
def from_dcl(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool, use_test_net_http: bool, use_local_data: bool, revocation_points_response: str, crls: [], certificates: [], output: str, log_level: str):
"""Generate revocation set from DCL using generation algorithm from Matter Spec section 6.2.4.1."""
logging.basicConfig(
level=log_level,
format='%(asctime)s %(name)s %(levelname)-7s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
if use_local_data:
dcld_client = LocalFilesDclClient(crls, certificates, revocation_points_response)
elif use_main_net_http or use_test_net_http:
dcld_client = RestDclClient(True if use_test_net_http else False)
else:
dcld_client = NodeDclClient(use_main_net_dcld or use_test_net_dcld, True if use_test_net_dcld else False)
revocation_point_list = dcld_client.get_revocation_points()
revocation_set = []
for revocation_point in revocation_point_list:
# 1. Validate Revocation Type
if revocation_point.revocationType != RevocationType.CRL.value:
logging.warning("Revocation Type is not CRL, continue...")
continue
# 2. Parse the certificate
try:
crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point.crlSignerCertificate, 'utf-8'))
except Exception:
logging.warning("CRL Signer Certificate is not valid, continue...")
continue
# Parse the crl signer delegator
crl_signer_delegator_cert = None
if revocation_point.crlSignerDelegator:
crl_signer_delegator_cert_pem = revocation_point.crlSignerDelegator
logging.debug(f"CRLSignerDelegator: {crl_signer_delegator_cert_pem}")
try:
crl_signer_delegator_cert = x509.load_pem_x509_certificate(bytes(crl_signer_delegator_cert_pem, 'utf-8'))
except Exception:
logging.warning("CRL Signer Delegator Certificate not found...")
# 3. and 4. Validate VID/PID
if not validate_vid_pid(revocation_point, crl_signer_certificate, crl_signer_delegator_cert):
logging.warning("Failed to validate VID/PID, continue...")
continue
# 5. Validate the certification path containing CRLSignerCertificate.
paa_certificate_object = dcld_client.get_paa_cert(crl_signer_certificate)
if paa_certificate_object is None:
logging.warning("PAA Certificate not found, continue...")
continue
if validate_cert_chain(crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object) is False:
logging.warning("Failed to validate CRL Signer Certificate chain, continue...")
continue
# 6. Obtain the CRL
crl_file = dcld_client.get_crl_file(revocation_point, crl_signer_certificate)
if crl_file is None:
logging.warning("CRL file not found for revocation point, continue...")
continue
# 7. Perform CRL File Validation
# a.
try:
crl_signer_skid = get_skid(crl_signer_certificate)
except ExtensionNotFound:
logging.warning("CRL Signer SKID not found, continue...")
continue
try:
crl_akid = get_akid(crl_file)
except ExtensionNotFound:
logging.warning("CRL AKID is not found, continue...")
continue
if crl_akid != crl_signer_skid:
logging.warning("CRL AKID is not CRL Signer SKID, continue...")
continue
# b.
same_issuer_points = dcld_client.get_revocation_points_by_skid(crl_akid)
count_with_matching_vid_issuer_skid = sum(item.vid == revocation_point.vid for item in same_issuer_points)
if count_with_matching_vid_issuer_skid > 1:
try:
issuing_distribution_point = crl_file.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.ISSUING_DISTRIBUTION_POINT).value
except Exception:
logging.warning("CRL Issuing Distribution Point not found, continue...")
continue
uri_list = issuing_distribution_point.full_name
if len(uri_list) == 1 and isinstance(uri_list[0], x509.UniformResourceIdentifier):
if uri_list[0].value != revocation_point.dataURL:
logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...")
continue
else:
logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...")
continue
# TODO: 8. Validate CRL as per Section 6.3 of RFC 5280
# 9. Decide on certificate authority name and AKID
certificate_authority_name, certificate_akid_hex = get_certificate_authority_details(
crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object, revocation_point.isPAA)
# validate issuer skid matchces with the one in revocation points
logging.debug(f"revocation_point.issuerSubjectKeyID: {revocation_point.issuerSubjectKeyID}")
if revocation_point.issuerSubjectKeyID != certificate_akid_hex:
logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...")
continue
# 10. Iterate through the Revoked Certificates List
entry = generate_revocation_set_from_crl(crl_file, crl_signer_certificate,
certificate_authority_name, certificate_akid_hex, crl_signer_delegator_cert)
logging.debug(f"Entry to append: {entry}")
revocation_set.append(entry)
with open(output, 'w+') as outfile:
json.dump([revocation.asDict() for revocation in revocation_set], outfile, indent=4)
class TestRevocationSetGeneration(unittest.TestCase):
"""Test class for revocation set generation"""
def setUp(self):
# Get the directory containing this file
self.test_base_dir = os.path.dirname(os.path.abspath(__file__))
def get_test_file_path(self, filename):
return os.path.join(self.test_base_dir, 'test', filename)
def get_expected_revocation_set(self, idx):
with open(os.path.join(self.test_base_dir, 'test/revoked-attestation-certificates/revocation-sets/revocation-set.json'), 'r') as f:
return RevocationSet(**json.load(f)[idx])
def compare_revocation_sets(self, generated_set, expected):
# Compare required fields
self.assertEqual(generated_set.type, expected.type)
self.assertEqual(generated_set.issuer_subject_key_id, expected.issuer_subject_key_id)
self.assertEqual(generated_set.issuer_name, expected.issuer_name)
self.assertEqual(set(generated_set.revoked_serial_numbers), set(expected.revoked_serial_numbers))
self.assertEqual(generated_set.crl_signer_cert, expected.crl_signer_cert)
# Compare optional fields if present in either set
if generated_set.crl_signer_delegator or expected.crl_signer_delegator:
self.assertEqual(generated_set.crl_signer_delegator, expected.crl_signer_delegator,
f'CRL signer delegator certificates do not match, expected: {expected.crl_signer_delegator}, actual: {generated_set.crl_signer_delegator}')
def test_paa_revocation_set(self):
"""Test generation of PAA revocation set"""
with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-CRL.pem'), 'rb') as f:
crl = x509.load_pem_x509_crl(f.read())
with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem'), 'rb') as f:
crl_signer = x509.load_pem_x509_certificate(f.read())
ca_name_b64, ca_akid_hex = get_certificate_authority_details(
crl_signer, None, None, True)
revocation_set = generate_revocation_set_from_crl(
crl, crl_signer, ca_name_b64, ca_akid_hex, None)
self.compare_revocation_sets(revocation_set, self.get_expected_revocation_set(0))
def test_pai_revocation_set(self):
"""Test generation of PAI revocation set"""
with open(self.get_test_file_path('revoked-attestation-certificates/Matter-Development-PAI-FFF1-noPID-CRL.pem'), 'rb') as f:
crl = x509.load_pem_x509_crl(f.read())
with open(self.get_test_file_path('revoked-attestation-certificates/Matter-Development-PAI-FFF1-noPID-Cert.pem'), 'rb') as f:
crl_signer = x509.load_pem_x509_certificate(f.read())
with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem'), 'rb') as f:
paa = x509.load_pem_x509_certificate(f.read())
ca_name_b64, ca_akid_hex = get_certificate_authority_details(
crl_signer, None, paa, False)
revocation_set = generate_revocation_set_from_crl(
crl, crl_signer, ca_name_b64, ca_akid_hex, None)
self.compare_revocation_sets(revocation_set, self.get_expected_revocation_set(1))
def test_revoked_pai_revocation_set(self):
"""Test generation of revocation set of revoked PAI"""
with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAI-FFF1-noPID-Revoked-CRL.pem'), 'rb') as f:
crl = x509.load_pem_x509_crl(f.read())
with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAI-FFF1-noPID-Revoked-Cert.pem'), 'rb') as f:
crl_signer = x509.load_pem_x509_certificate(f.read())
with open(self.get_test_file_path('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem'), 'rb') as f:
paa = x509.load_pem_x509_certificate(f.read())
ca_name_b64, ca_akid_hex = get_certificate_authority_details(
crl_signer, None, paa, False)
revocation_set = generate_revocation_set_from_crl(
crl, crl_signer, ca_name_b64, ca_akid_hex, None)
self.compare_revocation_sets(revocation_set, self.get_expected_revocation_set(2))
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == 'test':
# Remove the 'test' argument and run tests
sys.argv.pop(1)
unittest.main()
elif len(sys.argv) == 1:
cli.main(['--help'])
else:
cli()