forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTC_DEMTestBase.py
262 lines (203 loc) · 14.4 KB
/
TC_DEMTestBase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import chip.clusters as Clusters
from chip.interaction_model import InteractionModelError, Status
from chip.testing.utilities import utc_time_in_matter_epoch
from mobly import asserts
logger = logging.getLogger(__name__)
class DEMTestBase:
async def read_dem_attribute_expect_success(self, endpoint: int = None, attribute: str = ""):
cluster = Clusters.Objects.DeviceEnergyManagement
full_attr = getattr(cluster.Attributes, attribute)
logging.info(f"endpoint {endpoint} full_attr {full_attr}")
return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=full_attr)
async def check_dem_attribute(self, attribute, expected_value, endpoint: int = None):
value = await self.read_dem_attribute_expect_success(endpoint=endpoint, attribute=attribute)
asserts.assert_equal(value, expected_value,
f"Unexpected '{attribute}' value - expected {expected_value}, was {value}")
async def validate_feature_map(self, must_have_features, must_not_have_features):
feature_map = await self.read_dem_attribute_expect_success(attribute="FeatureMap")
for must_have_feature in must_have_features:
asserts.assert_true(feature_map & must_have_feature,
f"{must_have_feature.name} must be set but is not. feature_map 0x{feature_map:x}")
for must_not_have_feature in must_not_have_features:
asserts.assert_false(feature_map & must_not_have_feature,
f"{must_not_have_feature.name} is not allowed to be set. feature_map 0x{feature_map:x}")
async def validate_pfr_or_sfr_in_feature_map(self):
feature_map = await self.read_dem_attribute_expect_success(attribute="FeatureMap")
illegal_combination = Clusters.DeviceEnergyManagement.Bitmaps.Feature.kPowerForecastReporting | Clusters.DeviceEnergyManagement.Bitmaps.Feature.kStateForecastReporting
asserts.assert_not_equal(feature_map & illegal_combination, illegal_combination,
f"Cannot have kPowerForecastReporting and kStateForecastReporting both set. feature_map 0x{feature_map:x}")
asserts.assert_not_equal(feature_map & illegal_combination, 0,
f"Must have one of kPowerForecastReporting and kStateForecastReporting set. feature_map 0x{feature_map:x}")
async def send_power_adjustment_command(self, power: int, duration: int,
cause: Clusters.Objects.DeviceEnergyManagement.Enums.CauseEnum,
endpoint: int = None, timedRequestTimeoutMs: int = 3000,
expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.DeviceEnergyManagement.Commands.PowerAdjustRequest(
power=power,
duration=duration,
cause=cause),
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
asserts.assert_equal(expected_status, Status.Success)
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
async def send_cancel_power_adjustment_command(self, endpoint: int = None, timedRequestTimeoutMs: int = 3000,
expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.DeviceEnergyManagement.Commands.CancelPowerAdjustRequest(),
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
asserts.assert_equal(expected_status, Status.Success)
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
async def send_start_time_adjust_request_command(self, requestedStartTime: int,
cause: Clusters.Objects.DeviceEnergyManagement.Enums.CauseEnum,
endpoint: int = None, timedRequestTimeoutMs: int = 3000,
expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.DeviceEnergyManagement.Commands.StartTimeAdjustRequest(
requestedStartTime=requestedStartTime,
cause=cause),
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
asserts.assert_equal(expected_status, Status.Success)
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
async def send_start_time_adjust_clear_command(self,
endpoint: int = None, timedRequestTimeoutMs: int = 3000,
expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.DeviceEnergyManagement.Commands.StartTimeAdjustClear(), # StartTimeAdjustmentClear(),
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
asserts.assert_equal(expected_status, Status.Success)
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
async def send_cancel_request_command(self,
endpoint: int = None, timedRequestTimeoutMs: int = 3000,
expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.DeviceEnergyManagement.Commands.CancelRequest(),
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
asserts.assert_equal(expected_status, Status.Success)
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
async def send_pause_request_command(self, duration: int, cause:
Clusters.Objects.DeviceEnergyManagement.Enums.AdjustmentCauseEnum,
endpoint: int = None, timedRequestTimeoutMs: int = 3000,
expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.DeviceEnergyManagement.Commands.PauseRequest(
duration=duration,
cause=cause),
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
asserts.assert_equal(expected_status, Status.Success)
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
async def send_resume_request_command(self, endpoint: int = None, timedRequestTimeoutMs: int = 3000,
expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.DeviceEnergyManagement.Commands.ResumeRequest(),
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
asserts.assert_equal(expected_status, Status.Success)
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
async def send_modify_forecast_request_command(self, forecastID: int,
slotAdjustments: list[Clusters.DeviceEnergyManagement.Structs.SlotAdjustmentStruct],
cause: Clusters.DeviceEnergyManagement.Enums.AdjustmentCauseEnum,
endpoint: int = None, timedRequestTimeoutMs: int = 3000,
expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.DeviceEnergyManagement.Commands.ModifyForecastRequest(forecastID=forecastID,
slotAdjustments=slotAdjustments,
cause=cause),
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
asserts.assert_equal(expected_status, Status.Success)
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
async def send_request_constraint_based_forecast(self, constraintList: list[Clusters.DeviceEnergyManagement.Structs.ConstraintsStruct],
cause: Clusters.DeviceEnergyManagement.Enums.AdjustmentCauseEnum,
endpoint: int = None, timedRequestTimeoutMs: int = 3000,
expected_status: Status = Status.Success):
try:
await self.send_single_cmd(cmd=Clusters.DeviceEnergyManagement.Commands.RequestConstraintBasedForecast(constraints=constraintList,
cause=cause),
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
asserts.assert_equal(expected_status, Status.Success)
except InteractionModelError as e:
asserts.assert_equal(e.status, expected_status, "Unexpected error returned")
def print_forecast(self, forecast):
for index, slot in enumerate(forecast.slots):
logging.info(
f" [{index}] MinDuration: {slot.minDuration} MaxDuration: {slot.maxDuration} DefaultDuration: {slot.defaultDuration}")
logging.info(f" ElapseSlotTime: {slot.elapsedSlotTime} RemainingSlotTime: {slot.remainingSlotTime}")
logging.info(
f" SlotIsPausable: {slot.slotIsPausable} MinPauseDuration: {slot.minPauseDuration} MaxPauseDuration: {slot.maxPauseDuration}")
logging.info(f" ManufacturerESAState: {slot.manufacturerESAState}")
logging.info(f" NominalPower: {slot.nominalPower} MinPower: {slot.minPower} MaxPower: {slot.maxPower}")
logging.info(f" MinPowerAdjustment: {slot.minPowerAdjustment} MaxPowerAdjustment: {slot.maxPowerAdjustment}")
logging.info(
f" MinDurationAdjustment: {slot.minDurationAdjustment} MaxDurationAdjustment: {slot.maxDurationAdjustment}")
if slot.costs is not None:
for cost_index, cost in enumerate(slot):
logging.info(
f" Cost: [{cost_index}] CostType:{cost.costType} Value: {cost.value} DecimalPoints: {cost.decimalPoints} Currency: {cost.currency}")
def get_current_utc_time_in_seconds(self):
microseconds_in_second = 1000000
return int(utc_time_in_matter_epoch()/microseconds_in_second)
async def send_test_event_trigger_power_adjustment(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000000)
async def send_test_event_trigger_power_adjustment_clear(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000001)
async def send_test_event_trigger_user_opt_out_local(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000002)
async def send_test_event_trigger_user_opt_out_grid(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000003)
async def send_test_event_trigger_user_opt_out_clear_all(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000004)
async def send_test_event_trigger_start_time_adjustment(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000005)
async def send_test_event_trigger_start_time_adjustment_clear(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000006)
async def send_test_event_trigger_pausable(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000007)
async def send_test_event_trigger_pausable_next_slot(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000008)
async def send_test_event_trigger_pausable_clear(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000009)
async def send_test_event_trigger_forecast_adjustment(self):
await self.send_test_event_triggers(eventTrigger=0x009800000000000A)
async def send_test_event_trigger_forecast_adjustment_next_slot(self):
await self.send_test_event_triggers(eventTrigger=0x009800000000000B)
async def send_test_event_trigger_forecast_adjustment_clear(self):
await self.send_test_event_triggers(eventTrigger=0x009800000000000C)
async def send_test_event_trigger_constraint_based_adjustment(self):
await self.send_test_event_triggers(eventTrigger=0x009800000000000D)
async def send_test_event_trigger_constraint_based_adjustment_clear(self):
await self.send_test_event_triggers(eventTrigger=0x009800000000000E)
async def send_test_event_trigger_forecast(self):
await self.send_test_event_triggers(eventTrigger=0x009800000000000F)
async def send_test_event_trigger_forecast_clear(self):
await self.send_test_event_triggers(eventTrigger=0x0098000000000010)