|
| 1 | +""" |
| 2 | +Utils to fetch CHIP Development Product Attestation Authority (PAA) certificates from DCL. |
| 3 | +
|
| 4 | +This is based on the original script from project-chip here: |
| 5 | +https://github.com/project-chip/connectedhomeip/edit/master/credentials/fetch-paa-certs-from-dcl.py |
| 6 | +
|
| 7 | +All rights reserved. |
| 8 | +""" |
| 9 | + |
| 10 | +import asyncio |
| 11 | +import logging |
| 12 | +import pathlib |
| 13 | +import re |
| 14 | + |
| 15 | +from aiohttp import ClientSession, ClientError |
| 16 | +from cryptography import x509 |
| 17 | +from cryptography.hazmat.primitives import serialization |
| 18 | + |
| 19 | +LOGGER = logging.getLogger(__name__) |
| 20 | +PRODUCTION_URL = "https://on.dcl.csa-iot.org" |
| 21 | +TEST_URL = "https://on.test-net.dcl.csa-iot.org" |
| 22 | + |
| 23 | +LAST_CERT_IDS: set[str] = set() |
| 24 | + |
| 25 | + |
| 26 | +async def write_paa_root_cert( |
| 27 | + certificate: str, subject: str, root_path: pathlib.Path |
| 28 | +) -> None: |
| 29 | + """Write certificate from string to file.""" |
| 30 | + |
| 31 | + def _write() -> None: |
| 32 | + filename_base = "dcld_mirror_" + re.sub( |
| 33 | + "[^a-zA-Z0-9_-]", "", re.sub("[=, ]", "_", subject) |
| 34 | + ) |
| 35 | + filepath_base = root_path.joinpath(filename_base) |
| 36 | + # handle PEM certificate file |
| 37 | + file_path_pem = f"{filepath_base}.pem" |
| 38 | + LOGGER.debug("Writing certificate %s", file_path_pem) |
| 39 | + with open(file_path_pem, "w+", encoding="utf-8") as outfile: |
| 40 | + outfile.write(certificate) |
| 41 | + # handle DER certificate file (converted from PEM) |
| 42 | + pem_certificate = x509.load_pem_x509_certificate(certificate.encode()) |
| 43 | + file_path_der = f"{filepath_base}.der" |
| 44 | + LOGGER.debug("Writing certificate %s", file_path_der) |
| 45 | + with open(file_path_der, "wb+") as outfile: |
| 46 | + der_certificate = pem_certificate.public_bytes(serialization.Encoding.DER) |
| 47 | + outfile.write(der_certificate) |
| 48 | + |
| 49 | + return await asyncio.get_running_loop().run_in_executor(None, _write) |
| 50 | + |
| 51 | + |
| 52 | +async def fetch_certificates( |
| 53 | + paa_trust_store_path: pathlib.Path, |
| 54 | + fetch_test_certificates: bool = True, |
| 55 | + fetch_production_certificates: bool = True, |
| 56 | +) -> int: |
| 57 | + """Fetch PAA Certificates.""" |
| 58 | + LOGGER.info("Fetching the latest PAA root certificates from DCL.") |
| 59 | + fetch_count: int = 0 |
| 60 | + base_urls = set() |
| 61 | + # determine which url's need to be queried. |
| 62 | + # if we're going to fetch both prod and test, do test first |
| 63 | + # so any duplicates will be overwritten/preferred by the production version |
| 64 | + # NOTE: While Matter is in BETA we fetch the test certificates by default |
| 65 | + if fetch_test_certificates: |
| 66 | + base_urls.add(TEST_URL) |
| 67 | + if fetch_production_certificates: |
| 68 | + base_urls.add(PRODUCTION_URL) |
| 69 | + |
| 70 | + try: |
| 71 | + async with ClientSession(raise_for_status=True) as http_session: |
| 72 | + for url_base in base_urls: |
| 73 | + # fetch the paa certificates list |
| 74 | + async with http_session.get( |
| 75 | + f"{url_base}/dcl/pki/root-certificates" |
| 76 | + ) as response: |
| 77 | + result = await response.json() |
| 78 | + paa_list = result["approvedRootCertificates"]["certs"] |
| 79 | + # grab each certificate |
| 80 | + for paa in paa_list: |
| 81 | + # do not fetch a certificate if we already fetched it |
| 82 | + if paa["subjectKeyId"] in LAST_CERT_IDS: |
| 83 | + continue |
| 84 | + async with http_session.get( |
| 85 | + f"{url_base}/dcl/pki/certificates/{paa['subject']}/{paa['subjectKeyId']}" |
| 86 | + ) as response: |
| 87 | + result = await response.json() |
| 88 | + |
| 89 | + certificate_data: dict = result["approvedCertificates"]["certs"][0] |
| 90 | + certificate: str = certificate_data["pemCert"] |
| 91 | + subject = certificate_data["subjectAsText"] |
| 92 | + certificate = certificate.rstrip("\n") |
| 93 | + |
| 94 | + await write_paa_root_cert( |
| 95 | + certificate, |
| 96 | + subject, |
| 97 | + paa_trust_store_path, |
| 98 | + ) |
| 99 | + LAST_CERT_IDS.add(paa["subjectKeyId"]) |
| 100 | + fetch_count += 1 |
| 101 | + except ClientError as err: |
| 102 | + LOGGER.warning( |
| 103 | + "Fetching latest certificates failed: error %s", err, exc_info=err |
| 104 | + ) |
| 105 | + else: |
| 106 | + LOGGER.info("Fetched %s PAA root certificates from DCL.", fetch_count) |
| 107 | + return fetch_count |
0 commit comments