32
32
33
33
import chip .clusters as Clusters
34
34
import chip .exceptions
35
- from matter_testing_support import MatterBaseTest , default_matter_test_main , per_endpoint_test , has_cluster , async_test_body
35
+ from matter_testing_support import MatterBaseTest , TestStep , default_matter_test_main , per_endpoint_test , has_cluster , async_test_body
36
36
from mobly import asserts
37
37
from chip .interaction_model import InteractionModelError , Status
38
38
@@ -88,22 +88,55 @@ def teardown_class(self):
88
88
os .remove (self .kvs )
89
89
super ().teardown_class ()
90
90
91
+
92
+ def steps_TC_CCTRL_2_2 (self ) -> list [TestStep ]:
93
+ steps = [TestStep (1 , "Get number of fabrics from TH_SERVER" , is_commissioning = True ),
94
+ TestStep (2 , "Reading Attribute VendorId from TH_SERVER" ),
95
+ TestStep (3 , "Reading Attribute ProductId from TH_SERVER" ),
96
+ TestStep (4 , "Reading Event CommissioningRequestResult from DUT" ),
97
+ TestStep (5 , "Send CommissionNode command to DUT with CASE session" ),
98
+ TestStep (6 , "Send OpenCommissioningWindow command on Administrator Commissioning Cluster to DUT with CASE session" ),
99
+ TestStep (7 , "Send CommissionNode command to DUT with PASE session" ),
100
+ TestStep (8 , "Send RequestCommissioningApproval command to DUT with PASE session" ),
101
+ TestStep (9 , "Send RevokeCommissioning command on Administrator Commissioning Cluster to DUT with CASE session" ),
102
+ TestStep (10 , "Reading Event CommissioningRequestResult from DUT, confirm no new events" ),
103
+ TestStep (11 , "Send RequestCommissioningApproval command to DUT with CASE session with incorrect vendorID" ),
104
+ TestStep (12 , "(Manual Step) Approve Commissioning Approval Request on DUT using method indicated by the manufacturer" ),
105
+ TestStep (13 , "Reading Event CommissioningRequestResult from DUT, confirm one new event" ),
106
+ TestStep (14 , "Send CommissionNode command to DUT with CASE session, with invalid RequestId" ),
107
+ TestStep (15 , "Send CommissionNode command to DUT with CASE session, with invalid ResponseTimeoutSeconds too low" ),
108
+ TestStep (16 , "Send CommissionNode command to DUT with CASE session, with invalid ResponseTimeoutSeconds too high" ),
109
+ TestStep (17 , "Send CommissionNode command to DUT with CASE session, with valid parameters" ),
110
+ TestStep (18 , "Send OpenCommissioningWindow command on Administrator Commissioning Cluster sent to TH_SERVER" ),
111
+ TestStep (19 , "Wait for DUT to fail commissioning TH_SERVER, 30 seconds" ),
112
+ TestStep (20 , "Get number of fabrics from TH_SERVER" ),
113
+ TestStep (21 , "Send RevokeCommissioning command on Administrator Commissioning Cluster sent to TH_SERVER" ),
114
+ TestStep (22 , "Send RequestCommissioningApproval command to DUT with CASE session with correct vendorID" ),
115
+ TestStep (23 , "(Manual Step) Approve Commissioning Approval Request on DUT using method indicated by the manufacturer" ),
116
+ TestStep (24 , "Reading Event CommissioningRequestResult from DUT, confirm one new event" ),
117
+ TestStep (25 , "Send CommissionNode command to DUT with CASE session, with valid parameters" ),
118
+ TestStep (26 , "Send OpenCommissioningWindow command on Administrator Commissioning Cluster sent to TH_SERVER" ),
119
+ TestStep (27 , "Wait for DUT to successfully commission TH_SERVER, 30 seconds" ),
120
+ TestStep (28 , "Get number of fabrics from TH_SERVER, verify DUT successfully commissioned TH_SERVER" )]
121
+
122
+ return steps
123
+
91
124
@per_endpoint_test (has_cluster (Clusters .CommissionerControl ))
92
125
async def test_TC_CCTRL_2_2 (self ):
93
126
self .is_ci = self .check_pics ('PICS_SDK_CI_ONLY' )
94
127
95
- # self.step(1)
128
+ self .step (1 )
96
129
th_server_fabrics = await self .read_single_attribute_check_success (cluster = Clusters .OperationalCredentials , attribute = Clusters .OperationalCredentials .Attributes .Fabrics , dev_ctrl = self .TH_server_controller , node_id = self .server_nodeid , endpoint = 0 )
97
- # self.step(2)
130
+ self .step (2 )
98
131
th_server_vid = await self .read_single_attribute_check_success (cluster = Clusters .BasicInformation , attribute = Clusters .BasicInformation .Attributes .VendorID , dev_ctrl = self .TH_server_controller , node_id = self .server_nodeid , endpoint = 0 )
99
- # self.step(3)
132
+ self .step (3 )
100
133
th_server_pid = await self .read_single_attribute_check_success (cluster = Clusters .BasicInformation , attribute = Clusters .BasicInformation .Attributes .ProductID , dev_ctrl = self .TH_server_controller , node_id = self .server_nodeid , endpoint = 0 )
101
134
102
- # self.step(4)
135
+ self .step (4 )
103
136
event_path = [(self .matter_test_config .endpoint , Clusters .CommissionerControl .Events .CommissioningRequestResult , 1 )]
104
137
events = await self .default_controller .ReadEvent (nodeid = self .dut_node_id , events = event_path )
105
138
106
- # self.step(5)
139
+ self .step (5 )
107
140
ipaddr = ipaddress .IPv6Address ('::1' )
108
141
cmd = Clusters .CommissionerControl .Commands .CommissionNode (requestId = 1 , responseTimeoutSeconds = 30 , ipAddress = ipaddr .packed , port = self .port )
109
142
try :
@@ -112,8 +145,9 @@ async def test_TC_CCTRL_2_2(self):
112
145
except InteractionModelError as e :
113
146
asserts .assert_equal (e .status , Status .Failure , "Incorrect error returned" )
114
147
115
- # self.step(6)
148
+ self .step (6 )
116
149
params = await self .openCommissioningWindow (dev_ctrl = self .default_controller , node_id = self .dut_node_id )
150
+ self .step (7 )
117
151
pase_nodeid = self .dut_node_id + 1
118
152
await self .default_controller .FindOrEstablishPASESession (setupCode = params .commissioningParameters .setupQRCode , nodeid = pase_nodeid )
119
153
try :
@@ -122,7 +156,7 @@ async def test_TC_CCTRL_2_2(self):
122
156
except InteractionModelError as e :
123
157
asserts .assert_equal (e .status , Status .UnsupportedAccess , "Incorrect error returned" )
124
158
125
- # self.step(7 )
159
+ self .step (8 )
126
160
good_request_id = 0x1234567887654321
127
161
cmd = Clusters .CommissionerControl .Commands .RequestCommissioningApproval (requestId = good_request_id , vendorId = th_server_vid , productId = th_server_pid )
128
162
try :
@@ -131,26 +165,32 @@ async def test_TC_CCTRL_2_2(self):
131
165
except InteractionModelError as e :
132
166
asserts .assert_equal (e .status , Status .UnsupportedAccess , "Incorrect error returned" )
133
167
134
- #self.step(8)
168
+ self .step (9 )
169
+ cmd = Clusters .AdministratorCommissioning .Commands .RevokeCommissioning ()
170
+ # If no exception is raised, this is success
171
+ await self .send_single_cmd (cmd )
172
+
173
+ self .step (10 )
135
174
if not events :
136
175
new_event = await self .default_controller .ReadEvent (nodeid = self .dut_node_id , events = event_path )
137
176
else :
138
177
event_nums = [e .Header .EventNumber for e in events ]
139
178
new_event = await self .default_controller .ReadEvent (nodeid = self .dut_node_id , events = event_path , eventNumberFilter = max (event_nums )+ 1 )
140
179
asserts .assert_equal (new_event , [], "Unexpected event" )
141
180
142
-
143
- #self.step(9)
144
- bad_vid = 0x6006
145
- cmd = Clusters .CommissionerControl .Commands .RequestCommissioningApproval (requestId = good_request_id , vendorId = bad_vid , productId = th_server_pid )
181
+ self .step (11 )
182
+ # There should be nothing on the TH that uses VID 0x6006 as this is a real vendor ID.
183
+ not_th_server_vid = 0x6006
184
+ asserts .assert_not_equal (not_th_server_vid , th_server_vid , "Test implementation assumption incorrect" )
185
+ cmd = Clusters .CommissionerControl .Commands .RequestCommissioningApproval (requestId = good_request_id , vendorId = not_th_server_vid , productId = th_server_pid )
146
186
# If no exception is raised, this is success
147
187
await self .send_single_cmd (cmd )
148
188
149
- # self.step(10 )
189
+ self .step (12 )
150
190
if not self .is_ci :
151
191
self .wait_for_use_input ("Approve Commissioning approval request using manufacturer specified mechanism" )
152
192
153
- # self.step(11 )
193
+ self .step (13 )
154
194
if not events :
155
195
new_event = await self .default_controller .ReadEvent (nodeid = self .dut_node_id , events = event_path )
156
196
else :
@@ -161,7 +201,7 @@ async def test_TC_CCTRL_2_2(self):
161
201
asserts .assert_equal (new_event [0 ].Data .clientNodeId , self .matter_test_config .controller_node_id , "Unexpected client node id" )
162
202
asserts .assert_equal (new_event [0 ].Data .requestId , good_request_id , "Unexpected request ID" )
163
203
164
- # self.step(12 )
204
+ self .step (14 )
165
205
bad_request_id = 0x1234567887654322
166
206
cmd = Clusters .CommissionerControl .Commands .CommissionNode (requestId = bad_request_id , responseTimeoutSeconds = 30 )
167
207
try :
@@ -170,59 +210,59 @@ async def test_TC_CCTRL_2_2(self):
170
210
except InteractionModelError as e :
171
211
asserts .assert_equal (e .status , Status .Failure , "Incorrect error returned" )
172
212
173
- # self.step(13 )
213
+ self .step (15 )
174
214
cmd = Clusters .CommissionerControl .Commands .CommissionNode (requestId = good_request_id , responseTimeoutSeconds = 29 )
175
215
try :
176
216
await self .send_single_cmd (cmd = cmd )
177
217
asserts .fail ("Unexpected success on CommissionNode" )
178
218
except InteractionModelError as e :
179
219
asserts .assert_equal (e .status , Status .Failure , "Incorrect error returned" )
180
220
181
- # self.step(14 )
221
+ self .step (16 )
182
222
cmd = Clusters .CommissionerControl .Commands .CommissionNode (requestId = good_request_id , responseTimeoutSeconds = 121 )
183
223
try :
184
224
await self .send_single_cmd (cmd = cmd )
185
225
asserts .fail ("Unexpected success on CommissionNode" )
186
226
except InteractionModelError as e :
187
227
asserts .assert_equal (e .status , Status .Failure , "Incorrect error returned" )
188
228
189
- # self.step(15 )
229
+ self .step (17 )
190
230
cmd = Clusters .CommissionerControl .Commands .CommissionNode (requestId = good_request_id , responseTimeoutSeconds = 30 )
191
231
resp : Clusters .CommissionerControl .Commands .ReverseOpenCommissioningWindow = await self .send_single_cmd (cmd )
192
232
asserts .assert_equal (type (resp ), Clusters .CommissionerControl .Commands .ReverseOpenCommissioningWindow , "Incorrect response type" )
193
233
194
- # self.step(16 )
234
+ self .step (18 )
195
235
# min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
196
236
cmd = Clusters .AdministratorCommissioning .Commands .OpenCommissioningWindow (commissioningTimeout = 3 * 60 ,
197
237
PAKEPasscodeVerifier = resp .PAKEPasscodeVerifier ,
198
238
discriminator = resp .discriminator ,
199
239
iterations = resp .iterations , salt = resp .salt )
200
240
await self .send_single_cmd (cmd , dev_ctrl = self .TH_server_controller , node_id = self .server_nodeid , endpoint = 0 , timedRequestTimeoutMs = 5000 )
201
241
202
- # self.step(17 )
242
+ self .step (19 )
203
243
logging .info ("Test now waits for 30 seconds" )
204
- #time.sleep(30)
244
+ if not self .is_ci :
245
+ time .sleep (30 )
205
246
206
- # self.step(18 )
247
+ self .step (20 )
207
248
print (f'server node id { self .server_nodeid } ' )
208
249
th_server_fabrics_new = await self .read_single_attribute_check_success (cluster = Clusters .OperationalCredentials , attribute = Clusters .OperationalCredentials .Attributes .Fabrics , dev_ctrl = self .TH_server_controller , node_id = self .server_nodeid , endpoint = 0 )
209
250
asserts .assert_equal (len (th_server_fabrics ), len (th_server_fabrics_new ), "Unexpected number of fabrics on TH_SERVER" )
210
251
211
- # self.step(19 )
252
+ self .step (21 )
212
253
cmd = Clusters .AdministratorCommissioning .Commands .RevokeCommissioning ()
213
254
await self .send_single_cmd (cmd , dev_ctrl = self .TH_server_controller , node_id = self .server_nodeid , timedRequestTimeoutMs = 5000 , endpoint = 0 )
214
255
215
- #self.step(20)
216
- print ('step 20' )
256
+ self .step (22 )
217
257
good_request_id = 0x1234567812345678
218
258
cmd = Clusters .CommissionerControl .Commands .RequestCommissioningApproval (requestId = good_request_id , vendorId = th_server_vid , productId = th_server_pid , label = "Test Ecosystem" )
219
259
await self .send_single_cmd (cmd )
220
260
221
- # self.step(21 )
261
+ self .step (23 )
222
262
if not self .is_ci :
223
263
self .wait_for_use_input ("Approve Commissioning approval request using manufacturer specified mechanism" )
224
264
225
- # self.step(22 )
265
+ self .step (24 )
226
266
events = new_event
227
267
event_nums = [e .Header .EventNumber for e in events ]
228
268
new_event = await self .default_controller .ReadEvent (nodeid = self .dut_node_id , events = event_path , eventNumberFilter = max (event_nums )+ 1 )
@@ -231,23 +271,24 @@ async def test_TC_CCTRL_2_2(self):
231
271
asserts .assert_equal (new_event [0 ].Data .clientNodeId , self .matter_test_config .controller_node_id , "Unexpected client node id" )
232
272
asserts .assert_equal (new_event [0 ].Data .requestId , good_request_id , "Unexpected request ID" )
233
273
234
- # self.step(23 )
274
+ self .step (25 )
235
275
cmd = Clusters .CommissionerControl .Commands .CommissionNode (requestId = good_request_id , responseTimeoutSeconds = 30 )
236
276
resp = await self .send_single_cmd (cmd )
237
277
asserts .assert_equal (type (resp ), Clusters .CommissionerControl .Commands .ReverseOpenCommissioningWindow , "Incorrect response type" )
238
278
239
- # self.step(24 )
279
+ self .step (26 )
240
280
# min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
241
281
cmd = Clusters .AdministratorCommissioning .Commands .OpenCommissioningWindow (commissioningTimeout = 3 * 60 ,
242
282
PAKEPasscodeVerifier = resp .PAKEPasscodeVerifier ,
243
283
discriminator = resp .discriminator ,
244
284
iterations = resp .iterations , salt = resp .salt )
245
285
await self .send_single_cmd (cmd , dev_ctrl = self .TH_server_controller , node_id = self .server_nodeid , endpoint = 0 , timedRequestTimeoutMs = 5000 )
246
286
247
- # self.step(25 )
287
+ self .step (27 )
248
288
if not self .is_ci :
249
289
time .sleep (30 )
250
290
291
+ self .step (28 )
251
292
th_server_fabrics_new = await self .read_single_attribute_check_success (cluster = Clusters .OperationalCredentials , attribute = Clusters .OperationalCredentials .Attributes .Fabrics , dev_ctrl = self .TH_server_controller , node_id = self .server_nodeid , endpoint = 0 )
252
293
#TODO: this should be mocked too.
253
294
if not self .is_ci :
0 commit comments