|
| 1 | +# |
| 2 | +# Copyright (c) 2023 Project CHIP Authors |
| 3 | +# All rights reserved. |
| 4 | +# |
| 5 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | +# you may not use this file except in compliance with the License. |
| 7 | +# You may obtain a copy of the License at |
| 8 | +# |
| 9 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +# |
| 11 | +# Unless required by applicable law or agreed to in writing, software |
| 12 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | +# See the License for the specific language governing permissions and |
| 15 | +# limitations under the License. |
| 16 | +# |
| 17 | + |
| 18 | +from dataclasses import dataclass, field |
| 19 | + |
| 20 | +from chip.tlv import uint |
| 21 | +from conformance_support import ConformanceDecision |
| 22 | +from global_attribute_ids import GlobalAttributeIds |
| 23 | +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main |
| 24 | +from TC_DeviceConformance import DeviceConformanceTests |
| 25 | + |
| 26 | + |
| 27 | +@dataclass |
| 28 | +class ClusterMinimalElements: |
| 29 | + feature_masks: list[uint] = field(default_factory=list) |
| 30 | + attribute_ids: list[uint] = field(default_factory=list) |
| 31 | + # Only received commands are necessary - generated events are ALWAYS determined from accepted |
| 32 | + command_ids: list[uint] = field(default_factory=list) |
| 33 | + # TODO: need event support |
| 34 | + |
| 35 | + |
| 36 | +class MinimalRepresentationChecker(DeviceConformanceTests): |
| 37 | + def GenerateMinimals(self, ignore_in_progress: bool, is_ci: bool) -> dict[uint, dict[uint, ClusterMinimalElements]]: |
| 38 | + if not self.xml_clusters: |
| 39 | + self.setup_class_helper() |
| 40 | + |
| 41 | + success, _ = self.check_conformance(ignore_in_progress, is_ci) |
| 42 | + if not success: |
| 43 | + self.fail_current_test("Problems with conformance") |
| 44 | + |
| 45 | + # Now what we know the conformance is OK, we want to expose all the data model elements on the device |
| 46 | + # that are OPTIONAL given the other elements that are present. We can do this by assessing the conformance |
| 47 | + # again only on the elements we have. Because we've already run the full conformance checkers, we can rely |
| 48 | + # on the optional response really meaning optional. |
| 49 | + # TODO: do we also want to record the optional stuff that's NOT implemented? |
| 50 | + # endpoint -> list of clusters by id |
| 51 | + representation: dict[uint, dict[uint, ClusterMinimalElements]] = {} |
| 52 | + for endpoint_id, endpoint in self.endpoints_tlv.items(): |
| 53 | + representation[endpoint_id] = {} |
| 54 | + for cluster_id, cluster in endpoint.items(): |
| 55 | + minimal = ClusterMinimalElements() |
| 56 | + if cluster_id not in self.xml_clusters.keys(): |
| 57 | + continue |
| 58 | + |
| 59 | + feature_map = cluster[GlobalAttributeIds.FEATURE_MAP_ID] |
| 60 | + attribute_list = cluster[GlobalAttributeIds.ATTRIBUTE_LIST_ID] |
| 61 | + all_command_list = cluster[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID] + \ |
| 62 | + cluster[GlobalAttributeIds.GENERATED_COMMAND_LIST_ID] |
| 63 | + accepted_command_list = cluster[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID] |
| 64 | + |
| 65 | + # All optional features |
| 66 | + feature_masks = [1 << i for i in range(32) if feature_map & (1 << i)] |
| 67 | + for f in feature_masks: |
| 68 | + xml_feature = self.xml_clusters[cluster_id].features[f] |
| 69 | + conformance_decision = xml_feature.conformance(feature_map, attribute_list, all_command_list) |
| 70 | + if conformance_decision == ConformanceDecision.OPTIONAL: |
| 71 | + minimal.feature_masks.append(f) |
| 72 | + |
| 73 | + # All optional attributes |
| 74 | + for attribute_id, attribute in cluster.items(): |
| 75 | + if attribute_id not in self.xml_clusters[cluster_id].attributes.keys(): |
| 76 | + if attribute_id > 0xFFFF: |
| 77 | + # MEI |
| 78 | + minimal.attribute_ids.append(attribute_id) |
| 79 | + continue |
| 80 | + xml_attribute = self.xml_clusters[cluster_id].attributes[attribute_id] |
| 81 | + conformance_decision = xml_attribute.conformance(feature_map, attribute_list, all_command_list) |
| 82 | + if conformance_decision == ConformanceDecision.OPTIONAL: |
| 83 | + minimal.attribute_ids.append(attribute_id) |
| 84 | + |
| 85 | + # All optional commands |
| 86 | + for command_id in accepted_command_list: |
| 87 | + if command_id not in self.xml_clusters[cluster_id].accepted_commands: |
| 88 | + if command_id > 0xFFFF: |
| 89 | + # MEI |
| 90 | + minimal.attribute_ids.append(command_id) |
| 91 | + continue |
| 92 | + xml_command = self.xml_clusters[cluster_id].accepted_commands[command_id] |
| 93 | + conformance_decision = xml_command.conformance(feature_map, attribute_list, all_command_list) |
| 94 | + if conformance_decision == ConformanceDecision.OPTIONAL: |
| 95 | + minimal.command_ids.append(command_id) |
| 96 | + |
| 97 | + representation[endpoint_id][cluster_id] = minimal |
| 98 | + |
| 99 | + return representation |
| 100 | + |
| 101 | + def PrettyPrintRepresentation(self, representation: dict[uint, dict[uint, ClusterMinimalElements]]) -> None: |
| 102 | + for endpoint_id, cluster_list in representation.items(): |
| 103 | + print(f'Endpoint: {endpoint_id}') |
| 104 | + for cluster_id, minimals in cluster_list.items(): |
| 105 | + name = self.xml_clusters[cluster_id].name |
| 106 | + print(f' Cluster {cluster_id:04x} - {name}') |
| 107 | + print(' Features:') |
| 108 | + for feature in minimals.feature_masks: |
| 109 | + code = self.xml_clusters[cluster_id].features[feature].code |
| 110 | + print(f' {feature:02x}: {code}') |
| 111 | + print(' Attributes:') |
| 112 | + for attribute in minimals.attribute_ids: |
| 113 | + name = self.xml_clusters[cluster_id].attributes[attribute].name |
| 114 | + print(f' {attribute:02x}: {name}') |
| 115 | + print(' Commands:') |
| 116 | + for command in minimals.command_ids: |
| 117 | + name = self.xml_clusters[cluster_id].accepted_commands[command].name |
| 118 | + print(f' {command:02x}: {name}') |
| 119 | + |
| 120 | + |
| 121 | +# Helper for running this against a test device through the python test framework |
| 122 | +class MinimalRunner(MatterBaseTest, MinimalRepresentationChecker): |
| 123 | + @async_test_body |
| 124 | + async def setup_class(self): |
| 125 | + super().setup_class() |
| 126 | + await self.setup_class_helper() |
| 127 | + |
| 128 | + def test_MinimalRepresentation(self): |
| 129 | + # Before we can generate a minimal representation, we need to make sure that the device is conformant. |
| 130 | + # Otherwise, the values we extract aren't fully informative. |
| 131 | + ignore_in_progress = self.user_params.get("ignore_in_progress", False) |
| 132 | + is_ci = self.check_pics('PICS_SDK_CI_ONLY') |
| 133 | + representation = self.GenerateMinimals(ignore_in_progress, is_ci) |
| 134 | + print(type(representation[0])) |
| 135 | + self.PrettyPrintRepresentation(representation) |
| 136 | + |
| 137 | + |
| 138 | +if __name__ == "__main__": |
| 139 | + default_matter_test_main() |
0 commit comments