forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfetch_paa_certs_from_dcl.py
230 lines (184 loc) · 9.06 KB
/
fetch_paa_certs_from_dcl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#!/usr/bin/python
#
# Copyright (c) 2022 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script that was used to fetch CHIP Development Product Attestation Authority (PAA)
# certificates from DCL.
# For usage please run:
# python ./credentials/fetch_paa_certs_from_dcl.py --help
import copy
import os
import re
import subprocess
import sys
import click
import requests
from click_option_group import RequiredMutuallyExclusiveOptionGroup, optgroup
from cryptography import x509
from cryptography.hazmat.primitives import serialization
# Node address handed to `dcld` through `--node` when mirroring MainNet.
PRODUCTION_NODE_URL = "https://on.dcl.csa-iot.org:26657"
# Base URLs used for the REST (HTTPS) queries against MainNet and TestNet.
PRODUCTION_NODE_URL_REST = "https://on.dcl.csa-iot.org"
TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org"
# Subject and subject key id of the certificate that is skipped when collecting
# PAA roots (it is the CD signing cert, not a PAA) and whose child certificates
# are downloaded as CD signing certs.
MATTER_CERT_CA_SUBJECT = "MFIxDDAKBgNVBAoMA0NTQTEsMCoGA1UEAwwjTWF0dGVyIENlcnRpZmljYXRpb24gYW5kIFRlc3RpbmcgQ0ExFDASBgorBgEEAYKifAIBDARDNUEw"
MATTER_CERT_CA_SUBJECT_KEY_ID = "97:E4:69:D0:C5:04:14:C2:6F:C7:01:F7:7E:94:77:39:09:8D:F6:A5"
def parse_paa_root_certs(cmdpipe, paa_list):
    """
    Collect subject / subject key id pairs from a `dcld` root-cert listing.

    Example output of a query to all x509 root certs in DCL:

    certs:
    - subject: MCExHzAdBgNVBAMMFk1hdHRlciBEZXZlbG9wbWVudCBQQUE=
      subjectKeyId: FA:92:CF:09:5E:FA:42:E1:14:30:65:16:32:FE:FE:1B:2C:77:A7:C8
    - subject: MDAxGDAWBgNVBAMMD01hdHRlciBUZXN0IFBBQTEUMBIGCisGAQQBgqJ8AgEMBDEyNUQ=
      subjectKeyId: E2:90:8D:36:9C:3C:A3:C1:13:BB:09:E2:4D:C1:CC:C5:A6:66:91:D4

    Brief:
    Lines without the ': ' char sequence (such as the leading 'certs:') are
    ignored. Every other line is read as 'key: value', and every 2 such lines
    are assumed to hold the subject and subject key id of one PAA root
    certificate.

    Args:
        cmdpipe: object whose `stdout` yields the listing as bytes lines
            (for example a `subprocess.Popen` created with stdout=PIPE).
        paa_list: list that receives one dict per certificate; it is
            extended in place.

    The running line counter is kept in `parse_paa_root_certs.counter`.
    Callers that reuse the parser should reset it to 0 before each run.
    """
    result = {}
    # The counter lives on the function object because callers reset it from
    # outside; default to 0 so an uninitialised counter no longer raises.
    counter = getattr(parse_paa_root_certs, 'counter', 0)

    for line in iter(cmdpipe.stdout.readline, b''):
        if b': ' not in line:
            continue
        # Split on the first separator only, so a value that itself contains
        # ': ' is kept whole instead of breaking the two-way unpacking.
        key, value = line.split(b': ', 1)
        result[key.strip(b' -').decode("utf-8")] = value.strip().decode("utf-8")
        counter += 1
        parse_paa_root_certs.counter = counter
        if counter % 2 == 0:
            # Values are plain strings, so a shallow copy is a full snapshot.
            paa_list.append(dict(result))
def write_cert(certificate, subject):
    """
    Store a certificate in the current directory as a PEM and a DER file.

    The file stem is 'dcld_mirror_' followed by `subject` with every '=',
    ',' and space replaced by '_' and all remaining characters outside
    [a-zA-Z0-9_-] removed. `certificate` is written verbatim as the PEM
    file, which is then read back and re-encoded to produce the DER file.
    An IOError or ValueError during that conversion is reported on stdout
    and the conversion is abandoned.
    """
    underscored = re.sub('[=, ]', '_', subject)
    stem = 'dcld_mirror_' + re.sub('[^a-zA-Z0-9_-]', '', underscored)
    pem_name = stem + '.pem'
    der_name = stem + '.der'

    with open(pem_name, 'w+') as pem_out:
        pem_out.write(certificate)

    # convert pem file to der
    try:
        with open(pem_name, 'rb') as pem_in:
            parsed = x509.load_pem_x509_certificate(pem_in.read())
        with open(der_name, 'wb+') as der_out:
            der_out.write(parsed.public_bytes(serialization.Encoding.DER))
    except (IOError, ValueError) as e:
        print(f"ERROR: Failed to convert {pem_name}: {str(e)}. Skipping...")
def parse_paa_root_cert_from_dcld(cmdpipe):
    """
    Extract the PEM certificate and its textual subject from `dcld` output.

    Reads `cmdpipe.stdout` line by line (bytes). The lines following a
    'pemCert: |' marker, up to and including the '-----END CERTIFICATE-----'
    line, are collected with leading/trailing spaces and tabs removed.
    Parsing stops at the first 'subjectAsText:' line, whose value becomes the
    subject.

    Args:
        cmdpipe: object whose `stdout` yields the command output as bytes
            lines (for example a `subprocess.Popen` with stdout=PIPE).

    Returns:
        A `(certificate, subject)` tuple. `certificate` is "" when no PEM
        block was seen (or partial when the output ended inside one);
        `subject` is None when no 'subjectAsText:' line was found.
    """
    subject = None
    pem_lines = []
    read_line = cmdpipe.stdout.readline

    for line in iter(read_line, b''):
        if b'pemCert: |' in line:
            # Stop at EOF as well as at the END marker; otherwise truncated
            # output would leave this loop reading b'' forever.
            for pem_line in iter(read_line, b''):
                pem_lines.append(pem_line.strip(b' \t').decode("utf-8"))
                if b'-----END CERTIFICATE-----' in pem_line:
                    break
            continue
        if b'subjectAsText:' in line:
            # Take everything after the key, so subjects containing ': '
            # are not truncated and an empty value does not raise.
            subject = line.partition(b'subjectAsText:')[2].strip().decode("utf-8")
            break

    return ("".join(pem_lines), subject)
def use_dcld(dcld, production, cmdlist):
    """
    Build the argument vector for invoking the `dcld` binary.

    The result is `dcld` followed by the entries of `cmdlist`; when
    `production` is truthy, '--node' and the production node URL are
    appended. A new list is returned and `cmdlist` is left untouched.
    """
    command = [dcld] + cmdlist
    if production:
        command = command + ['--node', PRODUCTION_NODE_URL]
    return command
@click.command()
@click.help_option('-h', '--help')
@optgroup.group('Input data sources', cls=RequiredMutuallyExclusiveOptionGroup)
@optgroup.option(
    '--use-main-net-dcld',
    type=str,
    default='',
    metavar='PATH',
    help="Location of `dcld` binary, to use `dcld` for mirroring MainNet.",
)
@optgroup.option(
    '--use-test-net-dcld',
    type=str,
    default='',
    metavar='PATH',
    help="Location of `dcld` binary, to use `dcld` for mirroring TestNet.",
)
@optgroup.option(
    '--use-main-net-http',
    is_flag=True,
    type=str,
    help="Use RESTful API with HTTPS against public MainNet observer.",
)
@optgroup.option(
    '--use-test-net-http',
    is_flag=True,
    type=str,
    help="Use RESTful API with HTTPS against public TestNet observer.",
)
@optgroup.group('Optional arguments')
@optgroup.option(
    '--paa-trust-store-path',
    default='paa-root-certs',
    type=str,
    metavar='PATH',
    help="PAA trust store path (default: paa-root-certs)",
)
def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path):
    """DCL PAA mirroring tools"""
    # NOTE: click uses the docstring above as the command's help text, so it
    # is user-facing output rather than developer documentation.
    # Command-line entry point: forwards the parsed options unchanged.
    fetch_paa_certs(
        use_main_net_dcld=use_main_net_dcld,
        use_test_net_dcld=use_test_net_dcld,
        use_main_net_http=use_main_net_http,
        use_test_net_http=use_test_net_http,
        paa_trust_store_path=paa_trust_store_path,
    )
def get_cert_from_rest(rest_node_url, subject, subject_key_id, timeout=30):
    """
    Download one approved certificate through the DCL REST API.

    Args:
        rest_node_url: base URL of the REST endpoint (no trailing slash).
        subject: certificate subject, in the form used in the URL path.
        subject_key_id: certificate subject key id, as used in the URL path.
        timeout: seconds to wait for the server before giving up; forwarded
            to `requests.get`.

    Returns:
        A `(certificate, subject)` tuple: the PEM text of the first entry in
        "approvedCertificates"/"certs" with trailing newlines removed, and
        that entry's "subjectAsText" value.

    Raises:
        requests.RequestException: on connection problems, timeouts or an
            HTTP error status.
    """
    # Without an explicit timeout, requests would wait on a stalled server
    # indefinitely.
    response = requests.get(
        f"{rest_node_url}/dcl/pki/certificates/{subject}/{subject_key_id}", timeout=timeout)
    # Report HTTP failures as such instead of as a JSON/KeyError further down.
    response.raise_for_status()
    cert_entry = response.json()["approvedCertificates"]["certs"][0]

    certificate = cert_entry["pemCert"].rstrip("\n")
    subject = cert_entry["subjectAsText"]
    return certificate, subject
def fetch_cd_signing_certs(store_path):
    """
    Download the CD signing certificates into `store_path`.

    Only supports using main net http currently. The child certificates of
    the certificate identified by MATTER_CERT_CA_SUBJECT /
    MATTER_CERT_CA_SUBJECT_KEY_ID are listed through the REST API, and each
    one is fetched and written with `write_cert`.

    Args:
        store_path: output directory; created if it does not exist.

    The process working directory is switched to `store_path` while the
    files are written and is restored afterwards, even if a download fails.
    """
    rest_node_url = PRODUCTION_NODE_URL_REST

    os.makedirs(store_path, exist_ok=True)
    original_dir = os.getcwd()
    os.chdir(store_path)
    try:
        # Explicit timeout: requests never times out on its own.
        response = requests.get(
            f"{rest_node_url}/dcl/pki/child-certificates/{MATTER_CERT_CA_SUBJECT}/{MATTER_CERT_CA_SUBJECT_KEY_ID}",
            timeout=30)
        response.raise_for_status()
        cd_signer_ids = response.json()['childCertificates']['certIds']

        for signer in cd_signer_ids:
            certificate, subject = get_cert_from_rest(
                rest_node_url, signer['subject'], signer['subjectKeyId'])
            print(f"Downloaded CD signing cert with subject: {subject}")
            write_cert(certificate, subject)
    finally:
        # Always return to the caller's directory, including on errors.
        os.chdir(original_dir)
def fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path):
    """
    Mirror the PAA root certificates from DCL into `paa_trust_store_path`.

    Args:
        use_main_net_dcld: path of the `dcld` binary to query MainNet with,
            or an empty value when not used.
        use_test_net_dcld: path of the `dcld` binary to query TestNet with,
            or an empty value when not used.
        use_main_net_http: truthy to use the MainNet REST endpoint.
        use_test_net_http: truthy to use the TestNet REST endpoint.
        paa_trust_store_path: output directory; created if missing.

    The REST API is used when either http option is truthy; otherwise `dcld`
    is run as a subprocess. The certificate matching MATTER_CERT_CA_SUBJECT /
    MATTER_CERT_CA_SUBJECT_KEY_ID is skipped because it is the CD signing
    cert, not a PAA. The process working directory is switched to the trust
    store while files are written and restored afterwards, even on errors.
    """
    production = bool(use_main_net_dcld) or bool(use_main_net_http)
    dcld = use_main_net_dcld if use_main_net_dcld else use_test_net_dcld
    use_rest = use_main_net_http or use_test_net_http
    rest_node_url = PRODUCTION_NODE_URL_REST if production else TEST_NODE_URL_REST

    os.makedirs(paa_trust_store_path, exist_ok=True)
    original_dir = os.getcwd()
    os.chdir(paa_trust_store_path)
    try:
        if use_rest:
            # Explicit timeout: requests never times out on its own.
            response = requests.get(f"{rest_node_url}/dcl/pki/root-certificates", timeout=30)
            response.raise_for_status()
            paa_list = response.json()["approvedRootCertificates"]["certs"]
        else:
            paa_list = []
            cmdlist = ['query', 'pki', 'all-x509-root-certs']
            # The context manager closes the pipe and reaps the child.
            # stderr is never read, so send it to DEVNULL rather than to a
            # pipe that could fill up and stall the child.
            with subprocess.Popen(use_dcld(dcld, production, cmdlist),
                                  stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) as cmdpipe:
                parse_paa_root_certs.counter = 0
                parse_paa_root_certs(cmdpipe, paa_list)

        for paa in paa_list:
            if paa['subject'] == MATTER_CERT_CA_SUBJECT and paa['subjectKeyId'] == MATTER_CERT_CA_SUBJECT_KEY_ID:
                # Don't include the CD signing cert as a PAA root.
                continue

            if use_rest:
                certificate, subject = get_cert_from_rest(rest_node_url, paa['subject'], paa['subjectKeyId'])
            else:
                cmdlist = ['query', 'pki', 'x509-cert', '-u',
                           paa['subject'], '-k', paa['subjectKeyId']]
                with subprocess.Popen(use_dcld(dcld, production, cmdlist),
                                      stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) as cmdpipe:
                    (certificate, subject) = parse_paa_root_cert_from_dcld(cmdpipe)
                certificate = certificate.rstrip('\n')

            print(f"Downloaded PAA certificate with subject: {subject}")
            write_cert(certificate, subject)
    finally:
        # Always return to the caller's directory, including on errors.
        os.chdir(original_dir)
if __name__ == "__main__":
    # Script entry point: when extra command-line arguments are present, let
    # the click command parse them; with none, invoke it with '--help'.
    if len(sys.argv) != 1:
        main()
    else:
        main.main(['--help'])