33
33
# source py/bin/activate
34
34
# pip install opencv-python requests click_option_group
35
35
36
-
36
+ import base64
37
37
import importlib
38
38
import logging
39
39
import os
75
75
76
76
@dataclass
77
77
class Failure :
78
- test : str
79
78
step : str
79
+ exception : typing .Optional [Exception ]
80
80
81
81
82
82
class Hooks ():
83
83
def __init__ (self ):
84
- self .failures = []
84
+ self .failures = {}
85
85
self .current_step = 'unknown'
86
86
self .current_test = 'unknown'
87
87
@@ -96,7 +96,9 @@ def test_start(self, filename: str, name: str, count: int):
96
96
pass
97
97
98
98
def test_stop (self , exception : Exception , duration : int ):
99
- pass
99
+ # Exception is the test assertion that caused the failure
100
+ if exception :
101
+ self .failures [self .current_test ].exception = exception
100
102
101
103
def step_skipped (self , name : str , expression : str ):
102
104
pass
@@ -108,7 +110,7 @@ def step_success(self, logger, logs, duration: int, request):
108
110
pass
109
111
110
112
def step_failure (self , logger , logs , duration : int , request , received ):
111
- self .failures . append ( Failure ( self .current_test , self .current_step ) )
113
+ self .failures [ self .current_test ] = Failure ( self .current_step )
112
114
113
115
def step_unknown (self ):
114
116
pass
@@ -126,22 +128,6 @@ async def test_TestEventTriggersCheck(self):
126
128
asserts .assert_equal (ret , 0 , "TestEventTriggers are still on" )
127
129
128
130
129
- def get_dcl_vendor (vid ):
130
- return requests .get (f"{ fetch_paa_certs_from_dcl .PRODUCTION_NODE_URL_REST } /dcl/vendorinfo/vendors/{ vid } " ).json ()
131
-
132
-
133
- def get_dcl_model (vid , pid ):
134
- return requests .get (f"{ fetch_paa_certs_from_dcl .PRODUCTION_NODE_URL_REST } /dcl/model/models/{ vid } /{ pid } " ).json ()
135
-
136
-
137
- def get_dcl_compliance_info (vid , pid , software_version ):
138
- return requests .get (f"{ fetch_paa_certs_from_dcl .PRODUCTION_NODE_URL_REST } /dcl/compliance/compliance-info/{ vid } /{ pid } /{ software_version } /matter" ).json ()
139
-
140
-
141
- def get_dcl_certified_model (vid , pid , software_version ):
142
- return requests .get (f"{ fetch_paa_certs_from_dcl .PRODUCTION_NODE_URL_REST } /dcl/compliance/certified-models/{ vid } /{ pid } /{ software_version } /matter" ).json ()
143
-
144
-
145
131
class DclCheck (MatterBaseTest , BasicCompositionTests ):
146
132
@async_test_body
147
133
async def setup_class (self ):
@@ -150,13 +136,17 @@ async def setup_class(self):
150
136
self .vid = await self .read_single_attribute_check_success (cluster = bi , attribute = bi .Attributes .VendorID )
151
137
self .pid = await self .read_single_attribute_check_success (cluster = bi , attribute = bi .Attributes .ProductID )
152
138
self .software_version = await self .read_single_attribute_check_success (cluster = bi , attribute = bi .Attributes .SoftwareVersion )
139
+ self .url = fetch_paa_certs_from_dcl .PRODUCTION_NODE_URL_REST
140
+ self .vid = 0x6006
141
+ self .pid = 1
142
+ self .software_version = 1300
153
143
154
144
def steps_Vendor (self ):
155
145
return [TestStep (1 , "Check if device VID is listed in the DCL vendor schema" , "Listing found" )]
156
146
157
147
def test_Vendor (self ):
158
148
self .step (1 )
159
- entry = get_dcl_vendor ( self .vid )
149
+ entry = requests . get ( f" { self .url } /dcl/vendorinfo/vendors/ { self . vid } " ). json ( )
160
150
key = 'vendorInfo'
161
151
asserts .assert_true (key in entry .keys (), f"Unable to find vendor entry for { self .vid :04x} " )
162
152
logging .info (f'Found vendor key 0x{ self .vid :04X} in DCL:' )
@@ -168,7 +158,7 @@ def steps_Model(self):
168
158
def test_Model (self ):
169
159
self .step (1 )
170
160
key = 'model'
171
- entry = get_dcl_model ( self .vid , self .pid )
161
+ entry = requests . get ( f" { self .url } /dcl/model/models/ { self . vid } / { self .pid } " ). json ( )
172
162
asserts .assert_true (key in entry .keys (), f"Unable to find model entry for { self .vid :04x} { self .pid :04x} " )
173
163
logging .info (f'Found model entry for vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} in the DCL:' )
174
164
logging .info (f'{ entry [key ]} ' )
@@ -179,7 +169,7 @@ def steps_Compliance(self):
179
169
def test_Compliance (self ):
180
170
self .step (1 )
181
171
key = 'complianceInfo'
182
- entry = get_dcl_compliance_info ( self .vid , self .pid , self .software_version )
172
+ entry = requests . get ( f" { self .url } /dcl/compliance/compliance-info/ { self . vid } / { self .pid } / { self .software_version } /matter" ). json ( )
183
173
asserts .assert_true (key in entry .keys (),
184
174
f"Unable to find compliance entry for { self .vid :04x} { self .pid :04x} { self .software_version } " )
185
175
logging .info (
@@ -192,13 +182,66 @@ def steps_CertifiedModel(self):
192
182
def test_CertifiedModel (self ):
193
183
self .step (1 )
194
184
key = 'certifiedModel'
195
- entry = get_dcl_certified_model ( self .vid , self .pid , self .software_version )
185
+ entry = requests . get ( f" { self .url } /dcl/compliance/certified-models/ { self . vid } / { self .pid } / { self .software_version } /matter" ). json ( )
196
186
asserts .assert_true (key in entry .keys (),
197
187
f"Unable to find certified model entry for { self .vid :04x} { self .pid :04x} { self .software_version } " )
198
188
logging .info (
199
189
f'Found certified model for vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} software version={ self .software_version } in the DCL:' )
200
190
logging .info (f'{ entry [key ]} ' )
201
191
192
+ def steps_AllSoftwareVersions (self ):
193
+ return [TestStep (1 , "Query the version information for this software version" , "DCL entry exists" ),
194
+ TestStep (2 , "For each valid software version with an OtaUrl, verify the OtaChecksumType is in the valid range and the OtaChecksum is a base64. If the softwareVersion matches the current softwareVersion on the device, ensure the entry is valid." , "OtaChecksum is base64 and OtaChecksumType is in the valid set" )]
195
+ def test_AllSoftwareVersions (self ):
196
+ self .step (1 )
197
+ versions_entry = requests .get (f"{ self .url } /dcl/model/versions/{ self .vid } /{ self .pid } " ).json ()
198
+ key_model_versions = 'modelVersions'
199
+ asserts .assert_true (key_model_versions in versions_entry .keys (), f"Unable to find { key_model_versions } in software versions schema for vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} " )
200
+ logging .info (
201
+ f'Found version info for vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} in the DCL:' )
202
+ logging .info (f'{ versions_entry [key_model_versions ]} ' )
203
+ key_software_versions = 'softwareVersions'
204
+ asserts .assert_true (key_software_versions in versions_entry [key_model_versions ].keys (), f"Unable to find { key_software_versions } in software versions schema for vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} " )
205
+
206
+ problems = []
207
+ self .step (2 )
208
+ for software_version in versions_entry [key_model_versions ][key_software_versions ]:
209
+ entry_wrapper = requests .get (f"{ self .url } /dcl/model/versions/{ self .vid } /{ self .pid } /{ software_version } " ).json ()
210
+ key_model_version = 'modelVersion'
211
+ if key_model_version not in entry_wrapper :
212
+ problems .append (f'Missing key { key_model_version } in entry for vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} software version={ software_version } ' )
213
+ continue
214
+ logging .info (f'Found entry version entry for vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} software version={ software_version } ' )
215
+ logging .info (entry_wrapper )
216
+ entry = entry_wrapper [key_model_version ]
217
+ key_ota_url = 'otaUrl'
218
+ key_software_version_valid = 'softwareVersionValid'
219
+ key_ota_checksum = 'otaChecksum'
220
+ key_ota_checksum_type = 'otaChecksumType'
221
+ def check_key (key ):
222
+ if key not in entry .keys ():
223
+ problems .append (f'Missing key { key } in DCL versions entry for vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} software version={ software_version } ' )
224
+ check_key (key_ota_url )
225
+ check_key (key_software_version_valid )
226
+ if entry [key_software_version_valid ] and entry [key_ota_url ]:
227
+ check_key (key_ota_checksum )
228
+ check_key (key_ota_checksum_type )
229
+ valid_checksum_types = [1 , 7 , 8 , 10 , 11 , 12 ]
230
+ if entry [key_ota_checksum_type ] not in valid_checksum_types :
231
+ problems .append (f'OtaChecksumType for entry vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} software version={ software_version } is invalid. Found { entry [key_ota_checksum_type ]} valid values: { valid_checksum_types } ' )
232
+ checksum = entry [key_ota_checksum ]
233
+ try :
234
+ is_base64 = base64 .b64encode (base64 .b64decode (checksum )).decode ('utf-8' ) == checksum
235
+ except (ValueError , TypeError ):
236
+ is_base64 = False
237
+ if not is_base64 :
238
+ problems .append (f"Checksum { checksum } is not base64 encoded for for entry vid=0x{ self .vid :04X} pid=0x{ self .pid :04X} software version={ software_version } " )
239
+ #TODO: download and actually checksum it? Maybe just for the current version? And size check?
240
+ msg = 'Problems found in software version DCL checks:\n '
241
+ for problem in problems :
242
+ msg += f'{ problem } \n '
243
+ asserts .assert_false (problems , msg )
244
+
202
245
203
246
def get_qr () -> str :
204
247
qr_code_detector = cv2 .QRCodeDetector ()
@@ -336,9 +379,7 @@ def main():
336
379
337
380
failures_test_event_trigger = run_test (TestEventTriggersCheck , ['test_TestEventTriggersCheck' ], test_config )
338
381
339
- failures_dcl = run_test (DclCheck , ['test_Vendor' , 'test_Model' , 'test_Compliance' , 'test_CertifiedModel' ], test_config )
340
-
341
- failures = failures_DA_1_2 + failures_DA_1_7 + failures_test_event_trigger + failures_dcl
382
+ failures_dcl = run_test (DclCheck , ['test_Vendor' , 'test_Model' , 'test_Compliance' , 'test_CertifiedModel' , 'test_AllSoftwareVersions' ], test_config )
342
383
343
384
report = []
344
385
for failure in failures_DA_1_2 :
@@ -366,17 +407,21 @@ def main():
366
407
# only one possible failure here
367
408
report .append ('Device has test event triggers enabled in production' )
368
409
369
- for failure in failures_dcl :
370
- if failure . test == 'test_Vendor' :
410
+ for test , failure in failures_dcl . items () :
411
+ if test == 'test_Vendor' :
371
412
report .append ('Device vendor ID is not present in the DCL' )
372
- elif failure . test == 'test_Model' :
413
+ elif test == 'test_Model' :
373
414
report .append ('Device model is not present in the DCL' )
374
- elif failure . test == 'test_Compliance' :
415
+ elif test == 'test_Compliance' :
375
416
report .append ('Device compliance information is not present in the DCL' )
376
- elif failure . test == 'test_CertifiedModel' :
417
+ elif test == 'test_CertifiedModel' :
377
418
report .append ('Device certified model is not present in the DCL' )
419
+ elif test == 'test_AllSoftwareVersions' :
420
+ report .append ('Problems with device software version in the DCL' )
378
421
else :
379
- report .append (f'unknown DCL failure in test { failure .test } : { failure .step } ' )
422
+ report .append (f'unknown DCL failure in test { test } : { failure .step } ' )
423
+ report .append ('\n ' )
424
+ report .append (str (failure .exception ))
380
425
381
426
print ('\n \n \n ' )
382
427
if report :
0 commit comments