|
| 1 | +# All rights reserved. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | +# |
| 15 | + |
| 16 | +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments |
| 17 | +# for details about the block below. |
| 18 | +# |
| 19 | +# === BEGIN CI TEST ARGUMENTS === |
| 20 | +# test-runner-runs: |
| 21 | +# run1: |
| 22 | +# app: ${ALL_CLUSTERS_APP} |
| 23 | +# factory-reset: true |
| 24 | +# quiet: true |
| 25 | +# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json |
| 26 | +# script-args: > |
| 27 | +# --storage-path admin_storage.json |
| 28 | +# --commissioning-method on-network |
| 29 | +# --discriminator 1234 |
| 30 | +# --passcode 20202021 |
| 31 | +# --PICS src/app/tests/suites/certification/ci-pics-values |
| 32 | +# --app-pid ${CLUSTER_PID} |
| 33 | +# --trace-to json:${TRACE_TEST_JSON}.json |
| 34 | +# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto |
| 35 | +# === END CI TEST ARGUMENTS === |
| 36 | + |
| 37 | +import logging |
| 38 | +from time import sleep |
| 39 | +from typing import Any |
| 40 | +import queue |
| 41 | +import json |
| 42 | +import chip.clusters as Clusters |
| 43 | +import typing |
| 44 | +from chip.clusters.ClusterObjects import ClusterObjectDescriptor, ClusterCommand, ClusterObjectFieldDescriptor |
| 45 | +from chip.interaction_model import InteractionModelError, Status |
| 46 | +from chip.testing.matter_testing import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, SimpleEventCallback,type_matches |
| 47 | +from mobly import asserts |
| 48 | +from dataclasses import dataclass |
| 49 | +from chip import ChipUtility |
| 50 | +from chip.tlv import uint |
| 51 | + |
| 52 | + |
| 53 | +logger = logging.getLogger(__name__) |
| 54 | + |
| 55 | +# TODO(#37217) Refactor using generic method. |
| 56 | +# Implement fake commands for the RefrigeratorAlarm Cluster |
| 57 | +# These comands are Disallowed (X) in the spec. |
| 58 | +# When running those commands must return 0x81 (UNSUPPORTED COMMAND) |
| 59 | +@dataclass |
| 60 | +class FakeReset(ClusterCommand): |
| 61 | + cluster_id: typing.ClassVar[int] = 0x00000057 |
| 62 | + command_id: typing.ClassVar[int] = 0x00000000 |
| 63 | + is_client: typing.ClassVar[bool] = True |
| 64 | + response_type: typing.ClassVar[typing.Optional[str]] = None |
| 65 | + |
| 66 | + @ChipUtility.classproperty |
| 67 | + def descriptor(cls) -> ClusterObjectDescriptor: |
| 68 | + return ClusterObjectDescriptor( |
| 69 | + Fields=[ |
| 70 | + ClusterObjectFieldDescriptor(Label="alarms", Tag=0, Type=uint), |
| 71 | + ]) |
| 72 | + |
| 73 | + alarms: uint = 0 |
| 74 | + |
| 75 | +@dataclass |
| 76 | +class FakeModifyEnabledAlarms(ClusterCommand): |
| 77 | + cluster_id: typing.ClassVar[int] = 0x00000057 |
| 78 | + command_id: typing.ClassVar[int] = 0x00000001 |
| 79 | + is_client: typing.ClassVar[bool] = True |
| 80 | + response_type: typing.ClassVar[typing.Optional[str]] = None |
| 81 | + |
| 82 | + @ChipUtility.classproperty |
| 83 | + def descriptor(cls) -> ClusterObjectDescriptor: |
| 84 | + return ClusterObjectDescriptor( |
| 85 | + Fields=[ |
| 86 | + ClusterObjectFieldDescriptor(Label="mask", Tag=0, Type=uint), |
| 87 | + ]) |
| 88 | + |
| 89 | + mask: uint = 0 |
| 90 | + |
| 91 | +class TC_REFALM_2_2(MatterBaseTest): |
| 92 | + """Implementation of test case TC_REFALM_2_2.""" |
| 93 | + |
| 94 | + def TC_REFALM_2_2(self) -> str : |
| 95 | + return "223.2.2. [TC-REFALM-2.2] Primary functionality with DUT as Server" |
| 96 | + |
| 97 | + def pics_TC_REFALM_2_2(self): |
| 98 | + """Return PICS definitions asscociated with this test.""" |
| 99 | + pics = [ |
| 100 | + "REFALM.S" |
| 101 | + ] |
| 102 | + return pics |
| 103 | + |
| 104 | + def steps_TC_REFALM_2_2(self) -> list[TestStep]: |
| 105 | + """Execute the test steps.""" |
| 106 | + steps = [ |
| 107 | + TestStep(1, "Commission DUT to TH (can be skipped if done in a preceding test)", is_commissioning=True), |
| 108 | + TestStep(2, "Ensure that the door on the DUT is closed"), |
| 109 | + TestStep(3, "TH reads from the DUT the State attribute"), |
| 110 | + TestStep(4, "Manually open the door on the DUT"), |
| 111 | + TestStep(5, "Wait for the time defined in PIXIT.REFALM.AlarmThreshold"), |
| 112 | + TestStep(6, "TH reads from the DUT the State attribute"), |
| 113 | + TestStep(7, "Ensure that the door on the DUT is closed"), |
| 114 | + TestStep(8, "TH reads from the DUT the State attribute"), |
| 115 | + TestStep(9, "TH sends Reset command to the DUT"), |
| 116 | + TestStep(10, "TH sends ModifyEnabledAlarms command to the DUT"), |
| 117 | + TestStep(11, "Set up subscription to the Notify event"), |
| 118 | + TestStep("12.a", "Repeating step 4 Manually open the door on the DUT"), |
| 119 | + TestStep("12.b", "Step 12b: Repeat step 5 Wait for the time defined in PIXIT.REFALM.AlarmThreshold"), |
| 120 | + TestStep("12.c", "After step 5 (repeated), receive a Notify event with the State attribute bit 0 set to 1."), |
| 121 | + TestStep("13.a", "Repeat step 7 Ensure that the door on the DUT is closed"), |
| 122 | + TestStep("13.b", "Receive a Notify event with the State attribute bit 0 set to 0"), |
| 123 | + ] |
| 124 | + |
| 125 | + return steps |
| 126 | + |
| 127 | + async def _get_command_status(self,cmd: ClusterCommand,cmd_str:str=""): |
| 128 | + """Return the status of the executed command. By default the status is 0x0 unless a different |
| 129 | + status on InteractionModel is returned. For this test we consider the status 0x0 as not succesfull.""" |
| 130 | + cmd_status = Status.Success |
| 131 | + if self.is_ci: |
| 132 | + try: |
| 133 | + await self.default_controller.SendCommand(nodeid=self.dut_node_id,endpoint=self.endpoint,payload=cmd) |
| 134 | + except InteractionModelError as uc: |
| 135 | + cmd_status = uc.status |
| 136 | + else: |
| 137 | + user_response = self.wait_for_user_input(prompt_msg=f"{cmd_str} command is implemented in the DUT?. Enter 'y' or 'n' to confirm.", |
| 138 | + default_value="n") |
| 139 | + asserts.assert_equal(user_response.lower(), "n") |
| 140 | + if user_response.lower() == "n": |
| 141 | + cmd_status = Status.UnsupportedCommand |
| 142 | + |
| 143 | + return cmd_status |
| 144 | + |
| 145 | + |
| 146 | + def _ask_for_closed_door(self): |
| 147 | + if self.is_ci: |
| 148 | + self._send_close_door_commnad() |
| 149 | + sleep(1) |
| 150 | + else: |
| 151 | + user_response = self.wait_for_user_input(prompt_msg=f"Ensure that the door on the DUT is closed.", |
| 152 | + default_value="y") |
| 153 | + asserts.assert_equal(user_response.lower(), "y") |
| 154 | + |
| 155 | + def _ask_for_open_door(self): |
| 156 | + if self.is_ci: |
| 157 | + self._send_open_door_command() |
| 158 | + else: |
| 159 | + user_response = self.wait_for_user_input(prompt_msg=f"Manually open the door on the DUT. Enter 'y' or 'n' after completition", |
| 160 | + default_value="y") |
| 161 | + asserts.assert_equal(user_response.lower(), "y") |
| 162 | + |
| 163 | + async def _read_refalm_state_attribute(self): |
| 164 | + cluster = Clusters.Objects.RefrigeratorAlarm |
| 165 | + return await self.read_single_attribute_check_success( |
| 166 | + endpoint=self.endpoint, |
| 167 | + cluster=cluster, |
| 168 | + attribute=Clusters.RefrigeratorAlarm.Attributes.State |
| 169 | + ) |
| 170 | + |
| 171 | + def _wait_thresshold(self): |
| 172 | + sleep(self.wait_thresshold_v/1000) |
| 173 | + |
| 174 | + def _send_named_pipe_command(self, command_dict: dict[str, Any]): |
| 175 | + app_pid = self.matter_test_config.app_pid |
| 176 | + if app_pid == 0: |
| 177 | + asserts.fail("The --app-pid flag must be set when usage of door state simulation named pipe is required (e.g. CI)") |
| 178 | + |
| 179 | + app_pipe = f"/tmp/chip_all_clusters_fifo_{app_pid}" |
| 180 | + command = json.dumps(command_dict) |
| 181 | + |
| 182 | + # Sends an out-of-band command to the sample app |
| 183 | + with open(app_pipe, "w") as outfile: |
| 184 | + logging.info(f"Sending named pipe command to {app_pipe}: '{command}'") |
| 185 | + outfile.write(command + "\n") |
| 186 | + # Delay for pipe command to be processed (otherwise tests may be flaky). |
| 187 | + sleep(0.1) |
| 188 | + |
| 189 | + def _send_open_door_command(self): |
| 190 | + command_dict = {"Name":"SetRefrigeratorDoorStatus", "EndpointId": self.endpoint, "DoorOpen": Clusters.RefrigeratorAlarm.Bitmaps.AlarmBitmap.kDoorOpen} |
| 191 | + self._send_named_pipe_command(command_dict) |
| 192 | + |
| 193 | + def _send_close_door_commnad(self): |
| 194 | + command_dict = {"Name":"SetRefrigeratorDoorStatus", "EndpointId": self.endpoint, "DoorOpen": 0} |
| 195 | + self._send_named_pipe_command(command_dict) |
| 196 | + |
| 197 | + @async_test_body |
| 198 | + async def test_TC_REFALM_2_2(self): |
| 199 | + """Run the test steps.""" |
| 200 | + self.wait_thresshold_v = 5000 |
| 201 | + self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") |
| 202 | + self.endpoint = self.get_endpoint(default=1) |
| 203 | + cluster = Clusters.RefrigeratorAlarm |
| 204 | + |
| 205 | + logger.info(f"Default endpoint {self.endpoint}") |
| 206 | + self.step(1) |
| 207 | + |
| 208 | + # check is closed |
| 209 | + self.step(2) |
| 210 | + self._ask_for_closed_door() |
| 211 | + |
| 212 | + self.step(3) |
| 213 | + # reads the state attribute , must be a bitMap32 ( list wich values are 32 bits) |
| 214 | + device_state = await self._read_refalm_state_attribute() |
| 215 | + logger.info(f"The device state is {device_state}") |
| 216 | + asserts.assert_equal(device_state,0) |
| 217 | + |
| 218 | + |
| 219 | + # # open the door manually |
| 220 | + self.step(4) |
| 221 | + self._ask_for_open_door() |
| 222 | + |
| 223 | + # wait PIXIT.REFALM.AlarmThreshold (5s) |
| 224 | + self.step(5) |
| 225 | + self._wait_thresshold() |
| 226 | + |
| 227 | + # # read the status |
| 228 | + self.step(6) |
| 229 | + device_state = await self._read_refalm_state_attribute() |
| 230 | + logger.info(f"The device state is {device_state}") |
| 231 | + asserts.assert_equal(device_state,1) |
| 232 | + |
| 233 | + # # # ensure the door is closed |
| 234 | + self.step(7) |
| 235 | + self._ask_for_closed_door() |
| 236 | + |
| 237 | + # # read from the state attr |
| 238 | + self.step(8) |
| 239 | + device_status = await self._read_refalm_state_attribute() |
| 240 | + logger.info(f"The device state is {device_state}") |
| 241 | + asserts.assert_equal(device_status,0) |
| 242 | + |
| 243 | + self.step(9) |
| 244 | + cmd_status = await self._get_command_status(cmd=FakeReset(),cmd_str="Reset") |
| 245 | + asserts.assert_equal(Status.UnsupportedCommand,cmd_status, msg=f"Command status is not {Status.UnsupportedCommand}, is {cmd_status}") |
| 246 | + |
| 247 | + self.step(10) |
| 248 | + cmd_status = await self._get_command_status(cmd=FakeModifyEnabledAlarms(),cmd_str="ModifyEnabledAlarms") |
| 249 | + asserts.assert_equal(Status.UnsupportedCommand,cmd_status,msg=f"Command status is not {Status.UnsupportedCommand}, is {cmd_status}") |
| 250 | + |
| 251 | + |
| 252 | + # Subscribe to Notify Event |
| 253 | + self.step(11) |
| 254 | + self.q = queue.Queue() |
| 255 | + notify_event = Clusters.RefrigeratorAlarm.Events.Notify |
| 256 | + cb = SimpleEventCallback("Notify", notify_event.cluster_id, notify_event.event_id, self.q) |
| 257 | + urgent = 1 |
| 258 | + subscription = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(self.endpoint, notify_event, urgent)], reportInterval=[2, 10]) |
| 259 | + subscription.SetEventUpdateCallback(callback=cb) |
| 260 | + |
| 261 | + self.step("12.a") |
| 262 | + self._ask_for_open_door() |
| 263 | + |
| 264 | + self.step("12.b") |
| 265 | + self._wait_thresshold() |
| 266 | + |
| 267 | + self.step("12.c") |
| 268 | + # Wait for the Notify event with the State value. |
| 269 | + try: |
| 270 | + ret = self.q.get(block=True, timeout=5) |
| 271 | + logger.info(f"Event data {ret}") |
| 272 | + asserts.assert_true(type_matches(ret.Data, cluster.Events.Notify ),"Unexpected event type returned") |
| 273 | + asserts.assert_equal(ret.Data.state, 1,"Unexpected value for State returned") |
| 274 | + except queue.Empty: |
| 275 | + asserts.fail("Did not receive Notify event") |
| 276 | + |
| 277 | + self.step("13.a") |
| 278 | + self._ask_for_closed_door() |
| 279 | + |
| 280 | + self.step("13.b") |
| 281 | + # Wait for the Notify event with the State value. |
| 282 | + try: |
| 283 | + ret = self.q.get(block=True, timeout=5) |
| 284 | + logger.info(f"Event data {ret}") |
| 285 | + asserts.assert_true(type_matches(ret.Data, cluster.Events.Notify ),"Unexpected event type returned") |
| 286 | + asserts.assert_equal(ret.Data.state, 0,"Unexpected value for State returned") |
| 287 | + except queue.Empty: |
| 288 | + asserts.fail("Did not receive Notify event") |
| 289 | + |
| 290 | +if __name__ == "__main__": |
| 291 | + default_matter_test_main() |
0 commit comments