Skip to content

Commit e0d4634

Browse files
committed
Proper checksum check on the downloaded OTA
1 parent a9edbf4 commit e0d4634

File tree

1 file changed

+60
-33
lines changed

1 file changed

+60
-33
lines changed

src/python_testing/post_certification_tests/post-cert-checks.py

+60-33
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
# pip install opencv-python requests click_option_group
3535

3636
import base64
37+
import hashlib
3738
import importlib
3839
import logging
3940
import os
@@ -110,7 +111,7 @@ def step_success(self, logger, logs, duration: int, request):
110111
pass
111112

112113
def step_failure(self, logger, logs, duration: int, request, received):
113-
self.failures[self.current_test] = Failure(self.current_step)
114+
self.failures[self.current_test] = Failure(self.current_step, None)
114115

115116
def step_unknown(self):
116117
pass
@@ -138,15 +139,19 @@ async def setup_class(self):
138139
self.software_version = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.SoftwareVersion)
139140
self.url = fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST
140141

142+
self.vid_str = f'vid = 0x{self.vid:04X}'
143+
self.vid_pid_str = f'{self.vid_str} pid = 0x{self.pid:04X}'
144+
self.vid_pid_sv_str = f'{self.vid_pid_str} software version = {self.software_version}'
145+
141146
def steps_Vendor(self):
142147
return [TestStep(1, "Check if device VID is listed in the DCL vendor schema", "Listing found")]
143148

144149
def test_Vendor(self):
145150
self.step(1)
146151
entry = requests.get(f"{self.url}/dcl/vendorinfo/vendors/{self.vid}").json()
147152
key = 'vendorInfo'
148-
asserts.assert_true(key in entry.keys(), f"Unable to find vendor entry for {self.vid:04x}")
149-
logging.info(f'Found vendor key 0x{self.vid:04X} in DCL:')
153+
asserts.assert_true(key in entry.keys(), f"Unable to find vendor entry for {self.vid_str}")
154+
logging.info(f'Found vendor key for {self.vid_str} in the DCL:')
150155
logging.info(f'{entry[key]}')
151156

152157
def steps_Model(self):
@@ -156,8 +161,8 @@ def test_Model(self):
156161
self.step(1)
157162
key = 'model'
158163
entry = requests.get(f"{self.url}/dcl/model/models/{self.vid}/{self.pid}").json()
159-
asserts.assert_true(key in entry.keys(), f"Unable to find model entry for {self.vid:04x} {self.pid:04x}")
160-
logging.info(f'Found model entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} in the DCL:')
164+
asserts.assert_true(key in entry.keys(), f"Unable to find model entry for {self.vid_pid_str}")
165+
logging.info(f'Found model entry for {self.vid_pid_str} in the DCL:')
161166
logging.info(f'{entry[key]}')
162167

163168
def steps_Compliance(self):
@@ -168,9 +173,9 @@ def test_Compliance(self):
168173
key = 'complianceInfo'
169174
entry = requests.get(f"{self.url}/dcl/compliance/compliance-info/{self.vid}/{self.pid}/{self.software_version}/matter").json()
170175
asserts.assert_true(key in entry.keys(),
171-
f"Unable to find compliance entry for {self.vid:04x} {self.pid:04x} {self.software_version}")
176+
f"Unable to find compliance entry for {self.vid_pid_sv_str}")
172177
logging.info(
173-
f'Found compliance info for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:')
178+
f'Found compliance info for {self.vid_pid_sv_str} in the DCL:')
174179
logging.info(f'{entry[key]}')
175180

176181
def steps_CertifiedModel(self):
@@ -181,59 +186,79 @@ def test_CertifiedModel(self):
181186
key = 'certifiedModel'
182187
entry = requests.get(f"{self.url}/dcl/compliance/certified-models/{self.vid}/{self.pid}/{self.software_version}/matter").json()
183188
asserts.assert_true(key in entry.keys(),
184-
f"Unable to find certified model entry for {self.vid:04x} {self.pid:04x} {self.software_version}")
189+
f"Unable to find certified model entry for {self.vid_pid_sv_str}")
185190
logging.info(
186-
f'Found certified model for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:')
191+
f'Found certified model for {self.vid_pid_sv_str} in the DCL:')
187192
logging.info(f'{entry[key]}')
188193

189194
def steps_AllSoftwareVersions(self):
190195
return [TestStep(1, "Query the version information for this software version", "DCL entry exists"),
191196
TestStep(2, "For each valid software version with an OtaUrl, verify the OtaChecksumType is in the valid range and the OtaChecksum is a base64. If the softwareVersion matches the current softwareVersion on the device, ensure the entry is valid.", "OtaChecksum is base64 and OtaChecksumType is in the valid set")]
197+
192198
def test_AllSoftwareVersions(self):
193199
self.step(1)
194200
versions_entry = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}").json()
195201
key_model_versions = 'modelVersions'
196-
asserts.assert_true(key_model_versions in versions_entry.keys(), f"Unable to find {key_model_versions} in software versions schema for vid=0x{self.vid:04X} pid=0x{self.pid:04X}")
197-
logging.info(
198-
f'Found version info for vid=0x{self.vid:04X} pid=0x{self.pid:04X} in the DCL:')
202+
asserts.assert_true(key_model_versions in versions_entry.keys(), f"Unable to find {key_model_versions} in software versions schema for {self.vid_pid_str}")
203+
logging.info(f'Found version info for vid=0x{self.vid_pid_str} in the DCL:')
199204
logging.info(f'{versions_entry[key_model_versions]}')
200205
key_software_versions = 'softwareVersions'
201-
asserts.assert_true(key_software_versions in versions_entry[key_model_versions].keys(), f"Unable to find {key_software_versions} in software versions schema for vid=0x{self.vid:04X} pid=0x{self.pid:04X}")
206+
asserts.assert_true(key_software_versions in versions_entry[key_model_versions].keys(), f"Unable to find {key_software_versions} in software versions schema for {self.vid_pid_str}")
202207

203208
problems = []
204209
self.step(2)
205210
for software_version in versions_entry[key_model_versions][key_software_versions]:
206211
entry_wrapper = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}/{software_version}").json()
207212
key_model_version = 'modelVersion'
208213
if key_model_version not in entry_wrapper:
209-
problems.append(f'Missing key {key_model_version} in entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}')
214+
problems.append(f'Missing key {key_model_version} in entry for {self.vid_pid_str} software version={software_version}')
210215
continue
211-
logging.info(f'Found entry version entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}')
216+
logging.info(f'Found entry version entry for {self.vid_pid_str} software version={software_version}')
212217
logging.info(entry_wrapper)
213218
entry = entry_wrapper[key_model_version]
214219
key_ota_url = 'otaUrl'
215220
key_software_version_valid = 'softwareVersionValid'
216221
key_ota_checksum = 'otaChecksum'
217222
key_ota_checksum_type = 'otaChecksumType'
223+
key_ota_file_size = 'otaFileSize'
224+
218225
def check_key(key):
219226
if key not in entry.keys():
220-
problems.append(f'Missing key {key} in DCL versions entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}')
227+
problems.append(f'Missing key {key} in DCL versions entry for {self.vid_pid_str} software version={software_version}')
221228
check_key(key_ota_url)
222229
check_key(key_software_version_valid)
223230
if entry[key_software_version_valid] and entry[key_ota_url]:
224231
check_key(key_ota_checksum)
225232
check_key(key_ota_checksum_type)
226-
valid_checksum_types = [1, 7, 8, 10, 11, 12]
227-
if entry[key_ota_checksum_type] not in valid_checksum_types:
228-
problems.append(f'OtaChecksumType for entry vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version} is invalid. Found {entry[key_ota_checksum_type]} valid values: {valid_checksum_types}')
233+
checksum_types = {1: hashlib.sha256, 7: hashlib.sha384, 8: hashlib.sha256, 10: hashlib.sha3_256, 11: hashlib.sha3_384, 12: hashlib.sha3_512}
234+
if entry[key_ota_checksum_type] not in checksum_types.keys():
235+
problems.append(f'OtaChecksumType for entry {self.vid_pid_str} software version={software_version} is invalid. Found {entry[key_ota_checksum_type]} valid values: {checksum_types.keys()}')
236+
continue
229237
checksum = entry[key_ota_checksum]
230238
try:
231239
is_base64 = base64.b64encode(base64.b64decode(checksum)).decode('utf-8') == checksum
232240
except (ValueError, TypeError):
233241
is_base64 = False
234242
if not is_base64:
235-
problems.append(f"Checksum {checksum} is not base64 encoded for for entry vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}")
236-
#TODO: download and actually checksum it? Maybe just for the current version? And size check?
243+
problems.append(f"Checksum {checksum} is not base64 encoded for for entry {self.vid_pid_str} software version={software_version}")
244+
continue
245+
246+
response = requests.get(entry[key_ota_url])
247+
if not response.ok:
248+
problems.append(f"Unable to get OTA object from {entry[key_ota_url]} for {self.vid_pid_str} software version = {software_version}")
249+
continue
250+
251+
ota_len = str(len(response.content))
252+
dcl_len = entry[key_ota_file_size]
253+
if ota_len != dcl_len:
254+
problems.append(f'Incorrect OTA size for {self.vid_pid_str} software_version = {software_version}, received size: {len(response.content)} DCL states {entry[key_ota_file_size]}')
255+
continue
256+
257+
checksum = checksum_types[entry[key_ota_checksum_type]](response.content).digest()
258+
dcl_checksum = base64.b64decode(entry[key_ota_checksum])
259+
if checksum != dcl_checksum:
260+
problems.append(f'Incorrect checksum for {self.vid_pid_str} software version = {software_version}, calculated: {checksum}, DCL: {dcl_checksum}')
261+
237262
msg = 'Problems found in software version DCL checks:\n'
238263
for problem in problems:
239264
msg += f'{problem}\n'
@@ -376,33 +401,36 @@ def main():
376401

377402
failures_test_event_trigger = run_test(TestEventTriggersCheck, ['test_TestEventTriggersCheck'], test_config)
378403

379-
failures_dcl = run_test(DclCheck, ['test_Vendor', 'test_Model', 'test_Compliance', 'test_CertifiedModel', 'test_AllSoftwareVersions'], test_config)
404+
# [] means all tests.
405+
failures_dcl = run_test(DclCheck, [], test_config)
380406

381407
report = []
382-
for failure in failures_DA_1_2:
408+
for test, failure in failures_DA_1_2.items():
383409
# Check for known failures first
384410
# step 6.9 - non-production CD
385411
# 9 - not signed by CSA CA
386412
# other steps - should have been caught in cert, but we should report none the less
387413
if failure.step.startswith('6.9'):
388414
report.append('Device is using a non-production certification declaration')
389-
continue
390-
if failure.step.startswith('9'):
415+
elif failure.step.startswith('9'):
391416
report.append('Device is using a certification declaration that was not signed by the CSA CA')
392-
continue
393-
report.append(f'Device attestation failure: TC-DA-1.2: {failure.step}')
417+
else:
418+
report.append(f'Device attestation failure: TC-DA-1.2: {failure.step}')
419+
report.append(f'\t{str(failure.exception)}\n')
394420

395-
for failure in failures_DA_1_7:
421+
for test, failure in failures_DA_1_7.items():
396422
# Notable failures in DA-1.7:
397423
# 1.3 - PAI signature does not chain to a PAA in the main net DCL
398424
if failure.step.startswith('1.3'):
399425
report.append('Device DAC chain does not chain to a PAA in the main net DCL')
400-
continue
401-
report.append(f'Device attestation failure: TC-DA-1.7: {failure.step}')
426+
else:
427+
report.append(f'Device attestation failure: TC-DA-1.7: {failure.step}')
428+
report.append(f'\t{str(failure.exception)}\n')
402429

403-
for failure in failures_test_event_trigger:
430+
for test, failure in failures_test_event_trigger.items():
404431
# only one possible failure here
405432
report.append('Device has test event triggers enabled in production')
433+
report.append(f'\t{str(failure.exception)}\n')
406434

407435
for test, failure in failures_dcl.items():
408436
if test == 'test_Vendor':
@@ -417,8 +445,7 @@ def main():
417445
report.append('Problems with device software version in the DCL')
418446
else:
419447
report.append(f'unknown DCL failure in test {test}: {failure.step}')
420-
report.append('\n')
421-
report.append(str(failure.exception))
448+
report.append(f'\t{str(failure.exception)}\n')
422449

423450
print('\n\n\n')
424451
if report:

0 commit comments

Comments
 (0)