forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTC_ACE_1_2.py
292 lines (237 loc) · 15.5 KB
/
TC_ACE_1_2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs:
#   run1:
#     app: ${ALL_CLUSTERS_APP}
#     factory-reset: true
#     quiet: true
#     app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
#     script-args: >
#       --storage-path admin_storage.json
#       --commissioning-method on-network
#       --discriminator 1234
#       --passcode 20202021
#       --trace-to json:${TRACE_TEST_JSON}.json
#       --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# === END CI TEST ARGUMENTS ===
import logging
import queue
import chip.clusters as Clusters
from chip.clusters import ClusterObjects as ClusterObjects
from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction, TypedAttributePath
from chip.exceptions import ChipStackError
from chip.interaction_model import Status
from chip.testing.matter_testing import MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts
class AttributeChangeCallback:
    """Subscription callback that forwards reports for a single attribute to a queue.

    Reports whose path carries any other attribute type are silently dropped.
    """

    def __init__(self, expected_attribute: ClusterObjects.ClusterAttributeDescriptor, output: queue.Queue):
        """Remember the attribute type to filter on and the queue that receives matches."""
        self._expected_attribute = expected_attribute
        self._output = output

    def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction):
        """Enqueue ``(path, transaction)`` if ``path`` refers to the expected attribute."""
        # Guard clause: reports for other attributes are not of interest.
        if path.AttributeType != self._expected_attribute:
            return
        logging.info(f'Got subscription report for {path.AttributeType}')
        self._output.put((path, transaction))
class EventChangeCallback:
    """Subscription callback that forwards successful reports of one event to a queue.

    A report is forwarded only when its status is ``Status.Success`` and its header
    carries the cluster ID and event ID of the expected event.
    """

    def __init__(self, expected_event: ClusterObjects.ClusterEvent, output: queue.Queue):
        """Capture the cluster/event IDs to match and the queue that receives matches."""
        self._expected_cluster_id = expected_event.cluster_id
        self._expected_event_id = expected_event.event_id
        self._output = output

    def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction):
        """Enqueue ``res`` when it is a successful report of the expected event."""
        # The status is checked first so the header is only inspected for successful reports.
        if res.Status != Status.Success:
            return
        if res.Header.ClusterId != self._expected_cluster_id:
            return
        if res.Header.EventId != self._expected_event_id:
            return
        logging.info(
            f'Got subscription report for event {self._expected_event_id} on cluster {self._expected_cluster_id}: {res.Data}')
        self._output.put(res)
def WaitForAttributeReport(q: queue.Queue, expected_attribute: ClusterObjects.ClusterAttributeDescriptor, timeout_sec: float = 10):
    """Wait for the next queued attribute report and verify it is for ``expected_attribute``.

    Args:
        q: Queue of ``(path, transaction)`` tuples to read the report from.
        expected_attribute: Attribute type the dequeued report must carry.
        timeout_sec: Maximum number of seconds to block waiting for a report.
            Defaults to 10, the value that was previously hard-coded.

    The test is failed through ``mobly.asserts`` when no report arrives within
    ``timeout_sec``, when the report's ``AttributeType`` differs from
    ``expected_attribute``, or when ``transaction.GetAttribute(path)`` raises ``KeyError``.
    """
    try:
        path, transaction = q.get(block=True, timeout=timeout_sec)
    except queue.Empty:
        asserts.fail(f"Failed to receive a report for the attribute change for {expected_attribute}")

    asserts.assert_equal(path.AttributeType, expected_attribute, "Received incorrect attribute report")

    # The lookup is done only to confirm the reported path is present in the transaction;
    # the value itself is not used.
    try:
        transaction.GetAttribute(path)
    except KeyError:
        asserts.fail("Attribute not found in returned report")
def WaitForEventReport(q: queue.Queue, expected_event: ClusterObjects.ClusterEvent, timeout_sec: float = 10):
    """Wait for the next queued event report and verify it is for ``expected_event``.

    Args:
        q: Queue of event read results to read the report from.
        expected_event: Event type whose ``cluster_id`` and ``event_id`` the dequeued
            report's header must match.
        timeout_sec: Maximum number of seconds to block waiting for a report.
            Defaults to 10, the value that was previously hard-coded.

    The test is failed through ``mobly.asserts`` when no report arrives within
    ``timeout_sec`` or when the header's cluster ID or event ID does not match.
    """
    try:
        res = q.get(block=True, timeout=timeout_sec)
    except queue.Empty:
        asserts.fail(f"Failed to receive a report for the event {expected_event}")

    asserts.assert_equal(res.Header.ClusterId, expected_event.cluster_id, "Expected cluster ID not found in event report")
    asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report")
class TC_ACE_1_2(MatterBaseTest):
    """Subscription behaviour of a second controller (TH2) as its ACL privilege is lowered.

    TH1 (the default controller) rewrites the DUT's ACL so that TH2 holds, in turn,
    Administer, Manage, Operate and View privilege on endpoint 0, and finally no entry
    at all. After each change the test checks which subscriptions TH2 can still
    establish and whether it still receives Breadcrumb reports.
    """

    # Value expected in ChipStackError.err when the DUT rejects a subscription.
    # The step descriptions refer to this outcome as INVALID_ACTION.
    _SUBSCRIPTION_DENIED_ERR = 0x580

    def __init__(self, *args):
        # Next value to write to the Breadcrumb attribute; bumped after every confirmed report.
        self.breadcrumb = 1
        # Receives (path, transaction) tuples from TH2's Breadcrumb subscription callback.
        self.breadcrumb_queue = queue.Queue()
        # Most recent Breadcrumb subscription held by TH2.
        self.subscription_breadcrumb = None
        super().__init__(*args)

    @staticmethod
    def _case_ace_endpoint0(privilege, subjects):
        """Build a CASE access control entry for ``subjects`` that targets endpoint 0."""
        return Clusters.AccessControl.Structs.AccessControlEntryStruct(
            privilege=privilege,
            authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
            subjects=subjects,
            targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])

    async def _expect_subscription_denied(self, subscribe, kind: str):
        """Run ``subscribe`` and require it to raise ``ChipStackError`` with the denied code.

        Args:
            subscribe: Zero-argument callable returning the awaitable that performs the
                subscription. It is called inside the ``try`` so that an error raised
                while starting the request is handled the same way as one raised
                while awaiting it.
            kind: ``"attribute"`` or ``"event"``; only used in the failure message.
        """
        try:
            await subscribe()
            asserts.fail(f"Incorrectly subscribed to {kind} with invalid permissions")
        except ChipStackError as e:
            asserts.assert_equal(e.err, self._SUBSCRIPTION_DENIED_ERR,
                                 "Incorrect error message received from subscription with no permission")

    async def write_acl(self, acl):
        """Write ``acl`` to the DUT's ACL attribute on endpoint 0 from TH1 and require success."""
        # This returns an attribute status
        result = await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))])
        asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed")

    async def steps_subscribe_breadcrumb(self, print_steps: bool):
        """Subscribe TH2 to Breadcrumb and route its reports to ``self.breadcrumb_queue``.

        Args:
            print_steps: When true, print the step 3 banner before subscribing.
        """
        if print_steps:
            self.print_step(3, "TH2 subscribes to the Breadcrumb attribute")
        self.subscription_breadcrumb = await self.TH2.ReadAttribute(
            nodeid=self.dut_node_id,
            attributes=[(0, Clusters.GeneralCommissioning.Attributes.Breadcrumb)],
            reportInterval=(1, 5), keepSubscriptions=False, autoResubscribe=False)
        breadcrumb_cb = AttributeChangeCallback(Clusters.GeneralCommissioning.Attributes.Breadcrumb, self.breadcrumb_queue)
        self.subscription_breadcrumb.SetAttributeUpdateCallback(breadcrumb_cb)

    async def steps_receive_breadcrumb(self, print_steps: bool):
        """Have TH1 write a new Breadcrumb value and wait for TH2's subscription report.

        Args:
            print_steps: When true, print the step 9 and step 10 banners.
        """
        if print_steps:
            self.print_step(9, "TH1 writes the breadcrumb attribute")
        result = await self.default_controller.WriteAttribute(
            nodeid=self.dut_node_id,
            attributes=[(0, Clusters.GeneralCommissioning.Attributes.Breadcrumb(self.breadcrumb))])
        # Fail here, with a clear message, rather than letting a rejected write surface
        # later as a "no report received" timeout. Mirrors the check in write_acl.
        asserts.assert_equal(result[0].Status, Status.Success, "Breadcrumb write failed")

        if print_steps:
            self.print_step(10, "TH2 waits for a subscription report from the DUT for breadcrumb")
        WaitForAttributeReport(self.breadcrumb_queue, Clusters.GeneralCommissioning.Attributes.Breadcrumb)

        # Use a fresh value next time so the next write changes the attribute.
        self.breadcrumb = self.breadcrumb + 1

    async def steps_admin_subscription_error(self, print_steps: bool):
        """Check that TH2 can subscribe to neither the ACL attribute nor the ACL changed event.

        Args:
            print_steps: When true, print the step 13 and step 14 banners.
        """
        if print_steps:
            self.print_step(13, "Subscribe to the ACL attribute, expect INVALID_ACTION")
        await self._expect_subscription_denied(
            lambda: self.TH2.ReadAttribute(
                nodeid=self.dut_node_id,
                attributes=[(0, Clusters.AccessControl.Attributes.Acl)],
                reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=False, autoResubscribe=False),
            "attribute")

        if print_steps:
            self.print_step(14, "Subscribe to the AccessControlEntryChanged event, expect INVALID_ACTION")
        await self._expect_subscription_denied(
            lambda: self.TH2.ReadEvent(
                nodeid=self.dut_node_id,
                events=[(0, Clusters.AccessControl.Events.AccessControlEntryChanged)],
                reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=False, autoResubscribe=False),
            "event")

    @async_test_body
    async def test_TC_ACE_1_2(self):
        """Run the numbered test steps 1-29; each step is announced with ``print_step``."""
        privilege = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum

        self.print_step(1, "Commissioning, already done")
        fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0]
        TH1_nodeid = self.matter_test_config.controller_node_id
        TH2_nodeid = self.matter_test_config.controller_node_id + 1
        # TH2 is a second controller created from the same fabric admin as TH1.
        self.TH2 = fabric_admin.NewController(nodeId=TH2_nodeid,
                                              paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path))

        self.print_step(2, "TH1 writes ACL for admin with two subjects")
        TH1_2_admin = self._case_ace_endpoint0(privilege.kAdminister, [TH1_nodeid, TH2_nodeid])
        await self.write_acl([TH1_2_admin])

        # Step 3 - subscribe to breadcrumb - print handled in function
        await self.steps_subscribe_breadcrumb(print_steps=True)

        self.print_step(4, "TH2 subscribes to ACL attribute")
        subscription_acl = await self.TH2.ReadAttribute(
            nodeid=self.dut_node_id,
            attributes=[(0, Clusters.AccessControl.Attributes.Acl)],
            reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=True, autoResubscribe=False)
        acl_queue = queue.Queue()
        acl_cb = AttributeChangeCallback(Clusters.AccessControl.Attributes.Acl, acl_queue)
        subscription_acl.SetAttributeUpdateCallback(acl_cb)

        self.print_step(5, "TH2 subscribes to the AccessControlEntryChanged event")
        urgent = 1
        subscription_ace = await self.TH2.ReadEvent(
            nodeid=self.dut_node_id,
            events=[(0, Clusters.AccessControl.Events.AccessControlEntryChanged, urgent)],
            reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=True, autoResubscribe=False)
        ace_queue = queue.Queue()
        ace_cb = EventChangeCallback(Clusters.AccessControl.Events.AccessControlEntryChanged, ace_queue)
        subscription_ace.SetEventUpdateCallback(ace_cb)

        self.print_step(6, "TH1 writes ACL attribute")
        # Adding a third subject changes the ACL while leaving TH1 and TH2 as administrators.
        acl = self._case_ace_endpoint0(privilege.kAdminister, [TH1_nodeid, TH2_nodeid, TH2_nodeid + 1])
        await self.write_acl([acl])

        self.print_step(7, "TH2 waits for subscription report for ACL")
        WaitForAttributeReport(acl_queue, Clusters.AccessControl.Attributes.Acl)

        self.print_step(8, "TH2 waits for subscription report for access control entry changed event")
        WaitForEventReport(ace_queue, Clusters.AccessControl.Events.AccessControlEntryChanged)

        # this function prints the steps for 9 and 10
        await self.steps_receive_breadcrumb(print_steps=True)

        self.print_step(11, "TH1 writes ACL attribute")
        acl1 = self._case_ace_endpoint0(privilege.kAdminister, [TH1_nodeid])
        acl2 = self._case_ace_endpoint0(privilege.kManage, [TH2_nodeid])
        await self.write_acl([acl1, acl2])

        self.print_step(12, "TH2 Repeats steps to change breadcrumb and receive subscription report")
        await self.steps_receive_breadcrumb(print_steps=False)

        # step 13 and 14 - printed in the function
        await self.steps_admin_subscription_error(print_steps=True)

        # NOTE(review): presumably the rejected attempts above (keepSubscriptions=False)
        # end TH2's earlier Breadcrumb subscription, hence the re-subscribe - confirm.
        self.print_step(15, "TH2 subscribes to breadcrumb attribute")
        await self.steps_subscribe_breadcrumb(print_steps=False)

        self.print_step(16, "TH2 Repeats steps to change breadcrumb and receive subscription report")
        await self.steps_receive_breadcrumb(print_steps=False)

        self.print_step(17, "TH1 writes ACL attribute")
        acl1 = self._case_ace_endpoint0(privilege.kAdminister, [TH1_nodeid])
        acl2 = self._case_ace_endpoint0(privilege.kOperate, [TH2_nodeid])
        await self.write_acl([acl1, acl2])

        self.print_step(18, "TH2 Repeats steps to change breadcrumb and receive subscription report")
        await self.steps_receive_breadcrumb(print_steps=False)

        self.print_step(19, "TH2 repeats subscriptions to Admin attribute and event to ensure it still errors")
        await self.steps_admin_subscription_error(print_steps=False)

        self.print_step(20, "TH2 subscribes to breadcrumb attribute")
        await self.steps_subscribe_breadcrumb(print_steps=False)

        self.print_step(21, "TH2 Repeats steps to change breadcrumb and receive subscription report")
        await self.steps_receive_breadcrumb(print_steps=False)

        self.print_step(22, "TH1 writes ACL attribute")
        acl1 = self._case_ace_endpoint0(privilege.kAdminister, [TH1_nodeid])
        acl2 = self._case_ace_endpoint0(privilege.kView, [TH2_nodeid])
        await self.write_acl([acl1, acl2])

        self.print_step(23, "TH2 Repeats steps to change breadcrumb and receive subscription report")
        await self.steps_receive_breadcrumb(print_steps=False)

        self.print_step(24, "TH2 repeats subscriptions to Admin attribute and event to ensure it still errors")
        await self.steps_admin_subscription_error(print_steps=False)

        self.print_step(25, "TH2 subscribes to breadcrumb attribute")
        await self.steps_subscribe_breadcrumb(print_steps=False)

        self.print_step(26, "TH2 Repeats steps to change breadcrumb and receive subscription report")
        await self.steps_receive_breadcrumb(print_steps=False)

        self.print_step(27, "TH1 writes ACL attribute")
        # Only TH1 remains in the ACL, and this entry is written with an empty target list.
        acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(
            privilege=privilege.kAdminister,
            authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
            subjects=[TH1_nodeid],
            targets=[])
        await self.write_acl([acl])

        self.print_step(28, "TH2 repeats subscriptions to Admin attribute and event to ensure it still errors")
        await self.steps_admin_subscription_error(print_steps=False)

        self.print_step(29, "TH2 attempts to subscribe to the breadcrumb attribute - expect error")
        await self._expect_subscription_denied(
            lambda: self.TH2.ReadAttribute(
                nodeid=self.dut_node_id,
                attributes=[(0, Clusters.GeneralCommissioning.Attributes.Breadcrumb)],
                reportInterval=(1, 5), keepSubscriptions=False, autoResubscribe=False),
            "attribute")
# Script entry point: when this file is executed directly (not imported), call
# default_matter_test_main(), which is imported from chip.testing.matter_testing.
if __name__ == "__main__":
    default_matter_test_main()