forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdrlk_2_x_common.py
311 lines (259 loc) · 19 KB
/
drlk_2_x_common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import random
import string
import time
import chip.clusters as Clusters
from chip.clusters.Types import NullValue
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import type_matches
from mobly import asserts
class DRLK_COMMON:
async def read_drlk_attribute_expect_success(self, attribute):
cluster = Clusters.Objects.DoorLock
return await self.read_single_attribute_check_success(endpoint=self.endpoint, cluster=cluster, attribute=attribute)
async def write_drlk_attribute_expect_success(self, attribute):
cluster = Clusters.Objects.DoorLock
result = await self.default_controller.WriteAttribute(self.dut_node_id, [(self.endpoint, attribute)])
err_msg = "Received error status {} when writing {}:{}".format(str(result[0].Status), str(cluster), str(attribute))
asserts.assert_equal(result[0].Status, Status.Success, err_msg)
async def write_drlk_attribute_expect_error(self, attribute, error):
cluster = Clusters.Objects.DoorLock
result = await self.default_controller.WriteAttribute(self.dut_node_id, [(self.endpoint, attribute)])
err_msg = "Did not see expected error {} when writing {}:{}".format(str(error), str(cluster), str(attribute))
asserts.assert_equal(result[0].Status, error, err_msg)
async def send_drlk_cmd_expect_success(self, command) -> None:
await self.send_single_cmd(cmd=command, endpoint=self.endpoint, timedRequestTimeoutMs=1000)
async def send_drlk_cmd_expect_error(self, command, error: Status) -> None:
try:
await self.send_single_cmd(cmd=command, endpoint=self.endpoint, timedRequestTimeoutMs=1000)
asserts.assert_true(False, "Unexpected command success, command=%s", command)
except InteractionModelError as e:
asserts.assert_equal(e.status, error, "Unexpected error returned")
pass
async def send_set_credential_cmd(self, operationType, credential, credentialData, userIndex, userStatus, userType) -> Clusters.Objects.DoorLock.Commands.SetCredentialResponse:
ret = await self.send_single_cmd(cmd=Clusters.Objects.DoorLock.Commands.SetCredential(operationType=operationType,
credential=credential,
credentialData=credentialData,
userIndex=userIndex,
userStatus=userStatus,
userType=userType),
endpoint=self.endpoint,
timedRequestTimeoutMs=1000)
asserts.assert_true(type_matches(ret, Clusters.Objects.DoorLock.Commands.SetCredentialResponse),
"Unexpected return type for SetCredential")
asserts.assert_true(ret.status == Status.Success, "Error sending SetCredential command, status={}".format(str(ret.status)))
return ret
async def send_clear_credential_cmd(self, credential) -> None:
await self.send_single_cmd(cmd=Clusters.Objects.DoorLock.Commands.ClearCredential(credential=credential),
endpoint=self.endpoint,
timedRequestTimeoutMs=1000)
ret = await self.send_single_cmd(cmd=Clusters.Objects.DoorLock.Commands.GetCredentialStatus(credential=self.createdCredential),
endpoint=self.endpoint)
asserts.assert_true(type_matches(ret, Clusters.Objects.DoorLock.Commands.GetCredentialStatusResponse),
"Unexpected return type for GetCredentialStatus")
asserts.assert_false(ret.credentialExists, "Error clearing Credential (credentialExists==True)")
async def cleanup_users_and_credentials(self):
self.print_step("Cleanup", "Clear created User and Credential on the DUT")
if self.createdCredential:
await self.send_clear_credential_cmd(self.createdCredential)
logging.info("Credential cleared at CredentialIndex %d" % (self.createdCredential.credentialIndex))
self.createdCredential = None
async def generate_pincode(self, maxPincodeLength):
return ''.join(random.choices(string.digits, k=maxPincodeLength))
async def teardown(self):
await self.cleanup_users_and_credentials()
async def run_drlk_test_common(self, lockUnlockCommand, lockUnlockCmdRspPICS, lockUnlockText, doAutoRelockTest):
is_ci = self.check_pics('PICS_SDK_CI_ONLY')
self.createdCredential = None
self.endpoint = self.user_params.get("endpoint", 1)
# Allow for user overrides of these values
credentialIndex = self.user_params.get("credential_index", 1)
userCodeTemporaryDisableTime = self.user_params.get("user_code_temporary_disable_time", 15)
wrongCodeEntryLimit = self.user_params.get("wrong_code_entry_limit", 3)
autoRelockTime = self.user_params.get("auto_relock_time", 60)
if is_ci:
autoRelockTime = 10
cluster = Clusters.Objects.DoorLock
attributes = Clusters.DoorLock.Attributes
validPincode = None
invalidPincode = None
self.print_step(0, "Commissioning, already done")
if self.check_pics("DRLK.S.F00") and self.check_pics("DRLK.S.F07"):
self.print_step("Preconditions.1a",
"TH reads MaxPINCodeLength attribute from DUT and generates a valid PINCode")
maxPincodeLength_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.MaxPINCodeLength)
logging.info("MaxPINCodeLength value is %s" % (maxPincodeLength_dut))
validPincodeString = await self.generate_pincode(maxPincodeLength_dut)
while True:
invalidPincodeString = await self.generate_pincode(maxPincodeLength_dut)
if invalidPincodeString != validPincodeString:
break
logging.info("Valid PinCode=%s, Invalid PinCode=%s" % (validPincodeString, invalidPincodeString))
validPincode = bytes(validPincodeString, 'ascii')
invalidPincode = bytes(invalidPincodeString, 'ascii')
self.print_step("Preconditions.1b",
"TH sends SetCredential command to DUT to set up User and Credential at CredentialIndex {}".format(str(credentialIndex)))
credential = cluster.Structs.CredentialStruct(credentialIndex=credentialIndex,
credentialType=Clusters.DoorLock.Enums.CredentialTypeEnum.kPin)
ret = await self.send_set_credential_cmd(Clusters.DoorLock.Enums.DataOperationTypeEnum.kAdd,
credential,
validPincode,
NullValue,
NullValue,
NullValue)
logging.info("Credential created at CredentialIndex %d, UserIndex %d." % (credentialIndex, ret.userIndex))
self.createdCredential = credential
requirePinForRemoteOperation_dut = False
if self.check_pics("DRLK.S.F00") and self.check_pics("DRLK.S.F07"):
self.print_step("1", "TH writes the RequirePINforRemoteOperation attribute value as false on the DUT")
attribute = attributes.RequirePINforRemoteOperation(False)
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
print("---------------------- PICS is true")
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
print("---------------------- PICS is false")
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
if self.check_pics("DRLK.S.A0033"):
self.print_step("2", "TH reads and saves the value of the RequirePINforRemoteOperation attribute from the DUT")
requirePinForRemoteOperation_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.RequirePINforRemoteOperation)
logging.info("Current RequirePINforRemoteOperation value is %s" % (requirePinForRemoteOperation_dut))
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
self.print_step("2a", "TH verifies that RequirePINforRemoteOperation is FALSE")
asserts.assert_false(requirePinForRemoteOperation_dut, "RequirePINforRemoteOperation is expected to be FALSE")
else:
asserts.assert_true(
False, "RequirePINforRemoteOperation is a mandatory attribute if DRLK.S.F07(COTA) & DRLK.S.F00(PIN)")
if self.check_pics(lockUnlockCmdRspPICS):
self.print_step("3", "TH sends %s Command to the DUT without PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=None)
if requirePinForRemoteOperation_dut:
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
else:
await self.send_drlk_cmd_expect_success(command=command)
self.print_step("4", "TH sends %s Command to the DUT with valid PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_success(command=command)
else:
asserts.assert_true(False, "%sResponse is a mandatory command response and must be supported in PICS" % lockUnlockText)
if self.check_pics("DRLK.S.F00") and self.check_pics("DRLK.S.F07"):
self.print_step("5", "TH writes the RequirePINforRemoteOperation attribute value as true on the DUT")
attribute = attributes.RequirePINforRemoteOperation(True)
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
if self.check_pics("DRLK.S.A0033"):
self.print_step("6", "TH reads and saves the value of the RequirePINforRemoteOperation attribute from the DUT")
requirePinForRemoteOperation_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.RequirePINforRemoteOperation)
logging.info("Current RequirePINforRemoteOperation value is %s" % (requirePinForRemoteOperation_dut))
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
self.print_step("6a", "TH verifies that RequirePINforRemoteOperation is TRUE")
asserts.assert_true(requirePinForRemoteOperation_dut, "RequirePINforRemoteOperation is expected to be TRUE")
if self.check_pics("DRLK.S.F00") and self.check_pics(lockUnlockCmdRspPICS) and self.check_pics("DRLK.S.A0033"):
self.print_step("7", "TH sends %s Command to the DUT with an invalid PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=invalidPincode)
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
self.print_step("8", "TH sends %s Command to the DUT without PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=None)
if requirePinForRemoteOperation_dut:
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
else:
await self.send_drlk_cmd_expect_success(command=command)
if self.check_pics(lockUnlockCmdRspPICS) and self.check_pics("DRLK.S.A0033"):
self.print_step("9", "TH sends %s Command to the DUT with valid PINCode" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_success(command=command)
if self.check_pics("DRLK.S.F00") or self.check_pics("DRLK.S.F01"):
self.print_step("10a", "TH writes the WrongCodeEntryLimit to any value between 1 and 255")
attribute = attributes.WrongCodeEntryLimit(wrongCodeEntryLimit)
if self.check_pics("DRLK.S.M.WrongCodeEntryLimitAttributeWritable"):
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
self.print_step("10b", "TH reads the value of WrongCodeEntryLimit attribute. Verify a range of 1-255")
wrongCodeEntryLimit_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.WrongCodeEntryLimit)
logging.info("WrongCodeEntryLimit value is %s" % (wrongCodeEntryLimit_dut))
asserts.assert_in(wrongCodeEntryLimit_dut, range(1, 255), "WrongCodeEntryLimit value is out of range")
self.print_step("11a", "TH writes the UserCodeTemporaryDisableTime to any value between 1 and 255")
attribute = attributes.UserCodeTemporaryDisableTime(userCodeTemporaryDisableTime)
if self.check_pics("DRLK.S.M.UserCodedTemporaryDisableTimeAttributeWritable"):
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
self.print_step("11b", "TH reads the value of UserCodeTemporaryDisableTime attribute. Verify a range of 1-255")
userCodeTemporaryDisableTime_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.UserCodeTemporaryDisableTime)
logging.info("UserCodeTemporaryDisableTime value is %s" % (userCodeTemporaryDisableTime_dut))
asserts.assert_in(userCodeTemporaryDisableTime_dut, range(1, 255), "UserCodeTemporaryDisableTime value is out of range")
if self.check_pics(lockUnlockCmdRspPICS) and self.check_pics("DRLK.S.F00"):
self.print_step("12", "TH sends {} Command to the DUT with an invalid PINCode, repeated {} times".format(
lockUnlockText, wrongCodeEntryLimit_dut))
for i in range(wrongCodeEntryLimit_dut):
command = lockUnlockCommand(PINCode=invalidPincode)
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
self.print_step("13", "TH sends %s Command to the DUT with valid PINCode. Verify failure or no response" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_error(command=command, error=Status.Failure)
if self.check_pics("DRLK.S.A0031"):
self.print_step("14", "Wait for UserCodeTemporaryDisableTime seconds")
time.sleep(userCodeTemporaryDisableTime_dut)
if not doAutoRelockTest:
self.print_step("15", "Send %s with valid Pincode and verify success" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_success(command=command)
if doAutoRelockTest:
if self.check_pics("DRLK.S.A0023"):
self.print_step("15", "TH writes the AutoRelockTime attribute value on the DUT")
attribute = attributes.AutoRelockTime(autoRelockTime)
if self.check_pics("DRLK.S.M.AutoRelockTimeAttributeWritable"):
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)
self.print_step("16", "TH reads the value of AutoRelockTime attribute.")
autoRelockTime_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.AutoRelockTime)
logging.info("AutoRelockTime value is %s" % (autoRelockTime_dut))
if self.check_pics(lockUnlockCmdRspPICS):
self.print_step("17", "Send %s with valid Pincode and verify success" % lockUnlockText)
command = lockUnlockCommand(PINCode=validPincode)
await self.send_drlk_cmd_expect_success(command=command)
if self.check_pics("DRLK.S.A0023"):
self.print_step("18a", "Wait for AutoRelockTime seconds")
# Add additional wait time buffer for motor movement, etc.
time.sleep(autoRelockTime_dut + 5)
if self.check_pics("DRLK.S.A0000"):
self.print_step("18b", "TH reads LockState attribute after AutoRelockTime Expires")
lockstate_dut = await self.read_drlk_attribute_expect_success(attribute=attributes.LockState)
logging.info("Current LockState is %s" % (lockstate_dut))
asserts.assert_equal(lockstate_dut, Clusters.DoorLock.Enums.DlLockState.kLocked,
"LockState expected to be value==Locked")
await self.cleanup_users_and_credentials()
async def run_drlk_test_2_2(self):
await self.run_drlk_test_common(lockUnlockCommand=Clusters.Objects.DoorLock.Commands.LockDoor,
lockUnlockCmdRspPICS="DRLK.S.C00.Rsp",
lockUnlockText="LockDoor",
doAutoRelockTest=False)
async def run_drlk_test_2_3(self):
await self.run_drlk_test_common(lockUnlockCommand=Clusters.Objects.DoorLock.Commands.UnlockDoor,
lockUnlockCmdRspPICS="DRLK.S.C01.Rsp",
lockUnlockText="UnlockDoor",
doAutoRelockTest=True)
async def run_drlk_test_2_12(self):
await self.run_drlk_test_common(lockUnlockCommand=Clusters.Objects.DoorLock.Commands.UnboltDoor,
lockUnlockCmdRspPICS="DRLK.S.C27.Rsp",
lockUnlockText="UnboltDoor",
doAutoRelockTest=True)