23
23
import base64
24
24
import json
25
25
import logging
26
+ import os
26
27
import subprocess
27
28
import sys
29
+ import unittest
28
30
from enum import Enum
29
31
from typing import Optional
30
32
@@ -199,6 +201,98 @@ def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList
199
201
logging .error ('Failed to fetch a valid CRL' )
200
202
201
203
204
+ def generate_revocation_set_from_crl (crl_file : x509 .CertificateRevocationList ,
205
+ crl_signer_certificate : x509 .Certificate ,
206
+ certificate_authority_name_b64 : str ,
207
+ certificate_akid_hex : str ,
208
+ crl_signer_delegator_cert : x509 .Certificate ) -> dict :
209
+ """Generate a revocation set from a CRL file.
210
+
211
+ Args:
212
+ crl_file: The CRL object containing revoked certificates
213
+ crl_signer_certificate: The certificate object used to sign the CRL
214
+ certificate_authority_name_b64: Base64 encoded issuer name
215
+ certificate_akid_hex: Hex encoded Authority Key Identifier
216
+ crl_signer_delegator_cert: crl signer delegator certificate object
217
+
218
+ Returns:
219
+ dict: A dictionary containing the revocation set data with fields:
220
+ - type: "revocation_set"
221
+ - issuer_subject_key_id: Authority Key Identifier (hex)
222
+ - issuer_name: Issuer name (base64)
223
+ - revoked_serial_numbers: List of revoked serial numbers
224
+ - crl_signer_cert: CRL signer certificate (base64 DER)
225
+ - crl_signer_delegator: Optional delegator certificate (base64 DER)
226
+ """
227
+ serialnumber_list = []
228
+
229
+ for revoked_cert in crl_file :
230
+ try :
231
+ cert_issuer_entry_ext = revoked_cert .extensions .get_extension_for_oid (x509 .CRLEntryExtensionOID .CERTIFICATE_ISSUER )
232
+ revoked_cert_issuer = cert_issuer_entry_ext .value .get_values_for_type (x509 .DirectoryName )[0 ].public_bytes ()
233
+ revoked_cert_issuer_b64 = base64 .b64encode (revoked_cert_issuer ).decode ('utf-8' )
234
+
235
+ if revoked_cert_issuer_b64 is not None :
236
+ # check if this really are the same thing
237
+ if revoked_cert_issuer_b64 != certificate_authority_name_b64 :
238
+ logging .warning ("CRL Issuer is not CRL File Issuer, continue..." )
239
+ continue
240
+ except Exception :
241
+ pass
242
+
243
+ serialnumber_list .append (bytes (str ('{:02X}' .format (revoked_cert .serial_number )), 'utf-8' ).decode ('utf-8' ))
244
+
245
+ entry = {
246
+ "type" : "revocation_set" ,
247
+ "issuer_subject_key_id" : certificate_akid_hex ,
248
+ "issuer_name" : certificate_authority_name_b64 ,
249
+ "revoked_serial_numbers" : serialnumber_list ,
250
+ "crl_signer_cert" : base64 .b64encode (crl_signer_certificate .public_bytes (serialization .Encoding .DER )).decode ('utf-8' ),
251
+ }
252
+
253
+ if crl_signer_delegator_cert :
254
+ entry ["crl_signer_delegator" ] = base64 .b64encode (
255
+ crl_signer_delegator_cert .public_bytes (serialization .Encoding .DER )).decode ('utf-8' )
256
+
257
+ return entry
258
+
259
+
260
+ # This is implemented as per point (9) in 6.2.4.1. Conceptual algorithm for revocation set construction
261
+ def get_certificate_authority_details (crl_signer_certificate : x509 .Certificate ,
262
+ crl_signer_delegator_cert : x509 .Certificate ,
263
+ paa_certificate_object : x509 .Certificate ,
264
+ is_paa : bool ) -> tuple [str , str ]:
265
+ """Get certificate authority name and AKID based on certificate hierarchy.
266
+
267
+ Args:
268
+ crl_signer_certificate: The CRL signer certificate
269
+ crl_signer_delegator_cert: Optional delegator certificate
270
+ paa_certificate_object: Optional PAA certificate
271
+ is_paa: Whether this is a PAA certificate
272
+
273
+ Returns:
274
+ tuple[str, str]: (certificate_authority_name_b64, certificate_akid_hex)
275
+ """
276
+ if is_paa and not is_self_signed_certificate (crl_signer_certificate ):
277
+ cert_for_details = paa_certificate_object
278
+ logging .debug ("Using PAA certificate for details" )
279
+ elif crl_signer_delegator_cert :
280
+ cert_for_details = crl_signer_delegator_cert
281
+ logging .debug ("Using CRL Signer Delegator certificate for details" )
282
+ else :
283
+ cert_for_details = crl_signer_certificate
284
+ logging .debug ("Using CRL Signer certificate for details" )
285
+
286
+ certificate_authority_name_b64 = get_subject_b64 (cert_for_details )
287
+ certificate_akid = get_skid (cert_for_details )
288
+ certificate_akid_hex = '' .join ('{:02X}' .format (x ) for x in certificate_akid )
289
+
290
+ logging .debug (f"Certificate Authority Name: { certificate_authority_name_b64 } " )
291
+ logging .debug (f"Certificate AKID: { certificate_akid_hex } " )
292
+
293
+ return certificate_authority_name_b64 , certificate_akid_hex
294
+
295
+
202
296
class DCLDClient :
203
297
'''
204
298
A client for interacting with DCLD using either the REST API or command line interface (CLI).
@@ -325,7 +419,7 @@ def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]:
325
419
326
420
return issuer_certificate_object
327
421
328
- def get_revocations_points_by_skid (self , issuer_subject_key_id ) -> list [dict ]:
422
+ def get_revocations_points_by_skid (self , issuer_subject_key_id : str ) -> list [dict ]:
329
423
'''
330
424
Get revocation points by subject key ID
331
425
@@ -349,7 +443,12 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
349
443
return response ["pkiRevocationDistributionPointsByIssuerSubjectKeyID" ]["points" ]
350
444
351
445
352
- @click .command ()
446
+ @click .group ()
447
+ def cli ():
448
+ pass
449
+
450
+
451
+ @cli .command ('from-dcl' )
353
452
@click .help_option ('-h' , '--help' )
354
453
@optgroup .group ('Input data sources' , cls = RequiredMutuallyExclusiveOptionGroup )
355
454
@optgroup .option ('--use-main-net-dcld' , type = str , default = '' , metavar = 'PATH' , help = "Location of `dcld` binary, to use `dcld` for mirroring MainNet." )
@@ -362,9 +461,8 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
362
461
@optgroup .option ('--log-level' , default = 'INFO' , show_default = True , type = click .Choice (__LOG_LEVELS__ .keys (),
363
462
case_sensitive = False ), callback = lambda c , p , v : __LOG_LEVELS__ [v ],
364
463
help = 'Determines the verbosity of script output' )
365
- def main (use_main_net_dcld : str , use_test_net_dcld : str , use_main_net_http : bool , use_test_net_http : bool , output : str , log_level : str ):
366
- """Tool to construct revocation set from DCL"""
367
-
464
+ def from_dcl (use_main_net_dcld , use_test_net_dcld , use_main_net_http , use_test_net_http , output , log_level ):
465
+ """Generate revocation set from DCL"""
368
466
logging .basicConfig (
369
467
level = log_level ,
370
468
format = '%(asctime)s %(name)s %(levelname)-7s %(message)s' ,
@@ -467,65 +565,120 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
467
565
# TODO: 8. Validate CRL as per Section 6.3 of RFC 5280
468
566
469
567
# 9. decide on certificate authority name and AKID
470
- if revocation_point ["isPAA" ] and not is_self_signed_certificate (crl_signer_certificate ):
471
- certificate_authority_name_b64 = get_subject_b64 (paa_certificate_object )
472
- certificate_akid = get_skid (paa_certificate_object )
473
- elif crl_signer_delegator_cert :
474
- certificate_authority_name_b64 = get_subject_b64 (crl_signer_delegator_cert )
475
- certificate_akid = get_skid (crl_signer_delegator_cert )
476
- else :
477
- certificate_authority_name_b64 = get_subject_b64 (crl_signer_certificate )
478
- certificate_akid = get_skid (crl_signer_certificate )
568
+ certificate_authority_name_b64 , certificate_akid_hex = get_certificate_authority_details (
569
+ crl_signer_certificate , crl_signer_delegator_cert , paa_certificate_object , revocation_point ["isPAA" ])
479
570
480
571
# validate issuer skid matchces with the one in revocation points
481
- certificate_akid_hex = '' .join ('{:02X}' .format (x ) for x in certificate_akid )
482
-
483
- logging .debug (f"Certificate Authority Name: { certificate_authority_name_b64 } " )
484
- logging .debug (f"Certificate AKID: { certificate_akid_hex } " )
485
572
logging .debug (f"revocation_point['issuerSubjectKeyID']: { revocation_point ['issuerSubjectKeyID' ]} " )
486
573
487
574
if revocation_point ["issuerSubjectKeyID" ] != certificate_akid_hex :
488
575
logging .warning ("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue..." )
489
576
continue
490
577
491
- serialnumber_list = []
492
578
# 10. Iterate through the Revoked Certificates List
493
- for revoked_cert in crl_file :
494
- try :
495
- revoked_cert_issuer = revoked_cert .extensions .get_extension_for_oid (
496
- x509 .CRLEntryExtensionOID .CERTIFICATE_ISSUER ).value .get_values_for_type (x509 .DirectoryName ).value
497
-
498
- if revoked_cert_issuer is not None :
499
- # check if this really are the same thing
500
- if revoked_cert_issuer != certificate_authority_name_b64 :
501
- logging .warning ("CRL Issuer is not CRL File Issuer, continue..." )
502
- continue
503
- except Exception :
504
- logging .warning ("certificateIssuer entry extension not found in CRL" )
505
- pass
506
-
507
- serialnumber_list .append (bytes (str ('{:02X}' .format (revoked_cert .serial_number )), 'utf-8' ).decode ('utf-8' ))
508
-
509
- entry = {
510
- "type" : "revocation_set" ,
511
- "issuer_subject_key_id" : certificate_akid_hex ,
512
- "issuer_name" : certificate_authority_name_b64 ,
513
- "revoked_serial_numbers" : serialnumber_list ,
514
- "crl_signer_cert" : base64 .b64encode (crl_signer_certificate .public_bytes (serialization .Encoding .DER )).decode ('utf-8' ),
515
- }
516
-
517
- if crl_signer_delegator_cert :
518
- entry ["crl_signer_delegator" ] = base64 .b64encode (
519
- crl_signer_delegator_cert .public_bytes (serialization .Encoding .DER )).decode ('utf-8' ),
579
+ entry = generate_revocation_set_from_crl (crl_file , crl_signer_certificate ,
580
+ certificate_authority_name_b64 , certificate_akid_hex , crl_signer_delegator_cert )
520
581
logging .debug (f"Entry to append: { entry } " )
521
582
revocation_set .append (entry )
522
583
523
584
with open (output , 'w+' ) as outfile :
524
585
json .dump (revocation_set , outfile , indent = 4 )
525
586
526
587
588
+ @cli .command ('from-crl' )
589
+ @click .option ('--crl' , required = True , type = click .File ('rb' ), help = 'Path to the CRL file' )
590
+ @click .option ('--crl-signer' , required = True , type = click .File ('rb' ), help = 'Path to the signer certificate' )
591
+ @click .option ('--delegator' , type = click .File ('rb' ), help = 'Path to the delegator certificate (optional)' )
592
+ @click .option ('--paa' , type = click .File ('rb' ), help = 'Path to the PAA certificate (optional)' )
593
+ @click .option ('--output' , default = 'revocation_set.json' , type = click .File ('w' ), help = 'Output filename (default: revocation_set.json)' )
594
+ @click .option ('--is-paa' , default = False , is_flag = True , help = 'Indicates if the CRL issuer is the PAA' )
595
+ def from_crl (crl , crl_signer , delegator , paa , output , is_paa ):
596
+ """Generate revocation set from a single CRL file"""
597
+ crl = x509 .load_pem_x509_crl (crl .read ())
598
+ crl_signer = x509 .load_pem_x509_certificate (crl_signer .read ())
599
+ delegator = x509 .load_pem_x509_certificate (delegator .read ()) if delegator else None
600
+ paa = x509 .load_pem_x509_certificate (paa .read ()) if paa else None
601
+
602
+ ca_name_b64 , ca_akid_hex = get_certificate_authority_details (crl_signer , delegator , paa , is_paa )
603
+ revocation_set = generate_revocation_set_from_crl (crl , crl_signer , ca_name_b64 , ca_akid_hex , delegator )
604
+ output .write (json .dumps ([revocation_set ], indent = 4 ))
605
+
606
+
607
+ class TestRevocationSetGeneration (unittest .TestCase ):
608
+ """Test class for revocation set generation"""
609
+
610
+ def setUp (self ):
611
+ # Get the directory containing this file
612
+ self .test_base_dir = os .path .dirname (os .path .abspath (__file__ ))
613
+
614
+ def get_test_file_path (self , filename ):
615
+ return os .path .join (self .test_base_dir , 'test' , filename )
616
+
617
+ def compare_revocation_sets (self , generated_set , expected_file ):
618
+ with open (os .path .join (self .test_base_dir , expected_file ), 'r' ) as f :
619
+ expected_set = json .load (f )
620
+
621
+ # Compare the contents
622
+ self .assertEqual (len ([generated_set ]), len (expected_set ))
623
+ expected = expected_set [0 ]
624
+
625
+ # Compare required fields
626
+ self .assertEqual (generated_set ['type' ], expected ['type' ])
627
+ self .assertEqual (generated_set ['issuer_subject_key_id' ], expected ['issuer_subject_key_id' ])
628
+ self .assertEqual (generated_set ['issuer_name' ], expected ['issuer_name' ])
629
+ self .assertEqual (set (generated_set ['revoked_serial_numbers' ]), set (expected ['revoked_serial_numbers' ]))
630
+ self .assertEqual (generated_set ['crl_signer_cert' ], expected ['crl_signer_cert' ])
631
+
632
+ # Compare optional fields if present in either set
633
+ if 'crl_signer_delegator' in generated_set and 'crl_signer_delegator' in expected :
634
+ self .assertEqual (generated_set ['crl_signer_delegator' ], expected ['crl_signer_delegator' ],
635
+ "CRL signer delegator certificates do not match" )
636
+ elif 'crl_signer_delegator' in generated_set or 'crl_signer_delegator' in expected :
637
+ self .fail ("CRL signer delegator certificate is missing in one of the sets" )
638
+
639
+ def test_paa_revocation_set (self ):
640
+ """Test generation of PAA revocation set"""
641
+ with open (self .get_test_file_path ('revoked-attestation-certificates/Chip-Test-PAA-FFF1-CRL.pem' ), 'rb' ) as f :
642
+ crl = x509 .load_pem_x509_crl (f .read ())
643
+ with open (self .get_test_file_path ('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem' ), 'rb' ) as f :
644
+ crl_signer = x509 .load_pem_x509_certificate (f .read ())
645
+
646
+ ca_name_b64 , ca_akid_hex = get_certificate_authority_details (
647
+ crl_signer , None , None , True )
648
+ revocation_set = generate_revocation_set_from_crl (
649
+ crl , crl_signer , ca_name_b64 , ca_akid_hex , None )
650
+
651
+ self .compare_revocation_sets (
652
+ revocation_set ,
653
+ 'test/revoked-attestation-certificates/revocation-sets/revocation-set-for-paa.json'
654
+ )
655
+
656
+ def test_pai_revocation_set (self ):
657
+ """Test generation of PAI revocation set"""
658
+ with open (self .get_test_file_path ('revoked-attestation-certificates/Matter-Development-PAI-FFF1-noPID-CRL.pem' ), 'rb' ) as f :
659
+ crl = x509 .load_pem_x509_crl (f .read ())
660
+ with open (self .get_test_file_path ('revoked-attestation-certificates/Matter-Development-PAI-FFF1-noPID-Cert.pem' ), 'rb' ) as f :
661
+ crl_signer = x509 .load_pem_x509_certificate (f .read ())
662
+ with open (self .get_test_file_path ('revoked-attestation-certificates/Chip-Test-PAA-FFF1-Cert.pem' ), 'rb' ) as f :
663
+ paa = x509 .load_pem_x509_certificate (f .read ())
664
+
665
+ ca_name_b64 , ca_akid_hex = get_certificate_authority_details (
666
+ crl_signer , None , paa , False )
667
+ revocation_set = generate_revocation_set_from_crl (
668
+ crl , crl_signer , ca_name_b64 , ca_akid_hex , None )
669
+
670
+ self .compare_revocation_sets (
671
+ revocation_set ,
672
+ 'test/revoked-attestation-certificates/revocation-sets/revocation-set-for-pai.json'
673
+ )
674
+
675
+
527
676
if __name__ == "__main__" :
528
- if len (sys .argv ) == 1 :
529
- main .main (['--help' ])
677
+ if len (sys .argv ) > 1 and sys .argv [1 ] == 'test' :
678
+ # Remove the 'test' argument and run tests
679
+ sys .argv .pop (1 )
680
+ unittest .main ()
681
+ elif len (sys .argv ) == 1 :
682
+ cli .main (['--help' ])
530
683
else :
531
- main ()
684
+ cli ()
0 commit comments