forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunner.py
1024 lines (825 loc) · 45.4 KB
/
runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import queue
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum, IntEnum
import chip.interaction_model
import chip.yaml.format_converter as Converter
import stringcase
from chip.ChipDeviceCtrl import ChipDeviceController, discovery
from chip.clusters import ClusterObjects
from chip.clusters.Attribute import (AttributeStatus, EventReadResult, SubscriptionTransaction, TypedAttributePath,
ValueDecodeFailure)
from chip.exceptions import ChipStackError
from chip.yaml.data_model_lookup import DataModelLookup
from chip.yaml.errors import ActionCreationError, UnexpectedActionCreationError
from matter_yamltests.pseudo_clusters.pseudo_clusters import get_default_pseudo_clusters
from .data_model_lookup import PreDefinedDataModelLookup
_PSEUDO_CLUSTERS = get_default_pseudo_clusters()
logger = logging.getLogger('YamlParser')
class _ActionStatus(Enum):
SUCCESS = 'success',
ERROR = 'error'
class _TestFabricId(IntEnum):
ALPHA = 1,
BETA = 2,
GAMMA = 3
@dataclass
class _GetCommissionerNodeIdResult:
node_id: int
@dataclass
class EventResponse:
event_result_list: list[EventReadResult]
@dataclass
class _ActionResult:
status: _ActionStatus
response: object
@dataclass
class _AttributeSubscriptionCallbackResult:
name: str
attribute_path: TypedAttributePath
result: _ActionResult
@dataclass
class _EventSubscriptionCallbackResult:
name: str
result: _ActionResult
@dataclass
class _ExecutionContext:
''' Objects that is commonly passed around this file that are vital to test execution.'''
# Data model lookup to get python attribute, cluster, command object.
data_model_lookup: DataModelLookup = None
# List of subscriptions.
subscriptions: list = field(default_factory=list)
# The key is the attribute/event name, and the value is a queue of subscription callback results
# that been sent by device under test. For attribute subscription the queue is of type
# _AttributeSubscriptionCallbackResult, for event the queue is of type
# _EventSubscriptionCallbackResult.
subscription_callback_result_queue: dict = field(default_factory=dict)
class BaseAction(ABC):
'''Interface for a single YAML action that is to be executed.'''
def __init__(self, test_step):
self._label = test_step.label
self._identity = test_step.identity
self._pics_enabled = test_step.is_pics_enabled
@property
def label(self):
return self._label
@property
def identity(self):
return self._identity
@property
def pics_enabled(self):
return self._pics_enabled
@abstractmethod
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
pass
class DefaultPseudoCluster(BaseAction):
def __init__(self, test_step):
super().__init__(test_step)
self._test_step = test_step
if not _PSEUDO_CLUSTERS.supports(test_step):
raise ActionCreationError(f'Default cluster {test_step.cluster} {test_step.command}, not supported')
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
response = await _PSEUDO_CLUSTERS.execute(self._test_step)
return _ActionResult(status=_ActionStatus.SUCCESS, response=response[0])
class InvokeAction(BaseAction):
'''Single invoke action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to invoke command action that can execute with ChipDeviceController.
Args:
'test_step': Step containing information required to run invoke command action.
'cluster': Name of cluster which to invoke action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
ActionCreationError: Raised if there is a benign error. This occurs when we
cannot find the action to invoke for the provided cluster. When this happens
it is expected that the action to invoke and the provided cluster is an action
to be invoked on a pseudo cluster.
UnexpectedActionCreationError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step)
self._busy_wait_ms = test_step.busy_wait_ms
self._command_name = stringcase.pascalcase(test_step.command)
self._cluster = cluster
self._interation_timeout_ms = test_step.timed_interaction_timeout_ms
self._request_object = None
self._expected_response_object = None
self._endpoint = test_step.endpoint
self._node_id = test_step.node_id
self._group_id = test_step.group_id
if self._node_id is None and self._group_id is None:
raise UnexpectedActionCreationError(
'Both node_id and group_id are None, at least one needs to be provided')
command = context.data_model_lookup.get_command(self._cluster, self._command_name)
if command is None:
# If we have not found a command it could me that it is a pseudo cluster command.
raise ActionCreationError(
f'Failed to find cluster:{self._cluster} Command:{self._command_name}')
command_object = command()
if (test_step.arguments):
args = test_step.arguments['values']
request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)
try:
request_data = Converter.convert_to_data_model_type(
request_data_as_dict, type(command_object))
except ValueError:
raise UnexpectedActionCreationError('Could not covert yaml type')
self._request_object = command_object.FromDict(request_data)
else:
self._request_object = command_object
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
if self._group_id:
resp = dev_ctrl.SendGroupCommand(
self._group_id, self._request_object,
busyWaitMs=self._busy_wait_ms)
else:
resp = await dev_ctrl.SendCommand(
self._node_id, self._endpoint, self._request_object,
timedRequestTimeoutMs=self._interation_timeout_ms,
busyWaitMs=self._busy_wait_ms)
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
# Commands with no response give a None response. In those cases we return a success
return _ActionResult(status=_ActionStatus.SUCCESS, response=resp)
class ReadAttributeAction(BaseAction):
'''Single read attribute action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to read attribute action that can execute with ChipDeviceController.
Args:
'test_step': Step containing information required to run read attribute action.
'cluster': Name of cluster read attribute action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
UnexpectedActionCreationError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step)
self._attribute_name = stringcase.pascalcase(test_step.attribute)
self._cluster = cluster
self._endpoint = test_step.endpoint
self._node_id = test_step.node_id
self._cluster_object = None
self._request_object = None
self._fabric_filtered = True
if test_step.fabric_filtered is not None:
self._fabric_filtered = test_step.fabric_filtered
self._cluster_object = context.data_model_lookup.get_cluster(self._cluster)
if self._cluster_object is None:
raise UnexpectedActionCreationError(
f'ReadAttribute failed to find cluster object:{self._cluster}')
self._request_object = context.data_model_lookup.get_attribute(
self._cluster, self._attribute_name)
if self._request_object is None:
raise UnexpectedActionCreationError(
f'ReadAttribute failed to find cluster:{self._cluster} '
f'Attribute:{self._attribute_name}')
if test_step.arguments:
raise UnexpectedActionCreationError(
f'ReadAttribute should not contain arguments. {self.label}')
if self._request_object.attribute_type is None:
raise UnexpectedActionCreationError(
f'ReadAttribute doesnt have valid attribute_type. {self.label}')
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
raw_resp = await dev_ctrl.ReadAttribute(self._node_id,
[(self._endpoint, self._request_object)],
fabricFiltered=self._fabric_filtered)
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
except ChipStackError as error:
_CHIP_TIMEOUT_ERROR = 50
if error.err == _CHIP_TIMEOUT_ERROR:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
# For now it is unsure if all ChipStackError are supposed to be intentional.
# As a result we simply re-raise the error.
raise error
return self.parse_raw_response(raw_resp)
def parse_raw_response(self, raw_resp) -> _ActionResult:
# TODO Currently there are no checks that this indexing won't fail. Need to add some
# initial validity checks. Coming soon in a future PR.
resp = raw_resp[self._endpoint][self._cluster_object][self._request_object]
if isinstance(resp, ValueDecodeFailure):
# response.Reason is of type chip.interaction_model.Status.
return _ActionResult(status=_ActionStatus.ERROR, response=resp.Reason)
# decode() is expecting to get a DataModelLookup Object type to grab certain attributes
# like cluster id.
return_val = self._request_object(resp)
return _ActionResult(status=_ActionStatus.SUCCESS, response=return_val)
class ReadEventAction(BaseAction):
''' Read Event action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to read event action that can execute with ChipDeviceController.
Args:
'test_step': Step containing information required to run read event action.
'cluster': Name of cluster read event action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
UnexpectedActionCreationError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step)
self._event_name = stringcase.pascalcase(test_step.event)
self._cluster = cluster
self._endpoint = test_step.endpoint
self._node_id = test_step.node_id
self._cluster_object = None
self._request_object = None
self._event_number_filter = test_step.event_number
self._fabric_filtered = False
if test_step.fabric_filtered is not None:
self._fabric_filtered = test_step.fabric_filtered
self._request_object = context.data_model_lookup.get_event(self._cluster,
self._event_name)
if self._request_object is None:
raise UnexpectedActionCreationError(
f'ReadEvent failed to find cluster:{self._cluster} Event:{self._event_name}')
if test_step.arguments:
raise UnexpectedActionCreationError(
f'ReadEvent should not contain arguments. {self.label}')
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
urgent = 0
request = [(self._endpoint, self._request_object, urgent)]
resp = await dev_ctrl.ReadEvent(self._node_id, events=request, eventNumberFilter=self._event_number_filter,
fabricFiltered=self._fabric_filtered)
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
parsed_resp = EventResponse(event_result_list=resp)
return _ActionResult(status=_ActionStatus.SUCCESS, response=parsed_resp)
class WaitForCommissioneeAction(BaseAction):
''' Wait for commissionee action to be executed.'''
def __init__(self, test_step):
super().__init__(test_step)
self._expire_existing_session = False
# This is the default when no timeout is provided.
_DEFAULT_TIMEOUT_MS = 10 * 1000
self._timeout_ms = _DEFAULT_TIMEOUT_MS
if test_step.arguments is None:
# Nothing left for us to do the default values are what we want
return
args = test_step.arguments['values']
request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)
# There's a chance the commissionee may have rebooted before this call here as part of a
# test flow or is just starting out fresh outright. Unless expireExistingSession is
# explicitly set, the default behaviour it to make sure we're not re-using any cached CASE
# sessions that will now be stale and mismatched with the peer, causing subsequent
# interactions to fail.
self._expire_existing_session = request_data_as_dict.get('expireExistingSession', True)
self._node_id = request_data_as_dict['nodeId']
if 'timeout' in request_data_as_dict:
# Timeout is provided in seconds we need to conver to milliseconds.
self._timeout_ms = request_data_as_dict['timeout'] * 1000
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
if self._expire_existing_session:
dev_ctrl.ExpireSessions(self._node_id)
dev_ctrl.GetConnectedDeviceSync(self._node_id, timeoutMs=self._timeout_ms)
except TimeoutError:
return _ActionResult(status=_ActionStatus.ERROR, response=None)
return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
class AttributeChangeAccumulator:
def __init__(self, name: str, expected_attribute: ClusterObjects.ClusterAttributeDescriptor,
output_queue: queue.SimpleQueue):
self._name = name
self._expected_attribute = expected_attribute
self._output_queue = output_queue
def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction):
if path.AttributeType == self._expected_attribute:
data = transaction.GetAttribute(path)
result = _ActionResult(status=_ActionStatus.SUCCESS, response=path.AttributeType(data))
item = _AttributeSubscriptionCallbackResult(self._name, path, result)
logging.debug(
f'Got subscription report on client {self.name} for {path.AttributeType}: {data}')
self._output_queue.put(item)
@property
def name(self) -> str:
return self._name
class EventChangeAccumulator:
def __init__(self, name: str, expected_event, output_queue: queue.SimpleQueue):
self._name = name
self._expected_event = expected_event
self._output_queue = output_queue
def __call__(self, event_result: EventReadResult, transaction: SubscriptionTransaction):
if (self._expected_event.cluster_id == event_result.Header.ClusterId and
self._expected_event.event_id == event_result.Header.EventId):
event_response = EventResponse(event_result_list=[event_result])
result = _ActionResult(status=_ActionStatus.SUCCESS, response=event_response)
item = _EventSubscriptionCallbackResult(self._name, result)
logging.debug(f'Got subscription report on client {self.name}')
self._output_queue.put(item)
@property
def name(self) -> str:
return self._name
class SubscribeAttributeAction(ReadAttributeAction):
'''Single subscribe attribute action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to subscribe attribute action that can execute with ChipDeviceController.
Args:
'test_step': Step containing information required to run write attribute action.
'cluster': Name of cluster write attribute action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
UnexpectedActionCreationError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step, cluster, context)
self._context = context
if test_step.min_interval is None:
raise UnexpectedActionCreationError(
f'SubscribeAttribute action does not have min_interval {self.label}')
self._min_interval = test_step.min_interval
if test_step.max_interval is None:
raise UnexpectedActionCreationError(
f'SubscribeAttribute action does not have max_interval {self.label}')
self._max_interval = test_step.max_interval
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
subscription = await dev_ctrl.ReadAttribute(self._node_id, [(self._endpoint, self._request_object)],
reportInterval=(self._min_interval, self._max_interval),
keepSubscriptions=False)
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
self._context.subscriptions.append(subscription)
output_queue = self._context.subscription_callback_result_queue.get(self._attribute_name,
None)
if output_queue is None:
output_queue = queue.SimpleQueue()
self._context.subscription_callback_result_queue[self._attribute_name] = output_queue
while not output_queue.empty():
output_queue.get(block=False)
subscription_handler = AttributeChangeAccumulator(self.label, self._request_object,
output_queue)
subscription.SetAttributeUpdateCallback(subscription_handler)
raw_resp = subscription.GetAttributes()
return self.parse_raw_response(raw_resp)
class SubscribeEventAction(ReadEventAction):
'''Single subscribe event action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to subscribe event action that can execute with ChipDeviceController.
Args:
'test_step': Step containing information required to run subscribe event action.
'cluster': Name of cluster subscribe event action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
UnexpectedActionCreationError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step, cluster, context)
self._context = context
if test_step.min_interval is None:
raise UnexpectedActionCreationError(
f'SubscribeEvent action does not have min_interval {self.label}')
self._min_interval = test_step.min_interval
if test_step.max_interval is None:
raise UnexpectedActionCreationError(
f'SubscribeEvent action does not have max_interval {self.label}')
self._max_interval = test_step.max_interval
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
urgent = 0
request = [(self._endpoint, self._request_object, urgent)]
subscription = await dev_ctrl.ReadEvent(self._node_id, events=request, eventNumberFilter=self._event_number_filter,
reportInterval=(self._min_interval, self._max_interval),
keepSubscriptions=False)
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
self._context.subscriptions.append(subscription)
output_queue = self._context.subscription_callback_result_queue.get(self._event_name,
None)
if output_queue is None:
output_queue = queue.SimpleQueue()
self._context.subscription_callback_result_queue[self._event_name] = output_queue
while not output_queue.empty():
output_queue.get(block=False)
subscription_handler = EventChangeAccumulator(self.label, self._request_object, output_queue)
subscription.SetEventUpdateCallback(subscription_handler)
events = subscription.GetEvents()
response = EventResponse(event_result_list=events)
return _ActionResult(status=_ActionStatus.SUCCESS, response=response)
class WriteAttributeAction(BaseAction):
'''Single write attribute action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to write attribute action that can execute with ChipDeviceController.
Args:
'test_step': Step containing information required to run write attribute action.
'cluster': Name of cluster write attribute action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
UnexpectedActionCreationError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step)
self._attribute_name = stringcase.pascalcase(test_step.attribute)
self._busy_wait_ms = test_step.busy_wait_ms
self._cluster = cluster
self._endpoint = test_step.endpoint
self._interation_timeout_ms = test_step.timed_interaction_timeout_ms
self._node_id = test_step.node_id
self._group_id = test_step.group_id
self._request_object = None
if self._node_id is None and self._group_id is None:
raise UnexpectedActionCreationError(
'Both node_id and group_id are None, at least one needs to be provided')
attribute = context.data_model_lookup.get_attribute(
self._cluster, self._attribute_name)
if attribute is None:
raise UnexpectedActionCreationError(
f'WriteAttribute failed to find cluster:{self._cluster} '
f'Attribute:{self._attribute_name}')
if not test_step.arguments:
raise UnexpectedActionCreationError(f'WriteAttribute action does have arguments {self.label}')
args = test_step.arguments['values']
if len(args) != 1:
raise UnexpectedActionCreationError('WriteAttribute is trying to write multiple values')
request_data_as_dict = args[0]
try:
# TODO this is an ugly hack
request_data = Converter.convert_to_data_model_type(
request_data_as_dict['value'], attribute.attribute_type.Type)
except ValueError:
raise UnexpectedActionCreationError('Could not covert yaml type')
# Create a cluster object for the request from the provided YAML data.
self._request_object = attribute(request_data)
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
if self._group_id:
resp = dev_ctrl.WriteGroupAttribute(self._group_id, [(self._request_object,)],
busyWaitMs=self._busy_wait_ms)
else:
resp = await dev_ctrl.WriteAttribute(self._node_id, [(self._endpoint, self._request_object)],
timedRequestTimeoutMs=self._interation_timeout_ms,
busyWaitMs=self._busy_wait_ms)
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
# Group writes are expected to have no response upon success.
if self._group_id and len(resp) == 0:
return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
if len(resp) == 1 and isinstance(resp[0], AttributeStatus):
if resp[0].Status == chip.interaction_model.Status.Success:
return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
else:
return _ActionResult(status=_ActionStatus.ERROR, response=resp[0].Status)
# We always expecte the response to be a list of length 1, for that reason we return error
# here.
return _ActionResult(status=_ActionStatus.ERROR, response=None)
class WaitForReportAction(BaseAction):
'''Single WaitForReport action to be executed.'''
def __init__(self, test_step, context: _ExecutionContext):
'''Converts 'test_step' to wait for report action.
Args:
'test_step': Step containing information required to run wait for report action.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
UnexpectedActionCreationError: Raised if the expected queue does not exist.
'''
super().__init__(test_step)
if test_step.attribute is not None:
queue_name = stringcase.pascalcase(test_step.attribute)
elif test_step.event is not None:
queue_name = stringcase.pascalcase(test_step.event)
else:
raise UnexpectedActionCreationError(
'WaitForReport needs to wait on either attribute or event, neither were provided')
self._output_queue = context.subscription_callback_result_queue.get(queue_name, None)
if self._output_queue is None:
raise UnexpectedActionCreationError('Could not find output queue')
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
try:
# While there should be a timeout here provided by the test, the current codegen version
# of YAML tests doesn't have a per test step timeout, only a global timeout for the
# entire test. For that reason we default to a 30 second timeout.
item = self._output_queue.get(block=True, timeout=30)
except queue.Empty:
return _ActionResult(status=_ActionStatus.ERROR, response=None)
if isinstance(item, _AttributeSubscriptionCallbackResult):
return item.result
return item.result
class CommissionerCommandAction(BaseAction):
'''Single Commissioner Command action to be executed.'''
def __init__(self, test_step):
'''Converts 'test_step' to commissioner command action.
Args:
'test_step': Step containing information required to run wait for report action.
Raises:
UnexpectedActionCreationError: Raised if the expected queue does not exist.
'''
super().__init__(test_step)
self._command = test_step.command
if test_step.command == 'GetCommissionerNodeId':
# Just setting the self._command is enough for run_action below.
pass
elif test_step.command == 'PairWithCode':
args = test_step.arguments['values']
request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)
self._setup_payload = request_data_as_dict['payload']
self._node_id = request_data_as_dict['nodeId']
else:
raise UnexpectedActionCreationError(f'Unexpected CommisionerCommand {test_step.command}')
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
if self._command == 'GetCommissionerNodeId':
return _ActionResult(status=_ActionStatus.SUCCESS, response=_GetCommissionerNodeIdResult(dev_ctrl.nodeId))
try:
await dev_ctrl.CommissionWithCode(self._setup_payload, self._node_id)
return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
except ChipStackError:
return _ActionResult(status=_ActionStatus.ERROR, response=None)
class DiscoveryCommandAction(BaseAction):
"""DiscoveryCommand implementation (FindCommissionable* methods)."""
@staticmethod
def _filter_for_step(test_step) -> (discovery.FilterType, any):
"""Given a test step, figure out the correct filters to give to
DiscoverCommissionableNodes.
"""
if test_step.command == 'FindCommissionable':
return discovery.FilterType.NONE, None
if test_step.command == 'FindCommissionableByCommissioningMode':
# this is just a "_CM" subtype
return discovery.FilterType.COMMISSIONING_MODE, None
# all the items below require a "value" to use for filtering
args = test_step.arguments['values']
request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)
filter = request_data_as_dict['value']
if test_step.command == 'FindCommissionableByDeviceType':
return discovery.FilterType.DEVICE_TYPE, filter
if test_step.command == 'FindCommissionableByLongDiscriminator':
return discovery.FilterType.LONG_DISCRIMINATOR, filter
if test_step.command == 'FindCommissionableByShortDiscriminator':
return discovery.FilterType.SHORT_DISCRIMINATOR, filter
if test_step.command == 'FindCommissionableByVendorId':
return discovery.FilterType.VENDOR_ID, filter
raise UnexpectedActionCreationError(f'Invalid command: {test_step.command}')
def __init__(self, test_step):
super().__init__(test_step)
self.filterType, self.filter = DiscoveryCommandAction._filter_for_step(test_step)
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
devices = await dev_ctrl.DiscoverCommissionableNodes(
filterType=self.filterType, filter=self.filter, stopOnFirst=True, timeoutSecond=5)
# Devices will be a list: [CommissionableNode(), ...]
logging.info("Discovered devices: %r" % devices)
if not devices:
logging.error("No devices found")
return _ActionResult(status=_ActionStatus.ERROR, response="NO DEVICES FOUND")
elif len(devices) > 1:
logging.warning("Commissionable discovery found multiple results!")
return _ActionResult(status=_ActionStatus.SUCCESS, response=devices[0])
class NotImplementedAction(BaseAction):
"""Raises a "NOT YET IMPLEMENTED" exception when run."""
def __init__(self, test_step, cluster, command):
super().__init__(test_step)
self.cluster = cluster
self.command = command
async def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
raise Exception(f"NOT YET IMPLEMENTED: {self.cluster}::{self.command}")
class ReplTestRunner:
'''Test runner to encode/decode values from YAML test Parser for executing the TestStep.
Uses ChipDeviceController from chip-repl to execute parsed YAML TestSteps.
'''
def __init__(self, test_spec_definition, certificate_authority_manager, alpha_dev_ctrl):
self._test_spec_definition = test_spec_definition
self._context = _ExecutionContext(data_model_lookup=PreDefinedDataModelLookup())
self._certificate_authority_manager = certificate_authority_manager
self._dev_ctrls = {}
if alpha_dev_ctrl is not None:
alpha_dev_ctrl.InitGroupTestingData()
self._dev_ctrls['alpha'] = alpha_dev_ctrl
def _invoke_action_factory(self, test_step, cluster: str):
'''Creates cluster invoke action command from TestStep.
Args:
'test_step': Step containing information required to run an invoke command action.
'cluster': Name of cluster action is targeting.
Returns:
InvokeAction if 'test_step' is a valid action to be executed.
None if we were unable to use the provided 'test_step' for a known reason that is not
fatal to test execution.
'''
try:
return InvokeAction(test_step, cluster, self._context)
except ActionCreationError:
return None
def _attribute_read_action_factory(self, test_step, cluster: str):
'''Creates read attribute command TestStep.
Args:
'test_step': Step containing information required to run read attribute action.
'cluster': Name of cluster read attribute action is targeting.
Returns:
ReadAttributeAction if 'test_step' is a valid read attribute to be executed.
'''
return ReadAttributeAction(test_step, cluster, self._context)
def _event_read_action_factory(self, test_step, cluster: str):
return ReadEventAction(test_step, cluster, self._context)
def _attribute_subscribe_action_factory(self, test_step, cluster: str):
'''Creates subscribe attribute command from TestStep provided.
Args:
'test_step': Step containing information required to run subscribe attribute action.
'cluster': Name of cluster write attribute action is targeting.
Returns:
SubscribeAttributeAction if 'test_step' is a valid subscribe attribute to be executed.
None if we were unable to use the provided 'test_step' for a known reason that is not
fatal to test execution.
'''
return SubscribeAttributeAction(test_step, cluster, self._context)
def _attribute_subscribe_event_factory(self, test_step, cluster: str):
'''Creates subscribe event command from TestStep provided.
Args:
'test_step': Step containing information required to run subscribe attribute action.
'cluster': Name of cluster write attribute action is targeting.
Returns:
SubscribeEventAction if 'test_step' is a valid subscribe attribute to be executed.
'''
return SubscribeEventAction(test_step, cluster, self._context)
def _attribute_write_action_factory(self, test_step, cluster: str):
'''Creates write attribute command TestStep.
Args:
'test_step': Step containing information required to run write attribute action.
'cluster': Name of cluster write attribute action is targeting.
Returns:
WriteAttributeAction if 'test_step' is a valid write attribute to be executed.
None if we were unable to use the provided 'test_step' for a known reason that is not
fatal to test execution.
'''
return WriteAttributeAction(test_step, cluster, self._context)
def _wait_for_commissionee_action_factory(self, test_step):
return WaitForCommissioneeAction(test_step)
def _wait_for_report_action_factory(self, test_step):
return WaitForReportAction(test_step, self._context)
def _commissioner_command_action_factory(self, test_step):
return CommissionerCommandAction(test_step)
def _default_pseudo_cluster(self, test_step):
try:
return DefaultPseudoCluster(test_step)
except ActionCreationError:
return None
def encode(self, request) -> BaseAction:
action = None
cluster = request.cluster.replace(' ', '').replace('/', '').replace('.', '')
command = request.command
if cluster == 'CommissionerCommands':
return self._commissioner_command_action_factory(request)
# Some of the tests contain 'cluster over-rides' that refer to a different
# cluster than that specified in 'config'.
elif cluster == 'DiscoveryCommands':
return DiscoveryCommandAction(request)
elif cluster == 'DelayCommands' and command == 'WaitForCommissionee':
action = self._wait_for_commissionee_action_factory(request)
elif command == 'writeAttribute':
action = self._attribute_write_action_factory(request, cluster)
elif command == 'readAttribute':
action = self._attribute_read_action_factory(request, cluster)
elif command == 'readEvent':
action = self._event_read_action_factory(request, cluster)
elif command == 'subscribeAttribute':
action = self._attribute_subscribe_action_factory(request, cluster)
elif command == 'subscribeEvent':
action = self._attribute_subscribe_event_factory(request, cluster)
elif command == 'waitForReport':
action = self._wait_for_report_action_factory(request)
else:
action = self._invoke_action_factory(request, cluster)
if action is None:
# Now we try to create a default pseudo cluster.
action = self._default_pseudo_cluster(request)
if action is None:
logger.warn(f"Failed to parse {request.label}")
return action
def decode(self, result: _ActionResult):
# If this is a generic response, there is nothing to do.
if result.response is None:
# TODO Once yamltest and idl python packages are properly packaged as a single module
# the type we are returning will be formalized. For now TestStep.post_process_response
# expects this particular case to be sent an empty dict or a dict with an error.
return {} if result.status == _ActionStatus.SUCCESS else {'error': 'FAILURE'}
response = result.response
decoded_response = {}
if isinstance(response, dict):
return response
if isinstance(response, chip.interaction_model.InteractionModelError):
decoded_response['error'] = stringcase.snakecase(response.status.name).upper()
decoded_response['clusterError'] = response.clusterStatus
return decoded_response
if isinstance(response, chip.interaction_model.Status):
decoded_response['error'] = stringcase.snakecase(response.name).upper()
return decoded_response
if isinstance(response, _GetCommissionerNodeIdResult):
decoded_response['value'] = {'nodeId': response.node_id}
return decoded_response
if isinstance(response, chip.discovery.CommissionableNode):
decoded_response['value'] = {
'instanceName': response.instanceName,
'hostName': response.hostName,
'port': response.port,
'longDiscriminator': response.longDiscriminator,
'vendorId': response.vendorId,
'productId': response.productId,
'commissioningMode': response.commissioningMode,
'deviceType': response.deviceType,
'deviceName': response.deviceName,
'pairingInstruction': response.pairingInstruction,
'pairingHint': response.pairingHint,
'mrpRetryIntervalIdle': response.mrpRetryIntervalIdle,
'mrpRetryIntervalActive': response.mrpRetryIntervalActive,
'mrpRetryActiveThreshold': response.mrpRetryActiveThreshold,
'supportsTcp': response.supportsTcp,
'isICDOperatingAsLIT': response.isICDOperatingAsLIT,
'addresses': response.addresses,
'rotatingId': response.rotatingId,
# derived values
'rotatingIdLen': 0 if not response.rotatingId else len(response.rotatingId),
'numIPs': len(response.addresses),
}
return decoded_response
if isinstance(response, EventResponse):
if not response.event_result_list:
# This means that the event result we got back was empty, below is how we
# represent this.
decoded_response = [{}]
return decoded_response
decoded_response = []
for event in response.event_result_list:
if event.Status != chip.interaction_model.Status.Success:
error_message = stringcase.snakecase(event.Status.name).upper()
decoded_response.append({'error': error_message})
continue
cluster_id = event.Header.ClusterId
cluster_name = self._test_spec_definition.get_cluster_name(cluster_id)
event_id = event.Header.EventId
event_number = event.Header.EventNumber
event_name = self._test_spec_definition.get_event_name(cluster_id, event_id)
event_definition = self._test_spec_definition.get_event_by_name(cluster_name, event_name)
is_fabric_scoped = bool(event_definition.is_fabric_sensitive)
decoded_event = Converter.from_data_model_to_test_definition(
self._test_spec_definition, cluster_name, event_definition.fields, event.Data, is_fabric_scoped)
decoded_response.append({'value': decoded_event, 'eventNumber': event_number})
return decoded_response
if isinstance(response, ChipStackError):
decoded_response['error'] = 'FAILURE'
return decoded_response
cluster_name = self._test_spec_definition.get_cluster_name(response.cluster_id)
if cluster_name is None:
raise Exception("Cannot find cluster name for id 0x%0X / %d" % (response.cluster_id, response.cluster_id))
decoded_response['clusterId'] = cluster_name
if hasattr(response, 'command_id'):
decoded_response['command'] = self._test_spec_definition.get_response_name(
response.cluster_id, response.command_id)
response_definition = self._test_spec_definition.get_response_by_name(
cluster_name, decoded_response['command'])
decoded_response['value'] = Converter.from_data_model_to_test_definition(
self._test_spec_definition, cluster_name, response_definition.fields, response)
if hasattr(response, 'attribute_id'):
decoded_response['attribute'] = self._test_spec_definition.get_attribute_name(
response.cluster_id, response.attribute_id)
attribute = self._test_spec_definition.get_attribute_by_name(
cluster_name, decoded_response['attribute'])
# TODO Once we fix the issue of not being able to find the global attribute properly
# we should be able to remove this if/else statement below.
if attribute is None:
# When we cannot find the attribute it is because it is a global attribute like
# FeatureMap. Fortunately for these types we can get away with using
# 'response.value' directly if it is a list and mapping to int if not a list.
if isinstance(response.value, list):
decoded_response['value'] = response.value
else:
decoded_response['value'] = Converter.from_data_model_to_test_definition(
self._test_spec_definition, cluster_name, int, response.value)
else:
decoded_response['value'] = Converter.from_data_model_to_test_definition(
self._test_spec_definition, cluster_name, attribute.definition, response.value)
return decoded_response
def _get_fabric_id(self, id):
return _TestFabricId[id.upper()].value
def _get_dev_ctrl(self, action: BaseAction):
if action.identity is not None:
dev_ctrl = self._dev_ctrls.get(action.identity, None)
if dev_ctrl is None:
fabric_id = self._get_fabric_id(action.identity)