|
23 | 23 | from typing import Optional
|
24 | 24 |
|
25 | 25 | import chip.clusters as Clusters
|
26 |
| -from chip.interaction_model import Status |
| 26 | +from chip.interaction_model import Status, InteractionModelError |
27 | 27 | from chip.testing.basic_composition import BasicCompositionTests
|
28 | 28 | from chip.testing.global_attribute_ids import GlobalAttributeIds
|
29 |
| -from chip.testing.matter_testing import (AttributePathLocation, ClusterPathLocation, MatterBaseTest, TestStep, async_test_body, |
| 29 | +from chip.testing.matter_testing import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, TestStep, async_test_body, |
30 | 30 | default_matter_test_main)
|
31 | 31 | from chip.testing.spec_parsing import XmlCluster, build_xml_clusters
|
32 | 32 | from chip.tlv import uint
|
|
35 | 35 | class AccessTestType(Enum):
|
36 | 36 | READ = auto()
|
37 | 37 | WRITE = auto()
|
| 38 | + INVOKE = auto() |
38 | 39 |
|
39 | 40 |
|
40 | 41 | def step_number_with_privilege(step: int, substep: str, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum) -> str:
|
@@ -65,6 +66,15 @@ def known_cluster_attribute(attribute_id) -> bool:
|
65 | 66 | return [x for x in all_attrs if known_cluster_attribute(x)]
|
66 | 67 |
|
67 | 68 |
|
| 69 | +def checkable_commands(cluster_id, cluster, xml_cluster) -> list[uint]: |
| 70 | + all_cmds = cluster[GlobalAttributeIds.ACCEPTED_COMMAND_LIST] |
| 71 | + |
| 72 | + def known_cluster_cmds(command_id) -> bool: |
| 73 | + ''' Returns true if this is a non-manufacturer specific command that has information in the XML and has python codegen data''' |
| 74 | + return command_id <= 0xFFFF and command_id in xml_cluster.accepted_commands and command_id in Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id] |
| 75 | + return [x for x in all_cmds if known_cluster_cmds(x)] |
| 76 | + |
| 77 | + |
68 | 78 | class AccessChecker(MatterBaseTest, BasicCompositionTests):
|
69 | 79 | @async_test_body
|
70 | 80 | async def setup_class(self):
|
@@ -148,6 +158,33 @@ def _record_errors(self):
|
148 | 158 | self.success = False
|
149 | 159 | continue
|
150 | 160 |
|
| 161 | + async def _maybe_run_command_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, device_cluster_data, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum): |
| 162 | + """ Runs a command only if the required cluster privilege is HIGHER than the specified privilege. In this way, |
| 163 | + no commands are actually run on the device, which means there are no side effects. However, we can differentiate |
| 164 | + ACL rejections from commands being unsupported. |
| 165 | + """ |
| 166 | + for command_id in checkable_commands(cluster_id, device_cluster_data, xml_cluster): |
| 167 | + spec_requires = xml_cluster.accepted_commands[command_id].privilege |
| 168 | + command = Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id][command_id] |
| 169 | + location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id) |
| 170 | + name = f"Command test - privilege {privilege}" |
| 171 | + if operation_allowed(spec_requires, privilege): |
| 172 | + # In this test, we're only checking that the disallowed commands are rejected so that there are |
| 173 | + # no side effects. Commands are checked with admin privilege in their cluster tests. The error that |
| 174 | + # may be let through here is if the spec requires operate and the implementation requires admin. |
| 175 | + continue |
| 176 | + try: |
| 177 | + await self.send_single_cmd(cmd=command(), dev_ctrl=self.TH2, endpoint=endpoint_id) |
| 178 | + # If this was successful, that's an error |
| 179 | + self.record_error(test_name=name, location=location, |
| 180 | + problem=f"Unexpected success sending command {command} with privilege {privilege}") |
| 181 | + self.success = False |
| 182 | + except InteractionModelError as e: |
| 183 | + if e.status != Status.UnsupportedAccess: |
| 184 | + self.record_error(test_name=name, location=location, |
| 185 | + problem=f'Unexpected error sending command {command} with privilege {privilege} - expected UNSUPPORTED_ACCESS, got {e.status}') |
| 186 | + self.success = False |
| 187 | + |
151 | 188 | async def _run_read_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, device_cluster_data, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum):
|
152 | 189 | # TODO: This assumes all attributes are readable. Which they are currently. But we don't have a general way to mark otherwise.
|
153 | 190 | for attribute_id in checkable_attributes(cluster_id, device_cluster_data, xml_cluster):
|
@@ -248,6 +285,8 @@ async def run_access_test(self, test_type: AccessTestType):
|
248 | 285 | await self._run_read_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege)
|
249 | 286 | elif test_type == AccessTestType.WRITE:
|
250 | 287 | await self._run_write_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege, wildcard_read)
|
| 288 | + elif test_type == AccessTestType.INVOKE: |
| 289 | + await self._maybe_run_command_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege) |
251 | 290 | else:
|
252 | 291 | self.fail_current_test("Unsupported test type")
|
253 | 292 | if not self.success:
|
|
0 commit comments