1
1
#
2
- # Copyright (c) 2024 Project CHIP Authors
2
+ # Copyright (c) 2025 Project CHIP Authors
3
3
# All rights reserved.
4
4
#
5
5
# Licensed under the Apache License, Version 2.0 (the "License");
38
38
39
39
import json
40
40
import logging
41
- import queue
42
41
import typing
43
42
from dataclasses import dataclass
44
43
from time import sleep
48
47
from chip import ChipUtility
49
48
from chip .clusters .ClusterObjects import ClusterCommand , ClusterObjectDescriptor , ClusterObjectFieldDescriptor
50
49
from chip .interaction_model import InteractionModelError , Status
51
- from chip .testing . matter_testing import ( MatterBaseTest , SimpleEventCallback , TestStep , async_test_body , default_matter_test_main ,
52
- type_matches )
50
+ from chip .testing import matter_asserts
51
+ from chip . testing . matter_testing import EventChangeCallback , MatterBaseTest , TestStep , async_test_body , default_matter_test_main
53
52
from chip .tlv import uint
54
53
from mobly import asserts
55
54
@@ -98,7 +97,7 @@ def descriptor(cls) -> ClusterObjectDescriptor:
98
97
class TC_REFALM_2_2 (MatterBaseTest ):
99
98
"""Implementation of test case TC_REFALM_2_2."""
100
99
101
- def TC_REFALM_2_2 (self ) -> str :
100
+ def desc_TC_REFALM_2_2 (self ) -> str :
102
101
return "223.2.2. [TC-REFALM-2.2] Primary functionality with DUT as Server"
103
102
104
103
def pics_TC_REFALM_2_2 (self ):
@@ -113,40 +112,35 @@ def steps_TC_REFALM_2_2(self) -> list[TestStep]:
113
112
steps = [
114
113
TestStep (1 , "Commission DUT to TH (can be skipped if done in a preceding test)" , is_commissioning = True ),
115
114
TestStep (2 , "Ensure that the door on the DUT is closed" ),
116
- TestStep (3 , "TH reads from the DUT the State attribute" ),
115
+ TestStep (3 , "TH reads from the DUT the State attribute" ,
116
+ "Verify that the DUT response contains a 32-bit value with bit 0 set to 0" ),
117
117
TestStep (4 , "Manually open the door on the DUT" ),
118
118
TestStep (5 , "Wait for the time defined in PIXIT.REFALM.AlarmThreshold" ),
119
- TestStep (6 , "TH reads from the DUT the State attribute" ),
119
+ TestStep (6 , "TH reads from the DUT the State attribute" ,
120
+ "Verify that the DUT response contains a 32-bit value with bit 0 set to 1" ),
120
121
TestStep (7 , "Ensure that the door on the DUT is closed" ),
121
- TestStep (8 , "TH reads from the DUT the State attribute" ),
122
- TestStep (9 , "TH sends Reset command to the DUT" ),
123
- TestStep (10 , "TH sends ModifyEnabledAlarms command to the DUT" ),
122
+ TestStep (8 , "TH reads from the DUT the State attribute" ,
123
+ "Verify that the DUT response contains a 32-bit value with bit 0 set to 0" ),
124
+ TestStep (9 , "TH sends Reset command to the DUT" , "Verify DUT responds w/ status UNSUPPORTED_COMMAND(0x81)" ),
125
+ TestStep (10 , "TH sends ModifyEnabledAlarms command to the DUT" ,
126
+ "Verify DUT responds w/ status UNSUPPORTED_COMMAND(0x81)" ),
124
127
TestStep (11 , "Set up subscription to the Notify event" ),
125
- TestStep ("12.a" , "Repeating step 4 Manually open the door on the DUT" ),
126
- TestStep ("12.b" , "Step 12b: Repeat step 5 Wait for the time defined in PIXIT.REFALM.AlarmThreshold" ),
127
- TestStep ("12.c" , "After step 5 (repeated), receive a Notify event with the State attribute bit 0 set to 1." ),
128
- TestStep ("13.a" , "Repeat step 7 Ensure that the door on the DUT is closed" ),
129
- TestStep ("13.b" , "Receive a Notify event with the State attribute bit 0 set to 0" ),
128
+ TestStep (12 , "Repeat steps 4 and then 5" ,
129
+ "After step 5 (repeated), receive a Notify event with the State attribute bit 0 set to 1." ),
130
+ TestStep (13 , "Repeat step 7" ,
131
+ "Receive a Notify event with the State attribute bit 0 set to 0." ),
130
132
]
131
133
132
134
return steps
133
135
134
- async def _get_command_status (self , cmd : ClusterCommand , cmd_str : str = "" ):
136
+ async def _get_command_status (self , cmd : ClusterCommand ):
135
137
"""Return the status of the executed command. By default the status is 0x0 unless a different
136
138
status on InteractionModel is returned. For this test we consider the status 0x0 as not succesfull."""
137
139
cmd_status = Status .Success
138
- if self .is_pics_sdk_ci_only :
139
- try :
140
- await self .default_controller .SendCommand (nodeid = self .dut_node_id , endpoint = self .endpoint , payload = cmd )
141
- except InteractionModelError as uc :
142
- cmd_status = uc .status
143
- else :
144
- user_response = self .wait_for_user_input (prompt_msg = f"{ cmd_str } command is implemented in the DUT?. Enter 'y' or 'n' to confirm." ,
145
- default_value = "n" )
146
- asserts .assert_equal (user_response .lower (), "n" )
147
- if user_response .lower () == "n" :
148
- cmd_status = Status .UnsupportedCommand
149
-
140
+ try :
141
+ await self .default_controller .SendCommand (nodeid = self .dut_node_id , endpoint = self .endpoint , payload = cmd )
142
+ except InteractionModelError as uc :
143
+ cmd_status = uc .status
150
144
return cmd_status
151
145
152
146
def _ask_for_closed_door (self ):
@@ -224,6 +218,7 @@ async def test_TC_REFALM_2_2(self):
224
218
# reads the state attribute , must be a bitMap32 ( list wich values are 32 bits)
225
219
device_state = await self ._read_refalm_state_attribute ()
226
220
logger .info (f"The device state is { device_state } " )
221
+ matter_asserts .assert_valid_uint32 (device_state , "State" )
227
222
asserts .assert_equal (device_state , 0 )
228
223
229
224
# # open the door manually
@@ -238,6 +233,7 @@ async def test_TC_REFALM_2_2(self):
238
233
self .step (6 )
239
234
device_state = await self ._read_refalm_state_attribute ()
240
235
logger .info (f"The device state is { device_state } " )
236
+ matter_asserts .assert_valid_uint32 (device_state , "State" )
241
237
asserts .assert_equal (device_state , 1 )
242
238
243
239
# # # ensure the door is closed
@@ -249,54 +245,43 @@ async def test_TC_REFALM_2_2(self):
249
245
device_status = await self ._read_refalm_state_attribute ()
250
246
logger .info (f"The device state is { device_state } " )
251
247
asserts .assert_equal (device_status , 0 )
248
+ matter_asserts .assert_valid_uint32 (device_state , "State" )
252
249
253
250
self .step (9 )
254
- cmd_status = await self ._get_command_status (cmd = FakeReset (), cmd_str = "Reset" )
251
+ cmd_status = await self ._get_command_status (cmd = FakeReset ())
255
252
asserts .assert_equal (Status .UnsupportedCommand , cmd_status ,
256
253
msg = f"Command status is not { Status .UnsupportedCommand } , is { cmd_status } " )
257
254
258
255
self .step (10 )
259
- cmd_status = await self ._get_command_status (cmd = FakeModifyEnabledAlarms (), cmd_str = "ModifyEnabledAlarms" )
256
+ cmd_status = await self ._get_command_status (cmd = FakeModifyEnabledAlarms ())
260
257
asserts .assert_equal (Status .UnsupportedCommand , cmd_status ,
261
258
msg = f"Command status is not { Status .UnsupportedCommand } , is { cmd_status } " )
262
259
263
260
# Subscribe to Notify Event
264
261
self .step (11 )
265
- self . q = queue . Queue ( )
266
- notify_event = Clusters . RefrigeratorAlarm . Events . Notify
267
- cb = SimpleEventCallback ( "Notify" , notify_event . cluster_id , notify_event . event_id , self .q )
268
- urgent = 1
269
- subscription = await self . default_controller . ReadEvent ( nodeid = self . dut_node_id , events = [( self . endpoint , notify_event , urgent )], reportInterval = [ 2 , 10 ])
270
- subscription . SetEventUpdateCallback ( callback = cb )
271
-
272
- self . step ( "12.a " )
262
+ event_callback = EventChangeCallback ( Clusters . RefrigeratorAlarm )
263
+ await event_callback . start ( self . default_controller ,
264
+ self .dut_node_id ,
265
+ self . get_endpoint ( 1 ))
266
+
267
+ self . step ( 12 )
268
+ # repeat step 4 and 5
269
+ logger . info ( "Manually open the door on the DUT " )
273
270
self ._ask_for_open_door ()
274
-
275
- self .step ("12.b" )
271
+ logger .info ("Wait for the time defined in PIXIT.REFALM.AlarmThreshold" )
276
272
self ._wait_thresshold ()
277
-
278
- self .step ("12.c" )
279
273
# Wait for the Notify event with the State value.
280
- try :
281
- ret = self .q .get (block = True , timeout = 5 )
282
- logger .info (f"Event data { ret } " )
283
- asserts .assert_true (type_matches (ret .Data , cluster .Events .Notify ), "Unexpected event type returned" )
284
- asserts .assert_equal (ret .Data .state , 1 , "Unexpected value for State returned" )
285
- except queue .Empty :
286
- asserts .fail ("Did not receive Notify event" )
287
-
288
- self .step ("13.a" )
289
- self ._ask_for_closed_door ()
274
+ event_data = event_callback .wait_for_event_report (cluster .Events .Notify , timeout_sec = 5 )
275
+ logger .info (f"Event data { event_data } " )
276
+ asserts .assert_equal (event_data .state , 1 , "Unexpected value for State returned" )
290
277
291
- self .step ("13.b" )
278
+ self .step (13 )
279
+ logger .info ("Ensure that the door on the DUT is closed" )
280
+ self ._ask_for_closed_door ()
292
281
# Wait for the Notify event with the State value.
293
- try :
294
- ret = self .q .get (block = True , timeout = 5 )
295
- logger .info (f"Event data { ret } " )
296
- asserts .assert_true (type_matches (ret .Data , cluster .Events .Notify ), "Unexpected event type returned" )
297
- asserts .assert_equal (ret .Data .state , 0 , "Unexpected value for State returned" )
298
- except queue .Empty :
299
- asserts .fail ("Did not receive Notify event" )
282
+ event_data = event_callback .wait_for_event_report (cluster .Events .Notify , timeout_sec = 5 )
283
+ logger .info (f"Event data { event_data } " )
284
+ asserts .assert_equal (event_data .state , 0 , "Unexpected value for State returned" )
300
285
301
286
302
287
if __name__ == "__main__" :
0 commit comments