blob: 3bcd74b6534e26afeb1c7db612dd3dfe86fc8558 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (c) 2022 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script that was used to fetch CHIP Development Product Attestation Authority (PAA)
# certificates from DCL.
# For usage please run:.
# python ./credentials/fetch-paa-certs-from-dcl.py --help
import copy
import os
import re
import subprocess
import sys
import click
import requests
from click_option_group import RequiredMutuallyExclusiveOptionGroup, optgroup
from cryptography import x509
from cryptography.hazmat.primitives import serialization
PRODUCTION_NODE_URL = "https://on.dcl.csa-iot.org:26657"
PRODUCTION_NODE_URL_REST = "https://on.dcl.csa-iot.org"
TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org"
MATTER_CERT_CA_SUBJECT = "MFIxDDAKBgNVBAoMA0NTQTEsMCoGA1UEAwwjTWF0dGVyIENlcnRpZmljYXRpb24gYW5kIFRlc3RpbmcgQ0ExFDASBgorBgEEAYKifAIBDARDNUEw"
MATTER_CERT_CA_SUBJECT_KEY_ID = "97:E4:69:D0:C5:04:14:C2:6F:C7:01:F7:7E:94:77:39:09:8D:F6:A5"
def parse_paa_root_certs(cmdpipe, paa_list):
"""
example output of a query to all x509 root certs in DCL:
certs:
- subject: MCExHzAdBgNVBAMMFk1hdHRlciBEZXZlbG9wbWVudCBQQUE=
subjectKeyId: FA:92:CF:09:5E:FA:42:E1:14:30:65:16:32:FE:FE:1B:2C:77:A7:C8
- subject: MDAxGDAWBgNVBAMMD01hdHRlciBUZXN0IFBBQTEUMBIGCisGAQQBgqJ8AgEMBDEyNUQ=
subjectKeyId: E2:90:8D:36:9C:3C:A3:C1:13:BB:09:E2:4D:C1:CC:C5:A6:66:91:D4
- subject: MEsxCzAJBgNVBAYTAlVTMQ8wDQYDVQQKDAZHb29nbGUxFTATBgNVBAMMDE1hdHRlciBQQUEgMTEUMBIGCisGAQQBgqJ8AgEMBDYwMDY=
subjectKeyId: B0:00:56:81:B8:88:62:89:62:80:E1:21:18:A1:A8:BE:09:DE:93:21
- subject: MFUxCzAJBgNVBAYTAlVTMRcwFQYDVQQKEw5EaWdpQ2VydCwgSW5jLjEtMCsGA1UEAxMkRGlnaUNlcnQgVEVTVCBSb290IENBIGZvciBNQVRURVIgUEtJ
subjectKeyId: C0:E0:64:15:00:EC:67:E2:7C:AF:7C:6E:2D:49:94:C7:73:DE:B7:BA
- subject: MDAxLjAsBgNVBAMMJU5vbiBQcm9kdWN0aW9uIE9OTFkgLSBYRk4gUEFBIENsYXNzIDM=
subjectKeyId: F8:99:A9:D5:AD:71:71:E4:C3:81:7F:14:10:7F:78:F0:D9:F7:62:E9
- subject: MEIxGDAWBgNVBAMMD01hdHRlciBUZXN0IFBBQTEQMA4GA1UECgwHU2Ftc3VuZzEUMBIGCisGAQQBgqJ8AgEMBDEwRTE=
subjectKeyId: CF:9E:0A:16:78:8B:40:30:EC:DD:AB:34:B9:C2:EC:7B:E5:34:55:C0
Brief:
This method will search for the first line that contains ': ' char sequence.
From there, it assumes every 2 lines contain subject and subject key id info of
a valid PAA root certificate.
The paa_list parameter will contain a list of all valid PAA Root certificates
from DCL.
"""
result = {}
while True:
line = cmdpipe.stdout.readline()
if not line:
break
else:
if b': ' in line:
key, value = line.split(b': ')
result[key.strip(b' -').decode("utf-8")
] = value.strip().decode("utf-8")
parse_paa_root_certs.counter += 1
if parse_paa_root_certs.counter % 2 == 0:
paa_list.append(copy.deepcopy(result))
def write_cert(certificate, subject):
filename = 'dcld_mirror_' + \
re.sub('[^a-zA-Z0-9_-]', '', re.sub('[=, ]', '_', subject))
with open(filename + '.pem', 'w+') as outfile:
outfile.write(certificate)
# convert pem file to der
try:
with open(filename + '.pem', 'rb') as infile:
pem_certificate = x509.load_pem_x509_certificate(infile.read())
with open(filename + '.der', 'wb+') as outfile:
der_certificate = pem_certificate.public_bytes(
serialization.Encoding.DER)
outfile.write(der_certificate)
except (IOError, ValueError) as e:
print(
f"ERROR: Failed to convert {filename + '.pem'}: {str(e)}. Skipping...")
def parse_paa_root_cert_from_dcld(cmdpipe):
subject = None
certificate = ""
while True:
line = cmdpipe.stdout.readline()
if not line:
break
else:
if b'pemCert: |' in line:
while True:
line = cmdpipe.stdout.readline()
certificate += line.strip(b' \t').decode("utf-8")
if b'-----END CERTIFICATE-----' in line:
break
if b'subjectAsText:' in line:
subject = line.split(b': ')[1].strip().decode("utf-8")
break
return (certificate, subject)
def use_dcld(dcld, production, cmdlist):
return [dcld] + cmdlist + (['--node', PRODUCTION_NODE_URL] if production else [])
@click.command()
@click.help_option('-h', '--help')
@optgroup.group('Input data sources', cls=RequiredMutuallyExclusiveOptionGroup)
@optgroup.option('--use-main-net-dcld', type=str, default='', metavar='PATH', help="Location of `dcld` binary, to use `dcld` for mirroring MainNet.")
@optgroup.option('--use-test-net-dcld', type=str, default='', metavar='PATH', help="Location of `dcld` binary, to use `dcld` for mirroring TestNet.")
@optgroup.option('--use-main-net-http', is_flag=True, type=str, help="Use RESTful API with HTTPS against public MainNet observer.")
@optgroup.option('--use-test-net-http', is_flag=True, type=str, help="Use RESTful API with HTTPS against public TestNet observer.")
@optgroup.group('Optional arguments')
@optgroup.option('--paa-trust-store-path', default='paa-root-certs', type=str, metavar='PATH', help="PAA trust store path (default: paa-root-certs)")
def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path):
"""DCL PAA mirroring tools"""
fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path)
def get_cert_from_rest(rest_node_url, subject, subject_key_id):
response = requests.get(
f"{rest_node_url}/dcl/pki/certificates/{subject}/{subject_key_id}").json()["approvedCertificates"]["certs"][0]
certificate = response["pemCert"].rstrip("\n")
subject = response["subjectAsText"]
return certificate, subject
def fetch_cd_signing_certs(store_path):
''' Only supports using main net http currently.'''
rest_node_url = PRODUCTION_NODE_URL_REST
os.makedirs(store_path, exist_ok=True)
original_dir = os.getcwd()
os.chdir(store_path)
cd_signer_ids = requests.get(
f"{rest_node_url}/dcl/pki/child-certificates/{MATTER_CERT_CA_SUBJECT}/{MATTER_CERT_CA_SUBJECT_KEY_ID}").json()['childCertificates']['certIds']
for signer in cd_signer_ids:
subject = signer['subject']
subject_key_id = signer['subjectKeyId']
certificate, subject = get_cert_from_rest(rest_node_url, subject, subject_key_id)
print(f"Downloaded CD signing cert with subject: {subject}")
write_cert(certificate, subject)
os.chdir(original_dir)
def fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path):
production = False
dcld = use_test_net_dcld
if len(use_main_net_dcld) > 0:
dcld = use_main_net_dcld
production = True
use_rest = use_main_net_http or use_test_net_http
if use_main_net_http:
production = True
rest_node_url = PRODUCTION_NODE_URL_REST if production else TEST_NODE_URL_REST
os.makedirs(paa_trust_store_path, exist_ok=True)
original_dir = os.getcwd()
os.chdir(paa_trust_store_path)
if use_rest:
paa_list = requests.get(
f"{rest_node_url}/dcl/pki/root-certificates").json()["approvedRootCertificates"]["certs"]
else:
cmdlist = ['query', 'pki', 'all-x509-root-certs']
cmdpipe = subprocess.Popen(use_dcld(
dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
paa_list = []
parse_paa_root_certs.counter = 0
parse_paa_root_certs(cmdpipe, paa_list)
for paa in paa_list:
if paa['subject'] == MATTER_CERT_CA_SUBJECT and paa['subjectKeyId'] == MATTER_CERT_CA_SUBJECT_KEY_ID:
# Don't include the CD signing cert as a PAA root.
continue
if use_rest:
certificate, subject = get_cert_from_rest(rest_node_url, paa['subject'], paa['subjectKeyId'])
else:
cmdlist = ['query', 'pki', 'x509-cert', '-u',
paa['subject'], '-k', paa['subjectKeyId']]
cmdpipe = subprocess.Popen(use_dcld(
dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(certificate, subject) = parse_paa_root_cert_from_dcld(cmdpipe)
certificate = certificate.rstrip('\n')
print(f"Downloaded PAA certificate with subject: {subject}")
write_cert(certificate, subject)
os.chdir(original_dir)
if __name__ == "__main__":
if len(sys.argv) == 1:
main.main(['--help'])
else:
main()