Skip to content

Commit 9e0ab58

Browse files
authored
da_revocation: align the revocation set generation algorithm with spec changes (project-chip#36225)
* da_revocation: align the revocation set generation algorithm with spec changes * Add types to few methods * address review comments * add the vid/pid checks for PAI delegated crl signer * get_paa_cert_for_crl_issuer is actually fetching the issuer cert rather than the PAA
1 parent 47780ad commit 9e0ab58

File tree

1 file changed

+208
-101
lines changed

1 file changed

+208
-101
lines changed

credentials/generate-revocation-set.py

+208-101
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import subprocess
2727
import sys
2828
from enum import Enum
29+
from typing import Optional
2930

3031
import click
3132
import requests
@@ -91,6 +92,112 @@ def parse_vid_pid_from_distinguished_name(distinguished_name):
9192
return vid, pid
9293

9394

95+
def get_akid(cert: x509.Certificate) -> Optional[bytes]:
96+
try:
97+
return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
98+
except Exception:
99+
logging.warning("AKID not found in certificate")
100+
return None
101+
102+
103+
def get_skid(cert: x509.Certificate) -> Optional[bytes]:
104+
try:
105+
return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
106+
except Exception:
107+
logging.warning("SKID not found in certificate")
108+
return None
109+
110+
111+
def get_subject_b64(cert: x509.Certificate) -> str:
112+
return base64.b64encode(cert.subject.public_bytes()).decode('utf-8')
113+
114+
115+
def get_issuer_b64(cert: x509.Certificate) -> str:
116+
return base64.b64encode(cert.issuer.public_bytes()).decode('utf-8')
117+
118+
119+
def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> bool:
120+
'''
121+
Verifies if the cert is signed by root.
122+
'''
123+
124+
cert_akid = get_akid(cert)
125+
root_skid = get_skid(root)
126+
if cert_akid is None or root_skid is None or cert_akid != root_skid:
127+
return False
128+
129+
if cert.issuer != root.subject:
130+
return False
131+
132+
# public_key().verify() do not return anything if signature is valid,
133+
# will raise an exception if signature is invalid
134+
try:
135+
root.public_key().verify(cert.signature, cert.tbs_certificate_bytes, ec.ECDSA(cert.signature_hash_algorithm))
136+
except Exception:
137+
logging.warning(f"Signature verification failed for cert subject: {get_subject_b64(cert)}, issuer: {get_issuer_b64(cert)}")
138+
return False
139+
140+
return True
141+
142+
143+
def is_self_signed_certificate(cert: x509.Certificate) -> bool:
144+
return verify_cert(cert, cert)
145+
146+
147+
# delegator is optional so can be None, but crl_signer and paa has to be present
148+
def validate_cert_chain(crl_signer: x509.Certificate, crl_signer_delegator: x509.Certificate, paa: x509.Certificate):
149+
'''
150+
There could be four scenarios:
151+
1. CRL Signer is PAA itself, hence its self-signed certificate
152+
2. CRL Signer is PAI certificate, and we can validate (crl_signer -> paa) chain
153+
3. CRL Signer delegator is PAA, and we can validate (crl_signer -> crl_signer_delegator(paa) -> paa) chain
154+
4. CRL Signer delegator is PAI, and we can validate (crl_signer -> crl_signer_delegator -> paa) chain
155+
'''
156+
157+
if crl_signer_delegator:
158+
return verify_cert(crl_signer, crl_signer_delegator) and verify_cert(crl_signer_delegator, paa)
159+
else:
160+
return verify_cert(crl_signer, paa)
161+
162+
163+
def validate_vid_pid(revocation_point: dict, crl_signer_certificate: x509.Certificate, crl_signer_delegator_certificate: x509.Certificate) -> bool:
164+
crl_signer_vid, crl_signer_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject)
165+
166+
if revocation_point["isPAA"]:
167+
if crl_signer_vid is not None:
168+
if revocation_point["vid"] != crl_signer_vid:
169+
logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...")
170+
return False
171+
else:
172+
vid_to_match = crl_signer_vid
173+
pid_to_match = crl_signer_pid
174+
175+
# if the CRL Signer is delegated then match the VID and PID of the CRL Signer Delegator
176+
if crl_signer_delegator_certificate:
177+
vid_to_match, pid_to_match = parse_vid_pid_from_distinguished_name(crl_signer_delegator_certificate.subject)
178+
179+
if vid_to_match is None or revocation_point["vid"] != vid_to_match:
180+
logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...")
181+
return False
182+
183+
if pid_to_match is not None:
184+
if revocation_point["pid"] != pid_to_match:
185+
logging.warning("PID in CRL Signer Certificate does not match with PID in revocation point, continue...")
186+
return False
187+
188+
return True
189+
190+
191+
def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList:
192+
logging.debug(f"Fetching CRL from {url}")
193+
194+
try:
195+
r = requests.get(url, timeout=timeout)
196+
return x509.load_der_x509_crl(r.content)
197+
except Exception:
198+
logging.error('Failed to fetch a valid CRL')
199+
200+
94201
class DCLDClient:
95202
'''
96203
A client for interacting with DCLD using either the REST API or command line interface (CLI).
@@ -172,30 +279,50 @@ def get_revocation_points(self) -> list[dict]:
172279

173280
return response["PkiRevocationDistributionPoint"]
174281

175-
def get_paa_cert_for_crl_issuer(self, crl_signer_issuer_name_b64, crl_signer_authority_key_id) -> str:
282+
def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]:
176283
'''
177-
Get PAA certificate for CRL issuer
284+
Get the issuer certificate for
178285
179286
Parameters
180287
----------
181-
crl_signer_issuer_name_b64: str
182-
The issuer name of the CRL signer.
183-
crl_signer_authority_key_id: str
184-
The authority key ID of the CRL signer.
288+
cert: x509.Certificate
289+
Certificate
185290
186291
Returns
187292
-------
188293
str
189-
PAA certificate in PEM format
294+
Issuer certificate in PEM format
190295
'''
296+
issuer_name_b64 = get_issuer_b64(cert)
297+
akid = get_akid(cert)
298+
if akid is None:
299+
return
300+
301+
# Convert CRL Signer AKID to colon separated hex
302+
akid_hex = akid.hex().upper()
303+
akid_hex = ':'.join([akid_hex[i:i+2] for i in range(0, len(akid_hex), 2)])
304+
305+
logging.debug(
306+
f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}")
307+
191308
if self.use_rest:
192309
response = requests.get(
193-
f"{self.rest_node_url}/dcl/pki/certificates/{crl_signer_issuer_name_b64}/{crl_signer_authority_key_id}").json()
310+
f"{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}").json()
194311
else:
195312
response = self.get_dcld_cmd_output_json(
196-
['query', 'pki', 'x509-cert', '-u', crl_signer_issuer_name_b64, '-k', crl_signer_authority_key_id])
313+
['query', 'pki', 'x509-cert', '-u', issuer_name_b64, '-k', akid_hex])
314+
315+
issuer_certificate = response["approvedCertificates"]["certs"][0]["pemCert"]
316+
317+
logging.debug(f"issuer: {issuer_certificate}")
197318

198-
return response["approvedCertificates"]["certs"][0]["pemCert"]
319+
try:
320+
issuer_certificate_object = x509.load_pem_x509_certificate(bytes(issuer_certificate, 'utf-8'))
321+
except Exception:
322+
logging.error('Failed to parse PAA certificate')
323+
return
324+
325+
return issuer_certificate_object
199326

200327
def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
201328
'''
@@ -211,6 +338,7 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
211338
list[dict]
212339
List of revocation points
213340
'''
341+
214342
if self.use_rest:
215343
response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}").json()
216344
else:
@@ -268,97 +396,55 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
268396
continue
269397

270398
# 2. Parse the certificate
271-
crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8'))
272-
273-
vid = revocation_point["vid"]
274-
pid = revocation_point["pid"]
275-
is_paa = revocation_point["isPAA"]
276-
277-
# 3. && 4. Validate VID/PID
278-
crl_vid, crl_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject)
279-
280-
if is_paa:
281-
if crl_vid is not None:
282-
if vid != crl_vid:
283-
logging.warning("VID is not CRL VID, continue...")
284-
continue
285-
else:
286-
if crl_vid is None or vid != crl_vid:
287-
logging.warning("VID is not CRL VID, continue...")
288-
continue
289-
if crl_pid is not None:
290-
if pid != crl_pid:
291-
logging.warning("PID is not CRL PID, continue...")
292-
continue
293-
294-
# 5. Validate the certification path containing CRLSignerCertificate.
295-
crl_signer_issuer_name = base64.b64encode(crl_signer_certificate.issuer.public_bytes()).decode('utf-8')
296-
297-
crl_signer_authority_key_id = crl_signer_certificate.extensions.get_extension_for_oid(
298-
x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
299-
300-
# Convert CRL Signer AKID to colon separated hex
301-
crl_signer_authority_key_id = crl_signer_authority_key_id.hex().upper()
302-
crl_signer_authority_key_id = ':'.join([crl_signer_authority_key_id[i:i+2]
303-
for i in range(0, len(crl_signer_authority_key_id), 2)])
304-
305-
paa_certificate = dcld_client.get_paa_cert_for_crl_issuer(crl_signer_issuer_name, crl_signer_authority_key_id)
306-
307-
if paa_certificate is None:
308-
logging.warning("PAA Certificate not found, continue...")
399+
try:
400+
crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8'))
401+
except Exception:
402+
logging.warning("CRL Signer Certificate is not valid, continue...")
309403
continue
310404

311-
paa_certificate_object = x509.load_pem_x509_certificate(bytes(paa_certificate, 'utf-8'))
405+
# Parse the crl signer delegator
406+
crl_signer_delegator_cert = None
407+
if "crlSignerDelegator" in revocation_point:
408+
crl_signer_delegator_cert_pem = revocation_point["crlSignerDelegator"]
409+
logging.debug(f"CRLSignerDelegator: {crl_signer_delegator_cert_pem}")
410+
try:
411+
crl_signer_delegator_cert = x509.load_pem_x509_certificate(bytes(crl_signer_delegator_cert_pem, 'utf-8'))
412+
except Exception:
413+
logging.warning("CRL Signer Delegator Certificate not found...")
312414

313-
# TODO: use verify_directly_issued_by() method when we upgrade cryptography to v40.0.0
314-
# Verify issuer matches with subject
315-
if crl_signer_certificate.issuer != paa_certificate_object.subject:
316-
logging.warning("CRL Signer Certificate issuer does not match with PAA Certificate subject, continue...")
415+
# 3. and 4. Validate VID/PID
416+
if not validate_vid_pid(revocation_point, crl_signer_certificate, crl_signer_delegator_cert):
417+
logging.warning("Failed to validate VID/PID, continue...")
317418
continue
318419

319-
# Check crl signers AKID matches with SKID of paa_certificate_object's AKID
320-
paa_skid = paa_certificate_object.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
321-
crl_akid = crl_signer_certificate.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
322-
if paa_skid != crl_akid:
323-
logging.warning("CRL Signer's AKID does not match with PAA Certificate SKID, continue...")
420+
# 5. Validate the certification path containing CRLSignerCertificate.
421+
paa_certificate_object = dcld_client.get_issuer_cert(crl_signer_certificate)
422+
if paa_certificate_object is None:
423+
logging.warning("PAA Certificate not found, continue...")
324424
continue
325425

326-
# verify if PAA singed the crl signer certificate
327-
try:
328-
paa_certificate_object.public_key().verify(crl_signer_certificate.signature,
329-
crl_signer_certificate.tbs_certificate_bytes,
330-
ec.ECDSA(crl_signer_certificate.signature_hash_algorithm))
331-
except Exception:
332-
logging.warning("CRL Signer Certificate is not signed by PAA Certificate, continue...")
426+
if validate_cert_chain(crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object) is False:
427+
logging.warning("Failed to validate CRL Signer Certificate chain, continue...")
333428
continue
334429

335430
# 6. Obtain the CRL
336-
logging.debug(f"Fetching CRL from {revocation_point['dataURL']}")
337-
try:
338-
r = requests.get(revocation_point["dataURL"], timeout=5)
339-
except Exception:
340-
logging.error('Failed to fetch CRL')
341-
continue
342-
343-
try:
344-
crl_file = x509.load_der_x509_crl(r.content)
345-
except Exception:
346-
logging.error('Failed to load CRL')
431+
crl_file = fetch_crl_from_url(revocation_point["dataURL"], 5) # timeout in seconds
432+
if crl_file is None:
347433
continue
348434

349435
# 7. Perform CRL File Validation
350-
crl_authority_key_id = crl_file.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
351-
crl_signer_subject_key_id = crl_signer_certificate.extensions.get_extension_for_oid(
352-
x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
353-
if crl_authority_key_id != crl_signer_subject_key_id:
354-
logging.warning("CRL Authority Key ID is not CRL Signer Subject Key ID, continue...")
436+
# a.
437+
crl_signer_skid = get_skid(crl_signer_certificate)
438+
crl_akid = get_akid(crl_file)
439+
if crl_akid != crl_signer_skid:
440+
logging.warning("CRL AKID is not CRL Signer SKID, continue...")
355441
continue
356442

357-
issuer_subject_key_id = ''.join('{:02X}'.format(x) for x in crl_authority_key_id)
443+
crl_akid_hex = ''.join('{:02X}'.format(x) for x in crl_akid)
358444

359445
# b.
360-
same_issuer_points = dcld_client.get_revocations_points_by_skid(issuer_subject_key_id)
361-
count_with_matching_vid_issuer_skid = sum(item.get('vid') == vid for item in same_issuer_points)
446+
same_issuer_points = dcld_client.get_revocations_points_by_skid(crl_akid_hex)
447+
count_with_matching_vid_issuer_skid = sum(item.get('vid') == revocation_point["vid"] for item in same_issuer_points)
362448

363449
if count_with_matching_vid_issuer_skid > 1:
364450
try:
@@ -377,40 +463,61 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
377463
logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...")
378464
continue
379465

380-
# 9. Assign CRL File Issuer
381-
certificate_authority_name = base64.b64encode(crl_file.issuer.public_bytes()).decode('utf-8')
382-
logging.debug(f"CRL File Issuer: {certificate_authority_name}")
466+
# TODO: 8. Validate CRL as per Section 6.3 of RFC 5280
467+
468+
# 9. decide on certificate authority name and AKID
469+
if revocation_point["isPAA"] and not is_self_signed_certificate(crl_signer_certificate):
470+
certificate_authority_name_b64 = get_subject_b64(paa_certificate_object)
471+
certificate_akid = get_skid(paa_certificate_object)
472+
elif crl_signer_delegator_cert:
473+
certificate_authority_name_b64 = get_subject_b64(crl_signer_delegator_cert)
474+
certificate_akid = get_skid(crl_signer_delegator_cert)
475+
else:
476+
certificate_authority_name_b64 = get_subject_b64(crl_signer_certificate)
477+
certificate_akid = get_skid(crl_signer_certificate)
478+
479+
# validate issuer skid matchces with the one in revocation points
480+
certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid)
481+
482+
logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}")
483+
logging.debug(f"Certificate AKID: {certificate_akid_hex}")
484+
logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}")
485+
486+
if revocation_point["issuerSubjectKeyID"] != certificate_akid_hex:
487+
logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...")
488+
continue
383489

384490
serialnumber_list = []
385491
# 10. Iterate through the Revoked Certificates List
386492
for revoked_cert in crl_file:
387-
# a.
388493
try:
389494
revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid(
390495
x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value
391496

392497
if revoked_cert_issuer is not None:
393-
if revoked_cert_issuer != certificate_authority_name:
498+
# check if this really are the same thing
499+
if revoked_cert_issuer != certificate_authority_name_b64:
394500
logging.warning("CRL Issuer is not CRL File Issuer, continue...")
395501
continue
396502
except Exception:
503+
logging.warning("certificateIssuer entry extension not found in CRL")
397504
pass
398505

399-
# b.
400-
# TODO: Verify that the certificate chain of the entry is linking to the same PAA
401-
# that issued the CRLSignerCertificate for this entry, including path through
402-
# CRLSignerDelegator if present. If the PAAs under which were issued the certificate
403-
# and the CRLSignerCertificate are different, ignore the entry.
404-
405-
# c. and d.
406506
serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))
407507

408-
issuer_name = base64.b64encode(crl_file.issuer.public_bytes()).decode('utf-8')
508+
entry = {
509+
"type": "revocation_set",
510+
"issuer_subject_key_id": certificate_akid_hex,
511+
"issuer_name": certificate_authority_name_b64,
512+
"revoked_serial_numbers": serialnumber_list,
513+
"crl_signer_cert": revocation_point["crlSignerCertificate"],
514+
}
515+
516+
if "crlSignerDelegator" in revocation_point:
517+
entry["crl_signer_delegator"] = revocation_point["crlSignerDelegator"]
409518

410-
revocation_set.append({"type": "revocation_set",
411-
"issuer_subject_key_id": issuer_subject_key_id,
412-
"issuer_name": issuer_name,
413-
"revoked_serial_numbers": serialnumber_list})
519+
logging.debug(f"Entry to append: {entry}")
520+
revocation_set.append(entry)
414521

415522
with open(output, 'w+') as outfile:
416523
json.dump(revocation_set, outfile, indent=4)

0 commit comments

Comments
 (0)