forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTC_ACE_1_5.py
161 lines (136 loc) · 8.16 KB
/
TC_ACE_1_5.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs:
# run1:
# app: ${ALL_CLUSTERS_APP}
# factory-reset: true
# quiet: true
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --PICS src/app/tests/suites/certification/ci-pics-values
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# === END CI TEST ARGUMENTS ===
import logging
import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from chip.interaction_model import Status
from chip.testing.matter_testing import MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts
class TC_ACE_1_5(MatterBaseTest):
async def read_currentfabricindex(self, th: ChipDeviceCtrl) -> int:
cluster = Clusters.Objects.OperationalCredentials
attribute = Clusters.OperationalCredentials.Attributes.CurrentFabricIndex
current_fabric_index = await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute)
return current_fabric_index
async def write_acl(self, acl: Clusters.AccessControl, th: ChipDeviceCtrl):
result = await th.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))])
asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed")
@async_test_body
async def test_TC_ACE_1_5(self):
self.print_step(1, "Comissioning, already done")
self.th1 = self.default_controller
# TODO: move into base class and adjust tests (#31521)
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=self.matter_test_config.fabric_id + 1)
TH1_nodeid = self.matter_test_config.controller_node_id
TH2_nodeid = self.matter_test_config.controller_node_id + 2
self.th2 = new_fabric_admin.NewController(nodeId=TH2_nodeid,
paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path))
params = await self.openCommissioningWindow(self.th1, self.dut_node_id)
self.print_step(2, "TH1 opens the commissioning window on the DUT")
await self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.commissioningParameters.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=params.randomDiscriminator)
logging.info('Commissioning complete done. Successful.')
self.print_step(3, "TH2 commissions DUT using admin node ID N2")
self.print_step(4, "TH2 reads its fabric index from the Operational Credentials cluster CurrentFabricIndex attribute")
th2FabricIndex = await self.read_currentfabricindex(self.th2)
self.print_step(
5, "TH1 writes DUT Endpoint 0 ACL cluster ACL attribute, value is list of ACLEntryStruct containing 2 elements")
admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH1_nodeid],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=Clusters.AccessControl.id)])
descriptor_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=Clusters.Descriptor.id)])
acl = [admin_acl, descriptor_view]
await self.write_acl(acl, self.th1)
self.print_step(
6, "TH2 writes DUT Endpoint 0 ACL cluster ACL attribute, value is list of ACLEntryStruct containing 2 elements")
admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(
fabricIndex=th2FabricIndex,
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH2_nodeid],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=Clusters.AccessControl.id)])
descriptor_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(
fabricIndex=th2FabricIndex,
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=Clusters.BasicInformation.id)])
acl = [admin_acl, descriptor_view]
await self.write_acl(acl, self.th2)
self.print_step(7, "TH1 reads DUT Endpoint 0 Descriptor cluster DeviceTypeList attribute")
await self.read_single_attribute_check_success(
dev_ctrl=self.th1, endpoint=0,
cluster=Clusters.Objects.Descriptor,
attribute=Clusters.Descriptor.Attributes.DeviceTypeList)
self.print_step(8, "TH1 reads DUT Endpoint 0 Basic Information cluster VendorID attribute")
await self.read_single_attribute_expect_error(
dev_ctrl=self.th1, endpoint=0,
cluster=Clusters.Objects.BasicInformation,
attribute=Clusters.BasicInformation.Attributes.VendorID,
error=Status.UnsupportedAccess)
self.print_step(9, "TH2 reads DUT Endpoint 0 Descriptor cluster DeviceTypeList attribute")
await self.read_single_attribute_expect_error(
dev_ctrl=self.th2, endpoint=0,
cluster=Clusters.Objects.Descriptor,
attribute=Clusters.Descriptor.Attributes.DeviceTypeList,
error=Status.UnsupportedAccess)
self.print_step(10, "TH2 reads DUT Endpoint 0 Basic Information cluster VendorID attribute")
await self.read_single_attribute_check_success(
dev_ctrl=self.th2, endpoint=0,
cluster=Clusters.Objects.BasicInformation,
attribute=Clusters.BasicInformation.Attributes.VendorID)
self.print_step(11, "TH1 resets the ACLs to default value by writing DUT EP0")
full_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH1_nodeid],
targets=[])
acl = [full_acl]
await self.write_acl(acl, self.th1)
self.print_step(
12, "TH1 removes the TH2 fabric by sending the RemoveFabric command to the DUT with the FabricIndex set to th2FabricIndex")
removeFabricCmd = Clusters.OperationalCredentials.Commands.RemoveFabric(th2FabricIndex)
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=removeFabricCmd)
if __name__ == "__main__":
default_matter_test_main()