#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import logging
import math
import queue
import random
import string
import time
from typing import Any, Dict, List, Set
import chip.clusters as Clusters
from chip.interaction_model import Status as StatusEnum
from chip.utils import CommissioningBuildingBlocks
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts
from TC_SC_3_6 import AttributeChangeAccumulator, ResubscriptionCatcher
# TODO: Overall, we need to add validation that session IDs have not changed throughout, to be agnostic
# to the internal SDK behavior assumptions we are making, namely that the write which triggers
# the subscription reports does not re-open a new CASE session
#
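# Controllers are named "RD<fabric index><letter>" (e.g. RD1A for the first controller on fabric index 1,
# RD1B for the second); these names key the per-client bookkeeping used throughout the test.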
def generate_controller_name(fabric_index: int, controller_index: int):
return f"RD{fabric_index}{string.ascii_uppercase[controller_index]}"
class TC_RR_1_1(MatterBaseTest):
def setup_class(self):
self._pseudo_random_generator = random.Random(1234)
self._subscriptions = []
def teardown_class(self):
logging.info("Teardown: shutting down all subscription to avoid racy callbacks")
for subscription in self._subscriptions:
subscription.Shutdown()
@async_test_body
async def test_TC_RR_1_1(self):
dev_ctrl = self.default_controller
# Debug/test arguments
# Get overrides for debugging the test
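# Overrides are passed as test-runner user params on the command line, e.g. `--int-arg num_fabrics_to_commission:5`
# (analogous to the `--bool-arg` example given below for check_heap_watermarks).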
num_fabrics_to_commission = self.user_params.get("num_fabrics_to_commission", 5)
num_controllers_per_fabric = self.user_params.get("num_controllers_per_fabric", 3)
# Immediate reporting
min_report_interval_sec = self.user_params.get("min_report_interval_sec", 0)
# 10 minutes max reporting interval --> We don't care about keep-alives per se and
# want to avoid resubscriptions
max_report_interval_sec = self.user_params.get("max_report_interval_sec", 10 * 60)
# Time to wait after changing NodeLabel for subscriptions to all hit. This is dependent
# on MRP params of subscriber and on actual min_report_interval.
# TODO: Determine the correct max value depending on target. Test plan doesn't say!
timeout_delay_sec = self.user_params.get("timeout_delay_sec", max_report_interval_sec * 2)
# Whether to skip filling the UserLabel clusters
skip_user_label_cluster_steps = self.user_params.get("skip_user_label_cluster_steps", False)
# Whether to do the local session ID comparison checks to prove new sessions have not been established.
check_local_session_id_unchanged = self.user_params.get("check_local_session_id_unchanged", False)
# Whether to check heap statistics. Add `--bool-arg check_heap_watermarks:true` to command line to enable
check_heap_watermarks = self.user_params.get("check_heap_watermarks", False)
BEFORE_LABEL = "Before Subscriptions 12345678912"
AFTER_LABEL = "After Subscriptions 123456789123"
# Pre-conditions
# Do a read-out of heap statistics before the test begins
if check_heap_watermarks:
logging.info("Read Heap info before stress test")
high_watermark_before, current_usage_before = await self.read_heap_statistics(dev_ctrl)
# Make sure all certificates are installed with maximal size
dev_ctrl.fabricAdmin.certificateAuthority.maximizeCertChains = True
# TODO: Do this from the PICS list. The reflection approach here matches what a real client would do,
# and it respects what the test says: "TH writes 4 entries per endpoint where LabelList is supported"
logging.info("Pre-condition: determine whether any endpoints have UserLabel cluster (ULABEL.S.A0000(LabelList))")
endpoints_with_user_label_list = await dev_ctrl.ReadAttribute(self.dut_node_id, [Clusters.UserLabel.Attributes.LabelList])
has_user_labels = len(endpoints_with_user_label_list) > 0
if has_user_labels:
logging.info("--> User label cluster present on endpoints %s" %
", ".join(["%d" % ep for ep in endpoints_with_user_label_list.keys()]))
else:
logging.info("--> User label cluster not present on any endpoitns")
# Generate list of all clients names
client_list = []
# TODO: Shall we also verify SupportedFabrics attribute, and the CapabilityMinima attribute?
logging.info("Pre-conditions: validate CapabilityMinima.CaseSessionsPerFabric >= 3")
capability_minima = await self.read_single_attribute(dev_ctrl,
node_id=self.dut_node_id,
endpoint=0,
attribute=Clusters.BasicInformation.Attributes.CapabilityMinima)
asserts.assert_greater_equal(capability_minima.caseSessionsPerFabric, 3)
# Step 1: Commission 5 fabrics with maximized NOC chains. 1a and 1b have already been completed at this time.
logging.info(f"Step 1: use existing fabric to configure new fabrics so that total is {num_fabrics_to_commission} fabrics")
# Generate Node IDs for the subsequent controllers, starting at 200 and stepping by 100 (200, 300, ...)
node_ids = [200 + (i * 100) for i in range(num_controllers_per_fabric - 1)]
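# With the default of 3 controllers per fabric, node_ids == [200, 300].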
# Prepare clients for first fabric, that includes the default controller
fabric_index = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex, dev_ctrl=dev_ctrl)
dev_ctrl.name = generate_controller_name(fabric_index, 0)
client_list.append(dev_ctrl)
if num_controllers_per_fabric > 1:
new_controllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(
fabricAdmin=dev_ctrl.fabricAdmin,
adminDevCtrl=dev_ctrl,
controllerNodeIds=node_ids,
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
targetNodeId=self.dut_node_id, catTags=[0x0001_0001]
)
for idx, controller in enumerate(new_controllers):
controller.name = generate_controller_name(fabric_index, idx+1)
client_list.extend(new_controllers)
# Step 1c - Ensure there are no leftover fabrics from another process.
commissioned_fabric_count: int = await self.read_single_attribute(
dev_ctrl, node_id=self.dut_node_id,
endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.CommissionedFabrics)
# Insert a fabric to self-test the next step.
# This is not hidden behind a flag to avoid potential undetected bugs.
if commissioned_fabric_count == 1:
logging.info("Commissioning fabric for TH test.")
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
new_admin_ctrl = new_fabric_admin.NewController(nodeId=dev_ctrl.nodeId, catTags=[0x0001_0001])
new_admin_ctrl.name = "THTF"
await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(
commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=new_admin_ctrl,
existingNodeId=self.dut_node_id, newNodeId=self.dut_node_id)
commissioned_fabric_count = await self.read_single_attribute(
dev_ctrl, node_id=self.dut_node_id,
endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.CommissionedFabrics)
asserts.assert_not_equal(commissioned_fabric_count, 1, "TH Error: failed to add fabric for testing TH.")
# Step 1c - perform removal.
if commissioned_fabric_count > 1:
logging.info("Removing extra fabrics from device.")
fabrics: List[Clusters.OperationalCredentials.Structs.FabricDescriptorStruct] = await self.read_single_attribute(
dev_ctrl, node_id=self.dut_node_id, endpoint=0,
attribute=Clusters.OperationalCredentials.Attributes.Fabrics, fabricFiltered=False)
current_fabric_index = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex)
for fabric in fabrics:
if fabric.fabricIndex == current_fabric_index:
continue
# This is not the test client's fabric, so remove it.
await dev_ctrl.SendCommand(
self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.RemoveFabric(fabricIndex=fabric.fabricIndex))
commissioned_fabric_count = await self.read_single_attribute(
dev_ctrl, node_id=self.dut_node_id,
endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.CommissionedFabrics)
asserts.assert_equal(commissioned_fabric_count, 1, "Failed to remove extra fabrics from DUT.")
# Prepare clients for subsequent fabrics (step 1d)
for i in range(num_fabrics_to_commission - 1):
admin_index = 2 + i
logging.info("Commissioning fabric %d/%d" % (admin_index, num_fabrics_to_commission))
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=admin_index)
new_admin_ctrl = new_fabric_admin.NewController(nodeId=dev_ctrl.nodeId, catTags=[0x0001_0001])
await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(commissionerDevCtrl=dev_ctrl,
newFabricDevCtrl=new_admin_ctrl,
existingNodeId=self.dut_node_id,
newNodeId=self.dut_node_id)
fabric_index = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex, dev_ctrl=new_admin_ctrl)
new_admin_ctrl.name = generate_controller_name(fabric_index, 0)
client_list.append(new_admin_ctrl)
if num_controllers_per_fabric > 1:
new_controllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(
fabricAdmin=new_fabric_admin,
adminDevCtrl=new_admin_ctrl,
controllerNodeIds=node_ids,
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
targetNodeId=self.dut_node_id,
catTags=[0x0001_0001]
)
for idx, controller in enumerate(new_controllers):
controller.name = generate_controller_name(fabric_index, idx+1)
client_list.extend(new_controllers)
asserts.assert_equal(len(client_list), num_fabrics_to_commission *
num_controllers_per_fabric, "Must have the right number of clients")
commissioned_fabric_count = await self.read_single_attribute(
dev_ctrl, node_id=self.dut_node_id,
endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.CommissionedFabrics)
asserts.assert_equal(commissioned_fabric_count, num_fabrics_to_commission,
"Must have the right number of fabrics commissioned.")
fabric_table: List[Clusters.OperationalCredentials.Structs.FabricDescriptorStruct] = await self.read_single_attribute(
dev_ctrl, node_id=self.dut_node_id,
endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.Fabrics, fabricFiltered=False)
client_by_name = {client.name: client for client in client_list}
local_session_id_by_client_name = {client.name: client.GetConnectedDeviceSync(
self.dut_node_id).localSessionId for client in client_list}
# Step 2: Set the Label field for each fabric and BasicInformation.NodeLabel to 32 characters
logging.info("Step 2: Setting the Label field for each fabric and BasicInformation.NodeLabel to 32 characters")
for fabric in fabric_table:
client_name = generate_controller_name(fabric.fabricIndex, 0)
client = client_by_name[client_name]
# Send the UpdateLabel command
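# The label is the decimal fabric index repeated 32 times, i.e. the 32-character label called for by step 2.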
label = ("%d" % fabric.fabricIndex) * 32
logging.info("Step 2a: Setting fabric label on fabric %d to '%s' using client %s" %
(fabric.fabricIndex, label, client_name))
await client.SendCommand(self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.UpdateFabricLabel(label))
# Read back
fabric_metadata = await self.read_single_attribute(client,
node_id=self.dut_node_id,
endpoint=0,
attribute=Clusters.OperationalCredentials.Attributes.Fabrics)
print(fabric_metadata)
asserts.assert_equal(fabric_metadata[0].label, label, "Fabrics[x].label must match what was written")
# Before subscribing, set the NodeLabel to "Before Subscriptions"
logging.info(f"Step 2b: Set BasicInformation.NodeLabel to {BEFORE_LABEL}")
await client_list[0].WriteAttribute(self.dut_node_id,
[(0, Clusters.BasicInformation.Attributes.NodeLabel(value=BEFORE_LABEL))])
node_label = await self.read_single_attribute(client,
node_id=self.dut_node_id,
endpoint=0,
attribute=Clusters.BasicInformation.Attributes.NodeLabel)
asserts.assert_equal(node_label, BEFORE_LABEL, "NodeLabel must match what was written")
# Step 3: Add 4 Access Control entries on DUT with a list of 4 Subjects and 3 Targets with the following parameters (...)
await self.send_acl(test_step=3, client_by_name=client_by_name, enable_access_to_group_cluster=False, fabric_table=fabric_table)
# Step 4 and 5 (the operations cannot be separated): establish all CASE sessions and subscriptions
# Subscribe with all clients to NodeLabel attribute and 2 more paths
sub_handlers = []
resub_catchers = []
output_queue = queue.Queue()
subscription_contents = [
(0, Clusters.BasicInformation.Attributes.NodeLabel), # Single attribute
(0, Clusters.OperationalCredentials), # Wildcard all of opcreds attributes on EP0
Clusters.Descriptor # All descriptors on all endpoints
]
logging.info("Step 4 and 5 (first part): Establish subscription with all %d clients" % len(client_list))
for sub_idx, client in enumerate(client_list):
logging.info("Establishing subscription %d/%d from controller node %s" % (sub_idx + 1, len(client_list), client.name))
sub = await client.ReadAttribute(
nodeid=self.dut_node_id,
attributes=subscription_contents,
reportInterval=(min_report_interval_sec, max_report_interval_sec),
keepSubscriptions=False
)
self._subscriptions.append(sub)
attribute_handler = AttributeChangeAccumulator(
name=client.name, expected_attribute=Clusters.BasicInformation.Attributes.NodeLabel, output=output_queue)
sub.SetAttributeUpdateCallback(attribute_handler)
sub_handlers.append(attribute_handler)
# TODO: Replace resubscription catcher with API to disable re-subscription on failure
resub_catcher = ResubscriptionCatcher(name=client.name)
sub.SetResubscriptionAttemptedCallback(resub_catcher)
resub_catchers.append(resub_catcher)
asserts.assert_equal(len(self._subscriptions), len(client_list), "Must have the right number of subscriptions")
# Step 6: Read 9 paths and validate success
logging.info("Step 6: Read 9 paths (first 9 attributes of Basic Information cluster) and validate success")
large_read_contents = [
Clusters.BasicInformation.Attributes.DataModelRevision,
Clusters.BasicInformation.Attributes.VendorName,
Clusters.BasicInformation.Attributes.VendorID,
Clusters.BasicInformation.Attributes.ProductName,
Clusters.BasicInformation.Attributes.ProductID,
Clusters.BasicInformation.Attributes.NodeLabel,
Clusters.BasicInformation.Attributes.Location,
Clusters.BasicInformation.Attributes.HardwareVersion,
Clusters.BasicInformation.Attributes.HardwareVersionString,
]
large_read_paths = [(0, attrib) for attrib in large_read_contents]
basic_info = await dev_ctrl.ReadAttribute(self.dut_node_id, large_read_paths)
# Make sure everything came back from the read that we expected
asserts.assert_true(0 in basic_info.keys(), "Must have read endpoint 0 data")
asserts.assert_true(Clusters.BasicInformation in basic_info[0].keys(), "Must have read Basic Information cluster data")
for attribute in large_read_contents:
asserts.assert_true(attribute in basic_info[0][Clusters.BasicInformation],
"Must have read back attribute %s" % (attribute.__name__))
# Step 7: Trigger a change on NodeLabel
logging.info(
"Step 7: Change attribute with one client, await all attributes changed successfully without loss of subscriptions")
await asyncio.sleep(1)
await client_list[0].WriteAttribute(self.dut_node_id,
[(0, Clusters.BasicInformation.Attributes.NodeLabel(value=AFTER_LABEL))])
all_changes = {client.name: False for client in client_list}
# Await a stabilization delay in increments to let the event loops run
start_time = time.time()
elapsed = 0
time_remaining = timeout_delay_sec
while time_remaining > 0:
try:
item = output_queue.get(block=True, timeout=time_remaining)
client_name, endpoint, attribute, value = item['name'], item['endpoint'], item['attribute'], item['value']
# Record arrival of an expected subscription change when seen
if endpoint == 0 and attribute == Clusters.BasicInformation.Attributes.NodeLabel and value == AFTER_LABEL:
if not all_changes[client_name]:
logging.info("Got expected attribute change for client %s" % client_name)
all_changes[client_name] = True
# We are done waiting when we have accumulated all results
if all(all_changes.values()):
logging.info("All clients have reported, done waiting.")
break
except queue.Empty:
# No error, we update timeouts and keep going
pass
elapsed = time.time() - start_time
time_remaining = timeout_delay_sec - elapsed
logging.info("Step 7: Validation of results")
sub_test_failed = False
for catcher in resub_catchers:
if catcher.caught_resubscription:
logging.error("Client %s saw a resubscription" % catcher.name)
sub_test_failed = True
else:
logging.info("Client %s correctly did not see a resubscription" % catcher.name)
all_reports_gotten = all(all_changes.values())
if not all_reports_gotten:
logging.error("Missing reports from the following clients: %s" %
", ".join([name for name, value in all_changes.items() if value is False]))
sub_test_failed = True
else:
logging.info("Got successful reports from all clients, meaning all concurrent CASE sessions worked")
# Determine result of Step 7
if sub_test_failed:
asserts.fail("Failed step 7 !")
# Step 8: Validate sessions have not changed by doing a read on NodeLabel from all clients
logging.info("Step 8a: Read back NodeLabel directly from all clients")
for sub_idx, client in enumerate(client_list):
logging.info("Reading NodeLabel (%d/%d) from controller node %s" % (sub_idx + 1, len(client_list), client.name))
label_readback = await self.read_single_attribute(client,
node_id=self.dut_node_id,
endpoint=0,
attribute=Clusters.BasicInformation.Attributes.NodeLabel)
asserts.assert_equal(label_readback, AFTER_LABEL)
# On each client, read back the local session id for the CASE session to the DUT and ensure
# it's the same as that of the session established right at the beginning of the test.
# In tandem with checking that the number of sessions to the DUT is exactly one,
# this ensures we have not established any new CASE
# sessions in this test.
if check_local_session_id_unchanged:
logging.info("Step 8b: Validate that the local CASE session ID hasn't changed")
num_failed_clients = 0
for client in client_list:
beginning_session_id = local_session_id_by_client_name[client.name]
end_session_id = client.GetConnectedDeviceSync(self.dut_node_id).localSessionId
total_sessions = client.GetConnectedDeviceSync(self.dut_node_id).numTotalSessions
if (beginning_session_id != end_session_id):
logging.error(
f"Test ended with a different session ID created from what we had before for {client.name} "
f"(total sessions = {total_sessions})")
num_failed_clients = num_failed_clients + 1
elif (total_sessions != 1):
logging.error(f"Test ended with more than 1 session for {client.name}")
num_failed_clients = num_failed_clients + 1
if (num_failed_clients > 0):
asserts.fail(f"Failed Step 8b: ({num_failed_clients} / {len(client_list)} failed)")
# Step 9: Fill user label list
if has_user_labels and not skip_user_label_cluster_steps:
await self.fill_user_label_list(dev_ctrl, self.dut_node_id)
else:
logging.info("Step 9: Skipped due to no UserLabel cluster instances")
# Step 10: Reconfig ACL to allow test runner access to Groups clusters on all endpoints.
logging.info("Step 10: Reconfiguring ACL to allow access to Groups Clusters")
await self.send_acl(test_step=10, client_by_name=client_by_name, enable_access_to_group_cluster=True, fabric_table=fabric_table)
# Step 11: Count all group cluster instances
# and ensure MaxGroupsPerFabric >= 4 * counted_groups_clusters.
logging.info("Step 11: Validating groups support minimums")
groups_cluster_endpoints: Dict[int, Any] = await dev_ctrl.ReadAttribute(self.dut_node_id, [Clusters.Groups])
counted_groups_clusters: int = len(groups_cluster_endpoints)
# The test for Step 11 and all of Steps 12 to 15 are only performed if Groups cluster instances are found.
if counted_groups_clusters > 0:
indicated_max_groups_per_fabric: int = await self.read_single_attribute(
dev_ctrl,
node_id=self.dut_node_id,
endpoint=0,
attribute=Clusters.GroupKeyManagement.Attributes.MaxGroupsPerFabric)
logging.info(
f"MaxGroupsPerFabric value: {indicated_max_groups_per_fabric}, number of endpoints with Groups clusters cluster: {counted_groups_clusters}, which are: {list(groups_cluster_endpoints.keys())}")
if indicated_max_groups_per_fabric < 4 * counted_groups_clusters:
asserts.fail("Failed Step 11: MaxGroupsPerFabric < 4 * counted_groups_clusters")
# Step 12: Confirm MaxGroupKeysPerFabric meets the minimum requirement of 3.
indicated_max_group_keys_per_fabric: int = await self.read_single_attribute(
dev_ctrl,
node_id=self.dut_node_id,
endpoint=0,
attribute=Clusters.GroupKeyManagement.Attributes.MaxGroupKeysPerFabric)
if indicated_max_group_keys_per_fabric < 3:
asserts.fail("Failed Step 12: MaxGroupKeysPerFabric < 3")
# Create a list of per-fabric clients to use for filling group resources across all fabrics.
fabric_unique_clients: List[Any] = []
for fabric in fabric_table:
client_name = generate_controller_name(fabric.fabricIndex, 0)
fabric_unique_clients.append(client_by_name[client_name])
# Step 13: Write and verify indicated_max_group_keys_per_fabric group keys to all fabrics.
group_keys: List[List[
Clusters.GroupKeyManagement.Structs.GroupKeySetStruct]] = await self.fill_and_validate_group_key_sets(
num_fabrics_to_commission, fabric_unique_clients, indicated_max_group_keys_per_fabric)
# Step 14: Write and verify indicated_max_groups_per_fabric group/key mappings for all fabrics.
# First, generate the list of unique group/key mappings
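# Example with indicated_max_groups_per_fabric == 4: the first fabric in the list gets group IDs 1-4,
# the second gets 5-8, and so on; each group ID is mapped round-robin onto that fabric's key set IDs.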
group_key_map: List[Dict[int, int]] = [{} for _ in range(num_fabrics_to_commission)]
for fabric_list_idx in range(num_fabrics_to_commission):
for group_idx in range(indicated_max_groups_per_fabric):
group_id: int = fabric_list_idx * indicated_max_groups_per_fabric + group_idx + 1
group_key_idx: int = group_idx % len(group_keys[fabric_list_idx])
group_key_map[fabric_list_idx][group_id] = group_keys[fabric_list_idx][group_key_idx].groupKeySetID
await self.fill_and_validate_group_key_map(
num_fabrics_to_commission, fabric_unique_clients, group_key_map, fabric_table)
# Step 15: Add all the groups to the discovered groups-supporting endpoints and verify GroupTable
group_table_written: List[
Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]] = await self.add_all_groups(
num_fabrics_to_commission, fabric_unique_clients, group_key_map,
groups_cluster_endpoints, indicated_max_groups_per_fabric, fabric_table)
await self.validate_group_table(num_fabrics_to_commission, fabric_unique_clients, group_table_written, fabric_table)
# Read heap watermarks after the test
if check_heap_watermarks:
logging.info("Read Heap info after stress test")
high_watermark_after, current_usage_after = await self.read_heap_statistics(dev_ctrl)
logging.info("=== Heap Usage Diagnostics ===\nHigh watermark: {} (before) / {} (after)\n"
"Current usage: {} (before) / {} (after)".format(high_watermark_before, high_watermark_after,
current_usage_before, current_usage_after))
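# Deterministic pseudo-random string of hex characters (seeded in setup_class), used for label values
# and epoch key material so that runs are reproducible.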
def random_string(self, length) -> str:
rnd = self._pseudo_random_generator
return "".join([rnd.choice("abcdef0123456789") for _ in range(length)])[:length]
async def fill_user_label_list(self, dev_ctrl, target_node_id):
logging.info("Step 9: Fill UserLabel clusters on each endpoint")
user_labels = await dev_ctrl.ReadAttribute(target_node_id, [Clusters.UserLabel])
# Build 4 sets of maximized labels
random_label = self.random_string(16)
random_value = self.random_string(16)
labels = [Clusters.UserLabel.Structs.LabelStruct(label=random_label, value=random_value) for _ in range(4)]
for endpoint_id in user_labels:
clusters = user_labels[endpoint_id]
for cluster in clusters:
if cluster == Clusters.UserLabel:
logging.info("Step 9a: Filling UserLabel cluster on endpoint %d" % endpoint_id)
statuses = await dev_ctrl.WriteAttribute(target_node_id,
[(endpoint_id, Clusters.UserLabel.Attributes.LabelList(labels))])
asserts.assert_equal(statuses[0].Status, StatusEnum.Success, "Label write must succeed")
logging.info("Step 9b: Validate UserLabel cluster contents after write on endpoint %d" % endpoint_id)
read_back_labels = await self.read_single_attribute(dev_ctrl,
node_id=target_node_id,
endpoint=endpoint_id,
attribute=Clusters.UserLabel.Attributes.LabelList)
print(read_back_labels)
asserts.assert_equal(read_back_labels, labels, "LabelList attribute must match what was written")
async def fill_and_validate_group_key_sets(self,
fabrics: int,
clients: List[Any],
keys_per_fabric: int) -> List[List[
Clusters.GroupKeyManagement.Structs.GroupKeySetStruct]]:
# Step 13: Write indicated_max_group_keys_per_fabric group keys to all fabrics.
group_keys: List[List[Clusters.GroupKeyManagement.Structs.GroupKeySetStruct]] = [[] for _ in range(fabrics)]
for client_idx in range(fabrics):
client: Any = clients[client_idx]
# Write, skip the IPK key set.
for group_key_cluster_idx in range(1, keys_per_fabric):
group_key_list_idx: int = group_key_cluster_idx - 1
logging.info("Step 13: Setting group key on fabric %d at index '%d'" % (client_idx+1, group_key_cluster_idx))
group_keys[client_idx].append(self.build_group_key(client_idx, group_key_cluster_idx, keys_per_fabric))
await client.SendCommand(self.dut_node_id, 0, Clusters.GroupKeyManagement.Commands.KeySetWrite(
group_keys[client_idx][group_key_list_idx]))
# Step 13 verification: After all the key sets were written, read all the information back.
for client_idx in range(fabrics):
client: Any = clients[client_idx]
logging.info("Step 13: Reading back group keys on fabric %d" % (client_idx+1))
resp = await client.SendCommand(self.dut_node_id, 0,
Clusters.GroupKeyManagement.Commands.KeySetReadAllIndices(),
responseType=Clusters.GroupKeyManagement.Commands.KeySetReadAllIndicesResponse)
read_group_key_ids: List[int] = resp.groupKeySetIDs
known_group_key_ids: List[int] = [key_set.groupKeySetID for key_set in group_keys[client_idx]]
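# The single key set ID reported beyond the ones written here should be the fabric's IPK (key set 0),
# which this test never rewrites.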
ipk_group_key_id: Set[int] = set(read_group_key_ids) - set(known_group_key_ids)
asserts.assert_equal(keys_per_fabric, len(read_group_key_ids),
"KeySetReadAllIndicesResponse length does "
"not match the key support indicated: %d." % (keys_per_fabric))
asserts.assert_equal(len(ipk_group_key_id), 1,
"Read more than 1 key ID that did not match written values after IPK (only expected 1 for IPK).")
return group_keys
async def fill_and_validate_group_key_map(self,
fabrics: int,
clients: List[Any],
group_key_map: List[Dict[int, int]],
fabric_table: List[
Clusters.OperationalCredentials.Structs.FabricDescriptorStruct]) -> None:
# Step 14: Write and verify indicated_max_groups_per_fabric group/key mappings for all fabrics.
mapping_structs: List[List[Clusters.GroupKeyManagement.Structs.GroupKeyMapStruct]] = [[] for _ in range(fabrics)]
for client_idx in range(fabrics):
client: Any = clients[client_idx]
fabric_idx: int = fabric_table[client_idx].fabricIndex
for group in group_key_map[client_idx]:
mapping_structs[client_idx].append(Clusters.GroupKeyManagement.Structs.GroupKeyMapStruct(
groupId=group,
groupKeySetID=group_key_map[client_idx][group],
fabricIndex=fabric_idx))
logging.info("Step 14: Setting group key map on fabric %d" % (fabric_idx))
await client.WriteAttribute(
self.dut_node_id, [(0, Clusters.GroupKeyManagement.Attributes.GroupKeyMap(mapping_structs[client_idx]))])
# Step 14 verification: After all the group key maps were written, read all the information back.
for client_idx in range(fabrics):
client: Any = clients[client_idx]
fabric_idx: int = fabric_table[client_idx].fabricIndex
logging.info("Step 14: Reading group key map on fabric %d" % (fabric_idx))
group_key_map_readback = await self.read_single_attribute(
client, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.GroupKeyManagement.Attributes.GroupKeyMap)
found_entry: int = 0
for read_entry in group_key_map_readback:
if read_entry.fabricIndex != fabric_idx:
continue
written_entry = next(entry for entry in mapping_structs[client_idx] if entry.groupId == read_entry.groupId)
found_entry += 1
asserts.assert_equal(written_entry.groupId, read_entry.groupId)
asserts.assert_equal(written_entry.groupKeySetID, read_entry.groupKeySetID)
asserts.assert_equal(written_entry.fabricIndex, read_entry.fabricIndex)
asserts.assert_equal(found_entry, len(mapping_structs[client_idx]),
"GroupKeyMap does not match the length of written data.")
async def add_all_groups(self,
fabrics: int,
clients: List[Any],
group_key_map: List[Dict[int, int]],
group_endpoints: Dict[int, Any],
groups_per_fabric: int,
fabric_table: List[
Clusters.OperationalCredentials.Structs.FabricDescriptorStruct]) -> List[
Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]]:
# Step 15: Add indicated_max_groups_per_fabric groups to each fabric through the Groups clusters on supporting endpoints.
written_group_table_map: List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]] = [
{} for _ in range(fabrics)]
for client_idx in range(fabrics):
client: Any = clients[client_idx]
fabric_idx: int = fabric_table[client_idx].fabricIndex
base_groups_per_endpoint: int = math.floor(groups_per_fabric / len(group_endpoints))
groups_remainder: int = groups_per_fabric % len(group_endpoints)
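# Example: 12 groups over 5 endpoints gives a base of 2 per endpoint with a remainder of 2, so the
# first two endpoints receive 3 groups each and the remaining endpoints receive 2.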
group_id_list_copy = list(group_key_map[client_idx])
# This is just a sanity check that the number of IDs provided matches how many group IDs we are told to
# write to each fabric.
asserts.assert_equal(len(group_id_list_copy), groups_per_fabric)
for endpoint_id in group_endpoints:
groups_to_add: int = base_groups_per_endpoint
if groups_remainder:
groups_to_add += 1
groups_remainder -= 1
feature_map: int = await self.read_single_attribute(client,
node_id=self.dut_node_id,
endpoint=endpoint_id,
attribute=Clusters.Groups.Attributes.FeatureMap)
name_feature_bit: int = 0
name_supported: bool = (feature_map & (1 << name_feature_bit)) != 0
# Write groups to cluster
for _ in range(groups_to_add):
group_id = group_id_list_copy.pop()
group_name: str = self.random_string(16) if name_supported else ""
command: Clusters.Groups.Commands.AddGroup = Clusters.Groups.Commands.AddGroup(
groupID=group_id, groupName=group_name)
written_group_table_map[client_idx][group_id] = Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct(
groupId=group_id,
groupName=group_name,
fabricIndex=fabric_idx,
endpoints=[endpoint_id])
add_response: Clusters.Groups.Commands.AddGroupResponse = await client.SendCommand(
self.dut_node_id, endpoint_id, command, responseType=Clusters.Groups.Commands.AddGroupResponse)
asserts.assert_equal(StatusEnum.Success, add_response.status)
asserts.assert_equal(group_id, add_response.groupID)
return written_group_table_map
async def validate_group_table(self,
fabrics: int,
clients: List[Any],
group_table_written: List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]],
fabric_table: List[Clusters.OperationalCredentials.Structs.FabricDescriptorStruct]) -> None:
for client_idx in range(fabrics):
client: Any = clients[client_idx]
fabric_idx: int = fabric_table[client_idx].fabricIndex
group_table_read: List[Clusters.GroupKeyManagement.Attributes.GroupTable] = await self.read_single_attribute(
client, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.GroupKeyManagement.Attributes.GroupTable)
found_groups: int = 0
for read_entry in group_table_read:
if read_entry.fabricIndex != fabric_idx:
continue
found_groups += 1
asserts.assert_in(read_entry.groupId, group_table_written[client_idx], "Group missing from group map")
written_entry: Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct = group_table_written[
client_idx][read_entry.groupId]
asserts.assert_equal(written_entry.groupId, read_entry.groupId)
asserts.assert_equal(written_entry.endpoints, read_entry.endpoints)
asserts.assert_equal(written_entry.groupName, read_entry.groupName)
asserts.assert_equal(written_entry.fabricIndex, read_entry.fabricIndex)
asserts.assert_equal(found_groups, len(group_table_written[client_idx]),
"Found group count does not match written value.")
async def send_acl(self,
test_step: int,
client_by_name,
enable_access_to_group_cluster: bool,
fabric_table: List[
Clusters.OperationalCredentials.Structs.FabricDescriptorStruct]):
for fabric in fabric_table:
client_name = generate_controller_name(fabric.fabricIndex, 0)
client = client_by_name[client_name]
acl = self.build_acl(enable_access_to_group_cluster)
logging.info(f"Step {test_step}a: Writing ACL entry for fabric {fabric.fabricIndex}")
await client.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))])
logging.info(f"Step {test_step}b: Validating ACL entry for fabric {fabric.fabricIndex}")
acl_readback = await self.read_single_attribute(
client, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.AccessControl.Attributes.Acl)
fabric_index = 9999
for entry in acl_readback:
asserts.assert_equal(entry.fabricIndex, fabric.fabricIndex, "Fabric Index of response entries must match")
fabric_index = entry.fabricIndex
for entry in acl:
# Fix up the original ACL list items (which all had fabricIndex of 0 on write, since it is ignored)
# so that they match the incoming fabric index. This allows checking the structs by equality.
entry.fabricIndex = fabric_index
asserts.assert_equal(acl_readback, acl, "ACL must match what was written")
def build_acl(self, enable_access_to_group_cluster: bool):
acl = []
# Test says:
#
# . struct
# - Privilege field: Administer (5)
# - AuthMode field: CASE (2)
# - Subjects field: [0xFFFF_FFFD_0001_0001, 0x2000_0000_0000_0001, 0x2000_0000_0000_0002, 0x2000_0000_0000_0003]
# - Targets field: [
# {Endpoint: 0},
# {Cluster: 0xFFF1_FC00, DeviceType: 0xFFF1_BC30},
# {Cluster: 0xFFF1_FC01, DeviceType: 0xFFF1_BC31}
# ]
# . struct
# - Privilege field: Manage (4)
# - AuthMode field: CASE (2)
# - Subjects field: [0x1000_0000_0000_0001, 0x1000_0000_0000_0002, 0x1000_0000_0000_0003, 0x1000_0000_0000_0004]
# - Targets field: [
# {Cluster: 0xFFF1_FC00, DeviceType: 0xFFF1_BC20},
# {Cluster: 0xFFF1_FC01, DeviceType: 0xFFF1_BC21},
# {Cluster: 0xFFF1_FC02, DeviceType: 0xFFF1_BC22}
# ]
# . struct
# - Privilege field: Operate (3)
# - AuthMode field: CASE (2)
# - Subjects field: [0x3000_0000_0000_0001, 0x3000_0000_0000_0002, 0x3000_0000_0000_0003, 0x3000_0000_0000_0004]
# - Targets field: [{Cluster: 0xFFF1_FC40, DeviceType: 0xFFF1_BC20},
# {Cluster: 0xFFF1_FC41, DeviceType: 0xFFF1_BC21},
# {Cluster: 0xFFF1_FC42, DeviceType: 0xFFF1_BC42}]
# . struct
# - Privilege field: View (1)
# - AuthMode field: CASE (2)
# - Subjects field: [0x4000_0000_0000_0001, 0x4000_0000_0000_0002, 0x4000_0000_0000_0003, 0x4000_0000_0000_0004]
# - Targets field: [{Cluster: 0xFFF1_FC80, DeviceType: 0xFFF1_BC20},
# {Cluster: 0xFFF1_FC81, DeviceType: 0xFFF1_BC21},
# {Cluster: 0xFFF1_FC82, DeviceType: 0xFFF1_BC22}]
# Administer ACL entry
admin_subjects = [0xFFFF_FFFD_0001_0001, 0x2000_0000_0000_0001, 0x2000_0000_0000_0002, 0x2000_0000_0000_0003]
admin_target_field_2 = Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC00, deviceType=0xFFF1_BC30)
if enable_access_to_group_cluster:
admin_target_field_2 = Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0x0000_0004)
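# When enable_access_to_group_cluster is set, the Administer target above becomes cluster 0x0000_0004
# (the Groups cluster) with no endpoint or deviceType restriction, i.e. Groups access on every endpoint
# as step 10 requires.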
admin_targets = [
Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0),
admin_target_field_2,
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC01, deviceType=0xFFF1_BC31)
]
admin_acl_entry = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=admin_subjects,
targets=admin_targets
)
acl.append(admin_acl_entry)
# Manage ACL entry
manage_subjects = [0x1000_0000_0000_0001, 0x1000_0000_0000_0002, 0x1000_0000_0000_0003, 0x1000_0000_0000_0004]
manage_targets = [
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC00, deviceType=0xFFF1_BC20),
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC01, deviceType=0xFFF1_BC21),
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC02, deviceType=0xFFF1_BC22)
]
manage_acl_entry = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kManage,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=manage_subjects,
targets=manage_targets
)
acl.append(manage_acl_entry)
# Operate ACL entry
operate_subjects = [0x3000_0000_0000_0001, 0x3000_0000_0000_0002, 0x3000_0000_0000_0003, 0x3000_0000_0000_0004]
operate_targets = [
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC40, deviceType=0xFFF1_BC20),
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC41, deviceType=0xFFF1_BC21),
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC42, deviceType=0xFFF1_BC42)
]
operate_acl_entry = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kOperate,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=operate_subjects,
targets=operate_targets
)
acl.append(operate_acl_entry)
# View ACL entry
view_subjects = [0x4000_0000_0000_0001, 0x4000_0000_0000_0002, 0x4000_0000_0000_0003, 0x4000_0000_0000_0004]
view_targets = [
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC80, deviceType=0xFFF1_BC20),
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC81, deviceType=0xFFF1_BC21),
Clusters.AccessControl.Structs.AccessControlTargetStruct(cluster=0xFFF1_FC82, deviceType=0xFFF1_BC22)
]
view_acl_entry = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=view_subjects,
targets=view_targets)
acl.append(view_acl_entry)
return acl
def build_group_key(self, fabric_index: int,
group_key_index: int, keys_per_fabric: int) -> Clusters.GroupKeyManagement.Structs.GroupKeySetStruct:
asserts.assert_not_equal(group_key_index, 0, "TH Internal Error: IPK key set index (0) should not be re-generated.")
# groupKeySetID is defined as uint16 in the Matter specification.
# To easily test that the stored values are unique, unique values are created across all fabrics.
# However, it is only required that values be unique within a fabric according to the specification.
# If a device ever provides over 65535 total key sets, then this will need to be updated.
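# Example with keys_per_fabric == 3: fabric list index 0 yields key set IDs 1 and 2, index 1 yields 4 and 5,
# and so on; ID 0 is never generated since it is reserved for the IPK.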
set_id: int = fabric_index*keys_per_fabric + group_key_index
asserts.assert_less_equal(
set_id, 0xFFFF, "Invalid Key Set ID. This may be a limitation of the test harness, not the device under test.")
return Clusters.GroupKeyManagement.Structs.GroupKeySetStruct(
groupKeySetID=set_id,
groupKeySecurityPolicy=Clusters.GroupKeyManagement.Enums.GroupKeySecurityPolicyEnum.kTrustFirst,
epochKey0=self.random_string(16).encode(),
epochStartTime0=(set_id * 4),
epochKey1=self.random_string(16).encode(),
epochStartTime1=(set_id * 4 + 1),
epochKey2=self.random_string(16).encode(),
epochStartTime2=(set_id * 4 + 2))
async def read_heap_statistics(self, dev_ctrl):
diagnostics_contents = [
Clusters.SoftwareDiagnostics.Attributes.CurrentHeapHighWatermark,
Clusters.SoftwareDiagnostics.Attributes.CurrentHeapUsed,
]
diagnostics_paths = [(0, attrib) for attrib in diagnostics_contents]
swdiag_info = await dev_ctrl.ReadAttribute(self.dut_node_id, diagnostics_paths)
# Make sure everything came back from the read that we expected
asserts.assert_true(0 in swdiag_info.keys(), "Must have read endpoint 0 data")
asserts.assert_true(Clusters.SoftwareDiagnostics in swdiag_info[0].keys(
), "Must have read Software Diagnostics cluster data")
for attribute in diagnostics_contents:
asserts.assert_true(attribute in swdiag_info[0][Clusters.SoftwareDiagnostics],
"Must have read back attribute %s" % (attribute.__name__))
high_watermark = swdiag_info[0][Clusters.SoftwareDiagnostics][
Clusters.SoftwareDiagnostics.Attributes.CurrentHeapHighWatermark]
current_usage = swdiag_info[0][Clusters.SoftwareDiagnostics][
Clusters.SoftwareDiagnostics.Attributes.CurrentHeapUsed]
return high_watermark, current_usage
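# This test is run through the Matter Python test harness. A typical invocation passes commissioning
# arguments (e.g. --commissioning-method, --discriminator, --passcode; exact flags depend on the local
# harness setup) plus any of the --int-arg/--bool-arg overrides read at the top of test_TC_RR_1_1.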
if __name__ == "__main__":
default_matter_test_main(maximize_cert_chains=True, controller_cat_tags=[0x0001_0001])