27
27
28
28
import chip .clusters as Clusters
29
29
from chip .clusters .Types import NullValue
30
- from matter_testing_support import MatterBaseTest , async_test_body , default_matter_test_main , utc_time_in_matter_epoch
30
+ from matter_testing_support import MatterBaseTest , default_matter_test_main , utc_time_in_matter_epoch , per_endpoint_test , has_cluster , has_attribute
31
31
from mobly import asserts
32
32
33
33
34
34
class TC_TIMESYNC_2_1 (MatterBaseTest ):
35
- async def read_ts_attribute_expect_success (self , endpoint , attribute ):
35
+ async def read_ts_attribute_expect_success (self , attribute ):
36
36
cluster = Clusters .Objects .TimeSynchronization
37
- return await self .read_single_attribute_check_success (endpoint = endpoint , cluster = cluster , attribute = attribute )
37
+ return await self .read_single_attribute_check_success (endpoint = None , cluster = cluster , attribute = attribute )
38
38
39
- def pics_TC_TIMESYNC_2_1 (self ) -> list [str ]:
40
- return ["TIMESYNC.S" ]
41
-
42
- @async_test_body
39
+ @per_endpoint_test (has_cluster (Clusters .TimeSynchronization ) and has_attribute (Clusters .TimeSynchronization .Attributes .TimeSource ))
43
40
async def test_TC_TIMESYNC_2_1 (self ):
44
- endpoint = 0
45
-
46
- features = await self .read_single_attribute (dev_ctrl = self .default_controller , node_id = self .dut_node_id ,
47
- endpoint = endpoint , attribute = Clusters .TimeSynchronization .Attributes .FeatureMap )
41
+ attributes = Clusters .TimeSynchronization .Attributes
42
+ features = await self .read_ts_attribute_expect_success (attribute = attributes .FeatureMap )
48
43
49
44
self .supports_time_zone = bool (features & Clusters .TimeSynchronization .Bitmaps .Feature .kTimeZone )
50
45
self .supports_ntpc = bool (features & Clusters .TimeSynchronization .Bitmaps .Feature .kNTPClient )
51
46
self .supports_ntps = bool (features & Clusters .TimeSynchronization .Bitmaps .Feature .kNTPServer )
52
47
self .supports_trusted_time_source = bool (features & Clusters .TimeSynchronization .Bitmaps .Feature .kTimeSyncClient )
53
48
54
- time_cluster = Clusters .TimeSynchronization
55
- timesync_attr_list = time_cluster .Attributes .AttributeList
56
- attribute_list = await self .read_single_attribute_check_success (endpoint = endpoint , cluster = time_cluster , attribute = timesync_attr_list )
57
- timesource_attr_id = time_cluster .Attributes .TimeSource .attribute_id
49
+ timesync_attr_list = attributes .AttributeList
50
+ attribute_list = await self .read_ts_attribute_expect_success (attribute = timesync_attr_list )
51
+ timesource_attr_id = attributes .TimeSource .attribute_id
58
52
59
53
self .print_step (1 , "Commissioning, already done" )
60
- attributes = Clusters .TimeSynchronization .Attributes
61
54
62
55
self .print_step (2 , "Read Granularity attribute" )
63
- granularity_dut = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .Granularity )
56
+ granularity_dut = await self .read_ts_attribute_expect_success (attribute = attributes .Granularity )
64
57
asserts .assert_less (granularity_dut , Clusters .TimeSynchronization .Enums .GranularityEnum .kUnknownEnumValue ,
65
58
"Granularity is not in valid range" )
66
59
67
60
self .print_step (3 , "Read TimeSource" )
68
61
if timesource_attr_id in attribute_list :
69
- time_source = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .TimeSource )
62
+ time_source = await self .read_ts_attribute_expect_success (attribute = attributes .TimeSource )
70
63
asserts .assert_less (time_source , Clusters .TimeSynchronization .Enums .TimeSourceEnum .kUnknownEnumValue ,
71
64
"TimeSource is not in valid range" )
72
65
73
66
self .print_step (4 , "Read TrustedTimeSource" )
74
67
if self .supports_trusted_time_source :
75
- trusted_time_source = await self .read_ts_attribute_expect_success (endpoint = endpoint ,
76
- attribute = attributes .TrustedTimeSource )
68
+ trusted_time_source = await self .read_ts_attribute_expect_success (attribute = attributes .TrustedTimeSource )
77
69
if trusted_time_source is not NullValue :
78
70
asserts .assert_less_equal (trusted_time_source .fabricIndex , 0xFE ,
79
71
"FabricIndex for the TrustedTimeSource is out of range" )
@@ -82,7 +74,7 @@ async def test_TC_TIMESYNC_2_1(self):
82
74
83
75
self .print_step (5 , "Read DefaultNTP" )
84
76
if self .supports_ntpc :
85
- default_ntp = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .DefaultNTP )
77
+ default_ntp = await self .read_ts_attribute_expect_success (attribute = attributes .DefaultNTP )
86
78
if default_ntp is not NullValue :
87
79
asserts .assert_less_equal (len (default_ntp ), 128 , "DefaultNTP length must be less than 128" )
88
80
# Assume this is a valid web address if it has at least one . in the name
@@ -97,7 +89,7 @@ async def test_TC_TIMESYNC_2_1(self):
97
89
98
90
self .print_step (6 , "Read TimeZone" )
99
91
if self .supports_time_zone :
100
- tz_dut = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .TimeZone )
92
+ tz_dut = await self .read_ts_attribute_expect_success (attribute = attributes .TimeZone )
101
93
asserts .assert_greater_equal (len (tz_dut ), 1 , "TimeZone must have at least one entry in the list" )
102
94
asserts .assert_less_equal (len (tz_dut ), 2 , "TimeZone may have a maximum of two entries in the list" )
103
95
for entry in tz_dut :
@@ -112,7 +104,7 @@ async def test_TC_TIMESYNC_2_1(self):
112
104
113
105
self .print_step (7 , "Read DSTOffset" )
114
106
if self .supports_time_zone :
115
- dst_dut = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .DSTOffset )
107
+ dst_dut = await self .read_ts_attribute_expect_success (attribute = attributes .DSTOffset )
116
108
last_valid_until = - 1
117
109
last_valid_starting = - 1
118
110
for dst in dst_dut :
@@ -126,7 +118,7 @@ async def test_TC_TIMESYNC_2_1(self):
126
118
asserts .assert_equal (dst , dst_dut [- 1 ], "DSTOffset list must have Null ValidUntil at the end" )
127
119
128
120
self .print_step (8 , "Read UTCTime" )
129
- utc_dut = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .UTCTime )
121
+ utc_dut = await self .read_ts_attribute_expect_success (attribute = attributes .UTCTime )
130
122
if utc_dut is NullValue :
131
123
asserts .assert_equal (granularity_dut , Clusters .TimeSynchronization .Enums .GranularityEnum .kNoTimeGranularity )
132
124
else :
@@ -141,8 +133,8 @@ async def test_TC_TIMESYNC_2_1(self):
141
133
142
134
self .print_step (9 , "Read LocalTime" )
143
135
if self .supports_time_zone :
144
- utc_dut = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .UTCTime )
145
- local_dut = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .LocalTime )
136
+ utc_dut = await self .read_ts_attribute_expect_success (attribute = attributes .UTCTime )
137
+ local_dut = await self .read_ts_attribute_expect_success (attribute = attributes .LocalTime )
146
138
if utc_dut is NullValue :
147
139
asserts .assert_true (local_dut is NullValue , "LocalTime must be Null if UTC time is Null" )
148
140
elif len (dst_dut ) == 0 :
@@ -156,30 +148,30 @@ async def test_TC_TIMESYNC_2_1(self):
156
148
157
149
self .print_step (10 , "Read TimeZoneDatabase" )
158
150
if self .supports_time_zone :
159
- tz_db_dut = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .TimeZoneDatabase )
151
+ tz_db_dut = await self .read_ts_attribute_expect_success (attribute = attributes .TimeZoneDatabase )
160
152
asserts .assert_less (tz_db_dut , Clusters .TimeSynchronization .Enums .TimeZoneDatabaseEnum .kUnknownEnumValue ,
161
153
"TimeZoneDatabase is not in valid range" )
162
154
163
155
self .print_step (11 , "Read NTPServerAvailable" )
164
156
if self .supports_ntps :
165
157
# bool typechecking happens in the test read functions, so all we need to do here is do the read
166
- await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .NTPServerAvailable )
158
+ await self .read_ts_attribute_expect_success (attribute = attributes .NTPServerAvailable )
167
159
168
160
self .print_step (12 , "Read TimeZoneListMaxSize" )
169
161
if self .supports_time_zone :
170
- size = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .TimeZoneListMaxSize )
162
+ size = await self .read_ts_attribute_expect_success (attribute = attributes .TimeZoneListMaxSize )
171
163
asserts .assert_greater_equal (size , 1 , "TimeZoneListMaxSize must be at least 1" )
172
164
asserts .assert_less_equal (size , 2 , "TimeZoneListMaxSize must be max 2" )
173
165
174
166
self .print_step (13 , "Read DSTOffsetListMaxSize" )
175
167
if self .supports_time_zone :
176
- size = await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .DSTOffsetListMaxSize )
168
+ size = await self .read_ts_attribute_expect_success (attribute = attributes .DSTOffsetListMaxSize )
177
169
asserts .assert_greater_equal (size , 1 , "DSTOffsetListMaxSize must be at least 1" )
178
170
179
171
self .print_step (14 , "Read SupportsDNSResolve" )
180
172
# bool typechecking happens in the test read functions, so all we need to do here is do the read
181
173
if self .supports_ntpc :
182
- await self .read_ts_attribute_expect_success (endpoint = endpoint , attribute = attributes .SupportsDNSResolve )
174
+ await self .read_ts_attribute_expect_success (attribute = attributes .SupportsDNSResolve )
183
175
184
176
185
177
if __name__ == "__main__" :
0 commit comments