diff --git a/credentials/generate_revocation_set.py b/credentials/generate_revocation_set.py index afd69dcc1dd669..6a1374828d43ae 100755 --- a/credentials/generate_revocation_set.py +++ b/credentials/generate_revocation_set.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - # # Copyright (c) 2023-2024 Project CHIP Authors # @@ -15,12 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +# Note: py cryptography supports indirect CRLs trom version 44.0.0 and above. +# You may need to update your cryptography version. +# # Generates a basic RevocationSet from TestNet # Usage: # python ./credentials/generate-revocation-set.py --help import base64 +import dataclasses import json import logging import os @@ -32,10 +34,11 @@ import click import requests -from click_option_group import RequiredMutuallyExclusiveOptionGroup, optgroup +from click_option_group import AllOptionGroup, RequiredMutuallyExclusiveOptionGroup, optgroup from cryptography import x509 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.x509.extensions import ExtensionNotFound from cryptography.x509.oid import NameOID # Supported log levels, mapping string values required for argument @@ -52,6 +55,45 @@ class RevocationType(Enum): CRL = 1 +class CertVerificationResult(Enum): + SUCCESS = 1 + SKID_NOT_FOUND = 2 + AKID_NOT_FOUND = 3 + SIGNATURE_VERIFICATION_FAILED = 4 + ISSUER_MISMATCH = 5 + AKID_MISMATCH = 6 + + +@dataclasses.dataclass +class RevocationPoint: + vid: int + label: str + issuerSubjectKeyID: str + pid: int + isPAA: bool + crlSignerCertificate: str + dataURL: str + dataFileSize: str + dataDigest: str + dataDigestType: int + revocationType: int + schemaVersion: int + crlSignerDelegator: str + + +@dataclasses.dataclass +class RevocationSet: + type: str + issuer_subject_key_id: str + issuer_name: str + revoked_serial_numbers: [str] + crl_signer_cert: str + crl_signer_delegator: str = None + + def asDict(self): + return dataclasses.asdict(self) + + OID_VENDOR_ID = x509.ObjectIdentifier("1.3.6.1.4.1.37244.2.1") OID_PRODUCT_ID = x509.ObjectIdentifier("1.3.6.1.4.1.37244.2.2") @@ -95,56 +137,51 @@ def parse_vid_pid_from_distinguished_name(distinguished_name): return vid, pid -def get_akid(cert: x509.Certificate) -> Optional[bytes]: - try: - return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier - except Exception: - logging.warning("AKID not found in certificate") - return None - - -def get_skid(cert: x509.Certificate) -> Optional[bytes]: - try: - return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier - except Exception: - logging.warning("SKID not found in certificate") - return None - - -def get_subject_b64(cert: x509.Certificate) -> str: - return base64.b64encode(cert.subject.public_bytes()).decode('utf-8') +def get_akid(cert: x509.Certificate) -> str: + return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier.hex().upper() -def get_issuer_b64(cert: x509.Certificate) -> str: - return base64.b64encode(cert.issuer.public_bytes()).decode('utf-8') +def get_skid(cert: x509.Certificate) -> str: + return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier.hex().upper() -def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> bool: +def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> CertVerificationResult: ''' Verifies if the cert is signed by root. ''' + try: + cert_akid = get_akid(cert) + except ExtensionNotFound: + return CertVerificationResult.AKID_NOT_FOUND + try: + root_skid = get_skid(root) + except ExtensionNotFound: + return CertVerificationResult.SKID_NOT_FOUND - cert_akid = get_akid(cert) - root_skid = get_skid(root) - if cert_akid is None or root_skid is None or cert_akid != root_skid: - return False + if cert_akid != root_skid: + return CertVerificationResult.AKID_MISMATCH if cert.issuer != root.subject: - return False + return CertVerificationResult.ISSUER_MISMATCH # public_key().verify() do not return anything if signature is valid, # will raise an exception if signature is invalid try: root.public_key().verify(cert.signature, cert.tbs_certificate_bytes, ec.ECDSA(cert.signature_hash_algorithm)) except Exception: - logging.warning(f"Signature verification failed for cert subject: {get_subject_b64(cert)}, issuer: {get_issuer_b64(cert)}") - return False + return CertVerificationResult.SIGNATURE_VERIFICATION_FAILED - return True + return CertVerificationResult.SUCCESS def is_self_signed_certificate(cert: x509.Certificate) -> bool: - return verify_cert(cert, cert) + result = verify_cert(cert, cert) + if result == CertVerificationResult.SUCCESS: + return True + else: + logging.debug( + f"Certificate with subject: {cert.subject.rfc4514_string()} is not a valid self-signed certificate. Result: {result.name}") + return False # delegator is optional so can be None, but crl_signer and paa has to be present @@ -158,17 +195,34 @@ def validate_cert_chain(crl_signer: x509.Certificate, crl_signer_delegator: x509 ''' if crl_signer_delegator: - return verify_cert(crl_signer, crl_signer_delegator) and verify_cert(crl_signer_delegator, paa) + result_signer = verify_cert(crl_signer, crl_signer_delegator) + if not result_signer == CertVerificationResult.SUCCESS: + logging.debug( + f"Cannot verify certificate subject: {crl_signer.subject.rfc4514_string()} issued by certificate subject: {crl_signer_delegator.subject.rfc4514_string()}. Result: {result_signer.name}") + return False + + result_delegator = verify_cert(crl_signer_delegator, paa) + if not result_delegator == CertVerificationResult.SUCCESS: + logging.debug( + f"Cannot verify certificate subject: {crl_signer_delegator.subject.rfc4514_string()} issued by certificate subject: {paa.subject.rfc4514_string()}. Result: {result_delegator.name}") + return False + return True else: - return verify_cert(crl_signer, paa) + result = verify_cert(crl_signer, paa) + if not result == CertVerificationResult.SUCCESS: + logging.debug( + f"Cannot verify certificate subject: {crl_signer.subject.rfc4514_string()} issued by certificate subject: {paa.subject.rfc4514_string()}. Result: {result.name}") + return False + return True -def validate_vid_pid(revocation_point: dict, crl_signer_certificate: x509.Certificate, crl_signer_delegator_certificate: x509.Certificate) -> bool: +def validate_vid_pid(revocation_point: RevocationPoint, crl_signer_certificate: x509.Certificate, crl_signer_delegator_certificate: x509.Certificate) -> bool: crl_signer_vid, crl_signer_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject) - - if revocation_point["isPAA"]: + logging.debug(f"vid: {revocation_point.vid})") + logging.debug(f"crl_signer_vid: {crl_signer_vid})") + if revocation_point.isPAA: if crl_signer_vid is not None: - if revocation_point["vid"] != crl_signer_vid: + if revocation_point.vid != crl_signer_vid: logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...") return False else: @@ -178,45 +232,36 @@ def validate_vid_pid(revocation_point: dict, crl_signer_certificate: x509.Certif # if the CRL Signer is delegated then match the VID and PID of the CRL Signer Delegator if crl_signer_delegator_certificate: vid_to_match, pid_to_match = parse_vid_pid_from_distinguished_name(crl_signer_delegator_certificate.subject) - - if vid_to_match is None or revocation_point["vid"] != vid_to_match: + logging.debug(f"vid_to_match: {vid_to_match})") + logging.debug(f"pid_to_match: {pid_to_match})") + if vid_to_match is None or revocation_point.vid != vid_to_match: logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...") return False if pid_to_match is not None: - if revocation_point["pid"] != pid_to_match: + if revocation_point.pid != pid_to_match: logging.warning("PID in CRL Signer Certificate does not match with PID in revocation point, continue...") return False return True -def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList: - logging.debug(f"Fetching CRL from {url}") - - try: - r = requests.get(url, timeout=timeout) - return x509.load_der_x509_crl(r.content) - except Exception: - logging.error('Failed to fetch a valid CRL') - - def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList, crl_signer_certificate: x509.Certificate, - certificate_authority_name_b64: str, + certificate_authority_name: x509.Name, certificate_akid_hex: str, - crl_signer_delegator_cert: x509.Certificate) -> dict: + crl_signer_delegator_cert: x509.Certificate) -> RevocationSet: """Generate a revocation set from a CRL file. Args: crl_file: The CRL object containing revoked certificates crl_signer_certificate: The certificate object used to sign the CRL - certificate_authority_name_b64: Base64 encoded issuer name + certificate_authority_name: x509.Name of the issuer certificate_akid_hex: Hex encoded Authority Key Identifier crl_signer_delegator_cert: crl signer delegator certificate object Returns: - dict: A dictionary containing the revocation set data with fields: + RevocationSet containing the revocation set data with fields: - type: "revocation_set" - issuer_subject_key_id: Authority Key Identifier (hex) - issuer_name: Issuer name (base64) @@ -229,13 +274,11 @@ def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList, for revoked_cert in crl_file: try: cert_issuer_entry_ext = revoked_cert.extensions.get_extension_for_oid(x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER) - revoked_cert_issuer = cert_issuer_entry_ext.value.get_values_for_type(x509.DirectoryName)[0].public_bytes() - revoked_cert_issuer_b64 = base64.b64encode(revoked_cert_issuer).decode('utf-8') - - if revoked_cert_issuer_b64 is not None: + revoked_cert_issuer = cert_issuer_entry_ext.value.get_values_for_type(x509.DirectoryName)[0] + if revoked_cert_issuer is not None: # check if this really are the same thing - if revoked_cert_issuer_b64 != certificate_authority_name_b64: - logging.warning("CRL Issuer is not CRL File Issuer, continue...") + if revoked_cert_issuer != x509.DirectoryName(certificate_authority_name).value: + logging.warning("CRL entry issuer is not CRL File Issuer, continue...") continue except Exception: pass @@ -248,16 +291,16 @@ def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList, serialnumber = serialnumber if len(serialnumber) % 2 == 0 else '0' + serialnumber serialnumber_list.append(serialnumber) - entry = { - "type": "revocation_set", - "issuer_subject_key_id": certificate_akid_hex, - "issuer_name": certificate_authority_name_b64, - "revoked_serial_numbers": serialnumber_list, - "crl_signer_cert": base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'), - } + entry = RevocationSet( + type='revocation_set', + issuer_subject_key_id=certificate_akid_hex, + issuer_name=get_b64_name(certificate_authority_name), + revoked_serial_numbers=serialnumber_list, + crl_signer_cert=base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'), + ) if crl_signer_delegator_cert: - entry["crl_signer_delegator"] = base64.b64encode( + entry.crl_signer_delegator = base64.b64encode( crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8') return entry @@ -267,7 +310,7 @@ def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList, def get_certificate_authority_details(crl_signer_certificate: x509.Certificate, crl_signer_delegator_cert: x509.Certificate, paa_certificate_object: x509.Certificate, - is_paa: bool) -> tuple[str, str]: + is_paa: bool) -> tuple[x509.Name, str]: """Get certificate authority name and AKID based on certificate hierarchy. Args: @@ -277,7 +320,7 @@ def get_certificate_authority_details(crl_signer_certificate: x509.Certificate, is_paa: Whether this is a PAA certificate Returns: - tuple[str, str]: (certificate_authority_name_b64, certificate_akid_hex) + tuple[str, str]: (certificate_authority_name, certificate_akid_hex) """ if is_paa and not is_self_signed_certificate(crl_signer_certificate): cert_for_details = paa_certificate_object @@ -289,40 +332,159 @@ def get_certificate_authority_details(crl_signer_certificate: x509.Certificate, cert_for_details = crl_signer_certificate logging.debug("Using CRL Signer certificate for details") - certificate_authority_name_b64 = get_subject_b64(cert_for_details) - certificate_akid = get_skid(cert_for_details) - certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid) + certificate_authority_name = cert_for_details.subject + try: + certificate_akid = get_skid(cert_for_details) + logging.debug(f"Certificate Authority Name: {certificate_authority_name}") + logging.debug(f"Certificate AKID: {certificate_akid}") + + return certificate_authority_name, certificate_akid + except ExtensionNotFound: + logging.warning("Certificate SKID not found in authoarity certificate.") + + +def get_b64_name(name: x509.name.Name) -> str: + ''' + Get base64 encoded name + ''' + return base64.b64encode(name.public_bytes()).decode('utf-8') + - logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}") - logging.debug(f"Certificate AKID: {certificate_akid_hex}") +def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList: + logging.debug(f"Fetching CRL from {url}") - return certificate_authority_name_b64, certificate_akid_hex + try: + r = requests.get(url, timeout=timeout) + logging.debug(f"Fetched CRL: {r.content}") + return x509.load_der_x509_crl(r.content) + except Exception as e: + logging.error('Failed to fetch a valid CRL', e) -class DCLDClient: +class DclClientInterface: ''' - A client for interacting with DCLD using either the REST API or command line interface (CLI). + An interface for interacting with DCLD. + ''' + + def send_get_request(self, url: str) -> dict: + ''' + Send a GET request for a json object. + ''' + try: + response = requests.get(url).json() + return response + except Exception as e: + logging.error(f"Failed to fetch {url}: {e}") + return None + + def get_revocation_points(self) -> list[RevocationPoint]: + ''' + Get revocation points from DCL + + Returns + ------- + list[RevocationPoint] + List of revocation points + ''' + raise NotImplementedError + + def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]: + ''' + Get revocation points by subject key ID + + Parameters + ---------- + issuer_subject_key_id: str + Subject key ID + + Returns + ------- + list[RevocationPoint] + List of revocation points + ''' + raise NotImplementedError + + def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: + ''' + Get certificate from DCL. + ''' + raise NotImplementedError + + def get_only_approved_certificate(self, response: dict, skid_hex: str) -> tuple[bool, Optional[x509.Certificate]]: + ''' + Get only approved certificate from DCL resposne. + ''' + if response is None or not response.get("approvedCertificates", {}).get("certs", []): + raise requests.exception.NotFound(f"No certificate found for {skid_hex}") + if len(response["approvedCertificates"]["certs"]) > 1: + raise ValueError(f"Multiple certificates found for {skid_hex}") + issuer_certificate = x509.load_pem_x509_certificate(bytes(response["approvedCertificates"]["certs"][0]["pemCert"], "utf-8")) + return response["approvedCertificates"]["certs"][0]["isRoot"], issuer_certificate + + def get_paa_cert(self, initial_cert: x509.Certificate) -> Optional[x509.Certificate]: + ''' + Get the PAA certificate for the CRL Signer Certificate. + ''' + issuer_name = initial_cert.issuer + try: + akid = get_akid(initial_cert) + except ExtensionNotFound: + logging.warning('Certificate AKID not found.') + return + paa_certificate = None + while not paa_certificate: + try: + is_root, issuer_certificate = self.get_approved_certificate(issuer_name, akid) + if is_root: + paa_certificate = issuer_certificate + break + + except Exception as e: + logging.error('Failed to get PAA certificate', e) + return + logging.debug(f"issuer_name: {issuer_certificate.subject.rfc4514_string()}") + issuer_name = issuer_certificate.issuer + try: + akid = get_akid(issuer_certificate) + except ExtensionNotFound: + logging.warning('Issuer Certificate AKID not found.') + logging.debug(f"akid: {akid}") + if paa_certificate is None: + logging.warning("PAA Certificate not found, continue...") + return paa_certificate + + def get_crl_file(self, + revocation_point: RevocationPoint, + crl_signer_certificate: x509.Certificate) -> x509.CertificateRevocationList: + """Obtain the CRL.""" + try: + r = requests.get(revocation_point.dataURL, timeout=5) + logging.debug(f"Fetched CRL: {r.content}") + return x509.load_der_x509_crl(r.content) + except Exception: + logging.warning(f"Failed to fetch a valid CRL for': {crl_signer_certificate.subject.rfc4514_string()}") + def get_formatted_hex_skid(self, skid_hex: str) -> str: + return ':'.join([skid_hex[i:i+2] for i in range(0, len(skid_hex), 2)]) + + +class NodeDclClient(DclClientInterface): + ''' + A client for interacting with DCLD using command line interface (CLI). ''' - def __init__(self, use_rest: bool, dcld_exe: str, production: bool, rest_node_url: str): + def __init__(self, dcld_exe: str, use_test_net: bool): ''' - Initialize the client + Initialize the client. - use_rest: bool - Use RESTful API with HTTPS against `rest_node_url` dcld_exe: str - Path to `dcld` executable - production: bool - Use MainNet DCL URL with dcld executable - rest_node_url: str - RESTful API URL + Path to `dcld` executable. + use_test_net: bool + Indicates if the client should use TestNet or MainNet URL with dcld executable. ''' - self.use_rest = use_rest self.dcld_exe = dcld_exe - self.production = production - self.rest_node_url = rest_node_url + self.use_test_net = use_test_net def build_dcld_command_line(self, cmdlist: list[str]) -> list[str]: ''' @@ -331,15 +493,15 @@ def build_dcld_command_line(self, cmdlist: list[str]) -> list[str]: Parameters ---------- cmdlist: list[str] - List of command line arguments to append to some predefined arguments + List of command line arguments to append to some predefined arguments. Returns ------- list[str] - The complete command list including the DCLD executable and node option if in production + The complete command list including the DCLD executable and node option if in production. ''' - return [self.dcld_exe] + cmdlist + (['--node', PRODUCTION_NODE_URL] if self.production else []) + return [self.dcld_exe] + cmdlist + ([] if self.use_test_net else ['--node', PRODUCTION_NODE_URL]) def get_dcld_cmd_output_json(self, cmdlist: list[str]) -> dict: ''' @@ -348,12 +510,12 @@ def get_dcld_cmd_output_json(self, cmdlist: list[str]) -> dict: Parameters ---------- cmdlist: list[str] - List of command line arguments to append to some predefined arguments + List of command line arguments to append to some predefined arguments. Returns ------- dict - The JSON output from the command + The JSON output from the command. ''' # Set the output as JSON @@ -363,90 +525,331 @@ def get_dcld_cmd_output_json(self, cmdlist: list[str]) -> dict: stdout=subprocess.PIPE, stderr=subprocess.PIPE) return json.loads(cmdpipe.stdout.read()) - def get_revocation_points(self) -> list[dict]: + def get_revocation_points(self) -> list[RevocationPoint]: ''' - Get revocation points from DCL + Get revocation points from DCL. Returns ------- - list[dict] - List of revocation points + list[RevocationPoint] + List of revocation points. + ''' + + response = self.get_dcld_cmd_output_json(['query', 'pki', 'all-revocation-points']) + return [RevocationPoint(**r) for r in response["PkiRevocationDistributionPoint"]] + + def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]: ''' + Get revocation points by subject key ID. - if self.use_rest: - response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points").json() - else: - response = self.get_dcld_cmd_output_json(['query', 'pki', 'all-revocation-points']) + Parameters + ---------- + issuer_subject_key_id: str + Subject key ID. + + Returns + ------- + list[RevocationPoint] + List of revocation points. + ''' - return response["PkiRevocationDistributionPoint"] + response = self.get_dcld_cmd_output_json(['query', 'pki', 'revocation-points', + '--issuer-subject-key-id', issuer_subject_key_id]) + logging.debug(f"Response revocation points: {response}") + return [RevocationPoint(**r) for r in response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"]] - def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]: + def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: ''' - Get the issuer certificate for + Get certificate from DCL. Parameters ---------- - cert: x509.Certificate - Certificate + subject_name: x509.name.Name + Subject Name object. + skid_hex: str + Subject Key ID in hex format. Returns ------- - str - Issuer certificate in PEM format + tuple[bool, x509.Certificate] + Tuple of is_paa and the certificate from the DCL. ''' - issuer_name_b64 = get_issuer_b64(cert) - akid = get_akid(cert) - if akid is None: - return + subject_name_b64 = get_b64_name(subject_name) + query_cmd_list = ['query', 'pki', 'x509-cert', '-u', subject_name_b64, '-k', skid_hex] + logging.debug( + f"Fetching issuer from dcl query{' '.join(query_cmd_list)}") + response = self.get_dcld_cmd_output_json(query_cmd_list) + return self.get_only_approved_certificate(response, skid_hex) + + +class RestDclClient(DclClientInterface): + ''' + A client for interacting with DCLD using the REST API. + ''' - # Convert CRL Signer AKID to colon separated hex - akid_hex = akid.hex().upper() - akid_hex = ':'.join([akid_hex[i:i+2] for i in range(0, len(akid_hex), 2)]) + def __init__(self, use_test_net: bool): + ''' + Initialize the client. + + use_test_net: bool + Indicates if the client should use TestNet or MainNet REST API URL. + ''' + self.rest_node_url = TEST_NODE_URL_REST if use_test_net else PRODUCTION_NODE_URL_REST + + def get_revocation_points(self) -> list[RevocationPoint]: + ''' + Get revocation points from DCL. + + Returns + ------- + list[RevocationPoint] + List of revocation points. + ''' + + response = self.send_get_request(f"{self.rest_node_url}/dcl/pki/revocation-points") + return [RevocationPoint(**r) for r in response["PkiRevocationDistributionPoint"]] + + def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]: + ''' + Get revocation points by subject key ID. + + Parameters + ---------- + issuer_subject_key_id: str + Subject key ID. + + Returns + ------- + list[RevocationPoint] + List of revocation points. + ''' + + response = self.send_get_request(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}") + return [RevocationPoint(**r) for r in response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"]] + def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: + ''' + Get certificate from DCL. + + Parameters + ---------- + subject_name: x509.name.Name + Subject Name object. + skid_hex: str + Subject Key ID in hex format. + + Returns + ------- + tuple[bool, x509.Certificate] + Tuple of is_paa and the certificate from the DCL. + ''' logging.debug( - f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}") + f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{get_b64_name(subject_name)}/{self.get_formatted_hex_skid(skid_hex)}") + response = self.send_get_request( + f"{self.rest_node_url}/dcl/pki/certificates/{get_b64_name(subject_name)}/{self.get_formatted_hex_skid(skid_hex)}") + logging.debug(f"Response certificate: {response}") + return self.get_only_approved_certificate(response, skid_hex) + + +class LocalFilesDclClient(DclClientInterface): + ''' + A client for interacting with local DLCD response data. + ''' + + def __init__(self, crls: [], dcl_certificates: [], revocation_points_response_file: str): + ''' + Initialize the client. - if self.use_rest: - response = requests.get( - f"{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}").json() - else: - response = self.get_dcld_cmd_output_json( - ['query', 'pki', 'x509-cert', '-u', issuer_name_b64, '-k', akid_hex]) + Parameters + ---------- + crls: list + List of CRL files. + dcl_certificates: list + List of certificate files. + revocation_points_response_file: str + Path to the get-revocation-points response json file. + ''' - issuer_certificate = response["approvedCertificates"]["certs"][0]["pemCert"] + logging.debug(f"Loading certificates from {dcl_certificates}") + logging.debug(f"Loading crls from {crls}") + logging.debug(f"Loading revocation points response from {revocation_points_response_file}") + self.crls = self.get_crls(crls) + self.revocation_points = [RevocationPoint(**r) + for r in json.load(revocation_points_response_file)["PkiRevocationDistributionPoint"]] + self.authoritative_certs = self.get_authoritative_certificates(dcl_certificates) - logging.debug(f"issuer: {issuer_certificate}") + def get_lookup_key(self, certificate: x509.Certificate) -> str: + ''' + Get key used in this class to lookup certificates. + + Parameters + ---------- + certificate: x509.Certificate + Certificate object. + Returns + ------- + str: + lookup key derived from the certificate. + ''' + base64_name = get_b64_name(certificate.subject) try: - issuer_certificate_object = x509.load_pem_x509_certificate(bytes(issuer_certificate, 'utf-8')) - except Exception: - logging.error('Failed to parse PAA certificate') - return + skid = get_skid(certificate) + return self.format_lookup_key(base64_name, skid) + except ExtensionNotFound: + logging.warning("CertificateSKID not found, continue...") + + def format_lookup_key(self, base64_name: str, skid_hex: str) -> str: + ''' + Get formatted key used in this class to lookup certificates. + + Parameters + ---------- + base64_name: str + Base64 encoded subject name. + skid_hex: str + Subject Key ID in hex format. + + Returns + ------- + str: + Key used in this class to lookup certificates. + ''' + delimiter = '/' + skid_hex_formatted = self.get_formatted_hex_skid(skid_hex) + return delimiter.join([base64_name, skid_hex_formatted]) - return issuer_certificate_object + def get_crls(self, unread_crls: []) -> list[x509.CertificateRevocationList]: + ''' + Get CRLs from list of files. + + Parameters + ---------- + unread_crls: list + List of CRL files. + + Returns + ------- + list[x509.CertificateRevocationList] + List of CRLs. + ''' + crls = [] + for file in unread_crls: + crl_content = file.read() + crl_file = x509.load_der_x509_crl(crl_content) + crls.append(crl_file) + return crls + + def get_authoritative_certificates(self, dcl_certificates: []) -> dict[str, x509.Certificate]: + ''' + Get certificates from revocation points response file and list of provided dcl certificates. + + Parameters + ---------- + dcl_certificates: list + List of certificate files. + + Returns + ------- + dict[str, x509.CertificateRevocationList] + Dictionary of certificates, keyed by lookup key. + ''' + certificates = {} + logging.debug(f"Loading certificates from {dcl_certificates}") + if dcl_certificates: + for file in dcl_certificates: + logging.debug(f"Loading certificate from {file}") + # with open(file, "r") as f: + certificate = x509.load_pem_x509_certificate(file.read()) + certificates[self.get_lookup_key(certificate)] = certificate + + logging.debug("Loading certificates from revocation_points_response file.") + for point in self.revocation_points: + if point.crlSignerDelegator: + certificate = x509.load_pem_x509_certificate(bytes(point.crlSignerDelegator, 'utf-8')) + certificates[self.get_lookup_key(certificate)] = certificate + elif point.crlSignerCertificate: + certificate = x509.load_pem_x509_certificate(bytes(point.crlSignerCertificate, 'utf-8')) + certificates[self.get_lookup_key(certificate)] = certificate + return certificates + + def get_revocation_points(self) -> list[RevocationPoint]: + ''' + Get revocation points from DCL. + + Returns + ------- + list[RevocationPoint] + List of revocation points. + ''' + return self.revocation_points - def get_revocations_points_by_skid(self, issuer_subject_key_id: str) -> list[dict]: + def get_revocation_points_by_skid(self, issuer_subject_key_id) -> list[RevocationPoint]: ''' Get revocation points by subject key ID Parameters ---------- issuer_subject_key_id: str - Subject key ID + Subject key ID. Returns ------- - list[dict] - List of revocation points + list[RevocationPoint] + List of revocation points with the same issuer subject key ID. + ''' + same_issuer_points = [] + for point in self.revocation_points: + if point.issuerSubjectKeyID == issuer_subject_key_id: + same_issuer_points.append(point) + return same_issuer_points + + def get_approved_certificate(self, subject_name: x509.name.Name, skid_hex: str) -> tuple[bool, x509.Certificate]: + ''' + Get certificate from DCL + + Parameters + ---------- + subject_name: x509.name.Name + Subject name object. + + skid_hex: str + Subject Key ID in hex format. + + Returns + ------- + tuple[bool, x509.Certificate] + Tuple of is_paa and the certificate from the DCL. + ''' + lookup_key = self.format_lookup_key(get_b64_name(subject_name), skid_hex) + if lookup_key in self.authoritative_certs: + return is_self_signed_certificate(self.authoritative_certs[lookup_key]), self.authoritative_certs[lookup_key] + return False, None + + def get_crl_file(self, + unused_revocation_point: RevocationPoint, + crl_signer_certificate: x509.Certificate) -> x509.CertificateRevocationList: ''' + Obtain the CRL. + + Parameters + ---------- + unused_revocation_point: RevocationPoint + Revocation point. Not used. - if self.use_rest: - response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}").json() - else: - response = self.get_dcld_cmd_output_json(['query', 'pki', 'revocation-points', - '--issuer-subject-key-id', issuer_subject_key_id]) + crl_signer_certificate: x509.Certificate + Crl signer certificate. - return response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"] + Returns + ------- + x509.CertificateRevocationList + CRL signed by the CRL signer certificate. + ''' + for crl in self.crls: + if crl.issuer.public_bytes() == crl_signer_certificate.subject.public_bytes(): + logging.debug(f"Found CRL for issuer: {crl.issuer.rfc4514_string()}") + return crl + return None @click.group() @@ -461,56 +864,51 @@ def cli(): @optgroup.option('--use-test-net-dcld', type=str, default='', metavar='PATH', help="Location of `dcld` binary, to use `dcld` for mirroring TestNet.") @optgroup.option('--use-main-net-http', is_flag=True, type=str, help="Use RESTful API with HTTPS against public MainNet observer.") @optgroup.option('--use-test-net-http', is_flag=True, type=str, help="Use RESTful API with HTTPS against public TestNet observer.") -@optgroup.group('Optional arguments') +@optgroup.option('--use-local-data', is_flag=True, type=bool, help="Fake response directory: see \" DATA_DIR/",) +@optgroup.group('Required arguments if use-local-data is used', cls=AllOptionGroup) +@optgroup.option('--certificates', type=click.File('rb'), multiple=True, help='Paths to PEM formated certificates (i.e. PAA) in DCL but missing from the revocation-points-response file.') +@optgroup.option('--crls', type=click.File('rb'), multiple=True, help='Paths to the crl der files') +@optgroup.option('--revocation-points-response', type=click.File('rb'), help='Path to the get-revocation-points response json file.') +@optgroup.group('Optional output arguments') @optgroup.option('--output', default='sample_revocation_set_list.json', type=str, metavar='FILEPATH', help="Output filename (default: sample_revocation_set_list.json)") @optgroup.option('--log-level', default='INFO', show_default=True, type=click.Choice(__LOG_LEVELS__.keys(), case_sensitive=False), callback=lambda c, p, v: __LOG_LEVELS__[v], help='Determines the verbosity of script output') -def from_dcl(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, output, log_level): - """Generate revocation set from DCL""" +def from_dcl(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool, use_test_net_http: bool, use_local_data: bool, revocation_points_response: str, crls: [], certificates: [], output: str, log_level: str): + """Generate revocation set from DCL using generation algorithm from Matter Spec section 6.2.4.1.""" logging.basicConfig( level=log_level, format='%(asctime)s %(name)s %(levelname)-7s %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) - production = False - dcld = use_test_net_dcld - - if len(use_main_net_dcld) > 0: - dcld = use_main_net_dcld - production = True - - use_rest = use_main_net_http or use_test_net_http - if use_main_net_http: - production = True - - rest_node_url = PRODUCTION_NODE_URL_REST if production else TEST_NODE_URL_REST - - dcld_client = DCLDClient(use_rest, dcld, production, rest_node_url) + if use_local_data: + dcld_client = LocalFilesDclClient(crls, certificates, revocation_points_response) + elif use_main_net_http or use_test_net_http: + dcld_client = RestDclClient(True if use_test_net_http else False) + else: + dcld_client = NodeDclClient(use_main_net_dcld or use_test_net_dcld, True if use_test_net_dcld else False) revocation_point_list = dcld_client.get_revocation_points() - revocation_set = [] - for revocation_point in revocation_point_list: # 1. Validate Revocation Type - if revocation_point["revocationType"] != RevocationType.CRL.value: + if revocation_point.revocationType != RevocationType.CRL.value: logging.warning("Revocation Type is not CRL, continue...") continue # 2. Parse the certificate try: - crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8')) + crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point.crlSignerCertificate, 'utf-8')) except Exception: logging.warning("CRL Signer Certificate is not valid, continue...") continue # Parse the crl signer delegator crl_signer_delegator_cert = None - if "crlSignerDelegator" in revocation_point: - crl_signer_delegator_cert_pem = revocation_point["crlSignerDelegator"] + if revocation_point.crlSignerDelegator: + crl_signer_delegator_cert_pem = revocation_point.crlSignerDelegator logging.debug(f"CRLSignerDelegator: {crl_signer_delegator_cert_pem}") try: crl_signer_delegator_cert = x509.load_pem_x509_certificate(bytes(crl_signer_delegator_cert_pem, 'utf-8')) @@ -523,7 +921,7 @@ def from_dcl(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_n continue # 5. Validate the certification path containing CRLSignerCertificate. - paa_certificate_object = dcld_client.get_issuer_cert(crl_signer_certificate) + paa_certificate_object = dcld_client.get_paa_cert(crl_signer_certificate) if paa_certificate_object is None: logging.warning("PAA Certificate not found, continue...") continue @@ -533,35 +931,42 @@ def from_dcl(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_n continue # 6. Obtain the CRL - crl_file = fetch_crl_from_url(revocation_point["dataURL"], 5) # timeout in seconds + crl_file = dcld_client.get_crl_file(revocation_point, crl_signer_certificate) if crl_file is None: + logging.warning("CRL file not found for revocation point, continue...") continue # 7. Perform CRL File Validation # a. - crl_signer_skid = get_skid(crl_signer_certificate) - crl_akid = get_akid(crl_file) + try: + crl_signer_skid = get_skid(crl_signer_certificate) + except ExtensionNotFound: + logging.warning("CRL Signer SKID not found, continue...") + continue + try: + crl_akid = get_akid(crl_file) + except ExtensionNotFound: + logging.warning("CRL AKID is not found, continue...") + continue if crl_akid != crl_signer_skid: logging.warning("CRL AKID is not CRL Signer SKID, continue...") continue - crl_akid_hex = ''.join('{:02X}'.format(x) for x in crl_akid) - # b. - same_issuer_points = dcld_client.get_revocations_points_by_skid(crl_akid_hex) - count_with_matching_vid_issuer_skid = sum(item.get('vid') == revocation_point["vid"] for item in same_issuer_points) + same_issuer_points = dcld_client.get_revocation_points_by_skid(crl_akid) + count_with_matching_vid_issuer_skid = sum(item.vid == revocation_point.vid for item in same_issuer_points) if count_with_matching_vid_issuer_skid > 1: try: issuing_distribution_point = crl_file.extensions.get_extension_for_oid( - x509.OID_ISSUING_DISTRIBUTION_POINT).value + x509.oid.ExtensionOID.ISSUING_DISTRIBUTION_POINT).value except Exception: logging.warning("CRL Issuing Distribution Point not found, continue...") continue uri_list = issuing_distribution_point.full_name if len(uri_list) == 1 and isinstance(uri_list[0], x509.UniformResourceIdentifier): - if uri_list[0].value != revocation_point["dataURL"]: + if uri_list[0].value != revocation_point.dataURL: logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...") continue else: @@ -570,44 +975,25 @@ def from_dcl(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_n # TODO: 8. Validate CRL as per Section 6.3 of RFC 5280 - # 9. decide on certificate authority name and AKID - certificate_authority_name_b64, certificate_akid_hex = get_certificate_authority_details( - crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object, revocation_point["isPAA"]) + # 9. Decide on certificate authority name and AKID + certificate_authority_name, certificate_akid_hex = get_certificate_authority_details( + crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object, revocation_point.isPAA) # validate issuer skid matchces with the one in revocation points - logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}") + logging.debug(f"revocation_point.issuerSubjectKeyID: {revocation_point.issuerSubjectKeyID}") - if revocation_point["issuerSubjectKeyID"] != certificate_akid_hex: + if revocation_point.issuerSubjectKeyID != certificate_akid_hex: logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...") continue # 10. Iterate through the Revoked Certificates List entry = generate_revocation_set_from_crl(crl_file, crl_signer_certificate, - certificate_authority_name_b64, certificate_akid_hex, crl_signer_delegator_cert) + certificate_authority_name, certificate_akid_hex, crl_signer_delegator_cert) logging.debug(f"Entry to append: {entry}") revocation_set.append(entry) with open(output, 'w+') as outfile: - json.dump(revocation_set, outfile, indent=4) - - -@cli.command('from-crl') -@click.option('--crl', required=True, type=click.File('rb'), help='Path to the CRL file') -@click.option('--crl-signer', required=True, type=click.File('rb'), help='Path to the signer certificate') -@click.option('--delegator', type=click.File('rb'), help='Path to the delegator certificate (optional)') -@click.option('--paa', type=click.File('rb'), help='Path to the PAA certificate (optional)') -@click.option('--output', default='revocation_set.json', type=click.File('w'), help='Output filename (default: revocation_set.json)') -@click.option('--is-paa', default=False, is_flag=True, help='Indicates if the CRL issuer is the PAA') -def from_crl(crl, crl_signer, delegator, paa, output, is_paa): - """Generate revocation set from a single CRL file""" - crl = x509.load_pem_x509_crl(crl.read()) - crl_signer = x509.load_pem_x509_certificate(crl_signer.read()) - delegator = x509.load_pem_x509_certificate(delegator.read()) if delegator else None - paa = x509.load_pem_x509_certificate(paa.read()) if paa else None - - ca_name_b64, ca_akid_hex = get_certificate_authority_details(crl_signer, delegator, paa, is_paa) - revocation_set = generate_revocation_set_from_crl(crl, crl_signer, ca_name_b64, ca_akid_hex, delegator) - output.write(json.dumps([revocation_set], indent=4)) + json.dump([revocation.asDict() for revocation in revocation_set], outfile, indent=4) class TestRevocationSetGeneration(unittest.TestCase): @@ -622,25 +1008,23 @@ def get_test_file_path(self, filename): def compare_revocation_sets(self, generated_set, expected_file): with open(os.path.join(self.test_base_dir, expected_file), 'r') as f: - expected_set = json.load(f) + expected_set = [RevocationSet(**r) for r in json.load(f)] # Compare the contents self.assertEqual(len([generated_set]), len(expected_set)) expected = expected_set[0] # Compare required fields - self.assertEqual(generated_set['type'], expected['type']) - self.assertEqual(generated_set['issuer_subject_key_id'], expected['issuer_subject_key_id']) - self.assertEqual(generated_set['issuer_name'], expected['issuer_name']) - self.assertEqual(set(generated_set['revoked_serial_numbers']), set(expected['revoked_serial_numbers'])) - self.assertEqual(generated_set['crl_signer_cert'], expected['crl_signer_cert']) + self.assertEqual(generated_set.type, expected.type) + self.assertEqual(generated_set.issuer_subject_key_id, expected.issuer_subject_key_id) + self.assertEqual(generated_set.issuer_name, expected.issuer_name) + self.assertEqual(set(generated_set.revoked_serial_numbers), set(expected.revoked_serial_numbers)) + self.assertEqual(generated_set.crl_signer_cert, expected.crl_signer_cert) # Compare optional fields if present in either set - if 'crl_signer_delegator' in generated_set and 'crl_signer_delegator' in expected: - self.assertEqual(generated_set['crl_signer_delegator'], expected['crl_signer_delegator'], - "CRL signer delegator certificates do not match") - elif 'crl_signer_delegator' in generated_set or 'crl_signer_delegator' in expected: - self.fail("CRL signer delegator certificate is missing in one of the sets") + if generated_set.crl_signer_delegator or expected.crl_signer_delegator: + self.assertEqual(generated_set.crl_signer_delegator, expected.crl_signer_delegator, + f'CRL signer delegator certificates do not match, expected: {expected.crl_signer_delegator}, actual: {generated_set.crl_signer_delegator}') def test_paa_revocation_set(self): """Test generation of PAA revocation set"""