forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTC_CGEN_2_4.py
161 lines (139 loc) · 9.44 KB
/
TC_CGEN_2_4.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import random
import time
import chip.CertificateAuthority
import chip.clusters as Clusters
import chip.clusters.enum
import chip.FabricAdmin
from chip import ChipDeviceCtrl
from chip.ChipDeviceCtrl import CommissioningParameters
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts
# Commissioning stage numbers - we should find a better way to match these to the C++ code
kArmFailsafe = 4
kConfigRegulatory = 5
kSendPAICertificateRequest = 10
kSendDACCertificateRequest = 11
kSendAttestationRequest = 12
kSendOpCertSigningRequest = 15
kSendTrustedRootCert = 18
kSendNOC = 19
class TC_CGEN_2_4(MatterBaseTest):
def OpenCommissioningWindow(self) -> CommissioningParameters:
try:
params = self.th1.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=self.discriminator, option=1)
return params
except Exception as e:
logging.exception('Error running OpenCommissioningWindow %s', e)
asserts.assert_true(False, 'Failed to open commissioning window')
async def CommissionToStageSendCompleteAndCleanup(
self, stage: int, expectedErrorPart: chip.native.ErrorSDKPart, expectedErrCode: int):
logging.info("-----------------Fail on step {}-------------------------".format(stage))
params = self.OpenCommissioningWindow()
self.th2.ResetTestCommissioner()
# This will run the commissioning up to the point where stage x is run and the
# response is sent before the test commissioner simulates a failure
self.th2.SetTestCommissionerPrematureCompleteAfter(stage)
errcode = self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.discriminator)
logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(errcode.is_success, errcode))
asserts.assert_false(errcode.is_success, 'Commissioning complete did not error as expected')
asserts.assert_true(errcode.sdk_part == expectedErrorPart, 'Unexpected error type returned from CommissioningComplete')
asserts.assert_true(errcode.sdk_code == expectedErrCode, 'Unexpected error code returned from CommissioningComplete')
revokeCmd = Clusters.AdministratorCommissioning.Commands.RevokeCommissioning()
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=revokeCmd, timedRequestTimeoutMs=6000)
# The failsafe cleanup is scheduled after the command completes, so give it a bit of time to do that
time.sleep(1)
@async_test_body
async def test_TC_CGEN_2_4(self):
self.th1 = self.default_controller
self.discriminator = random.randint(0, 4095)
th2_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
th2_fabric_admin = th2_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=self.th1.fabricId + 1)
self.th2 = th2_fabric_admin.NewController(nodeId=2, useTestCommissioner=True)
# kArmFailsafe, expect General error 0x7e (UNSUPPORTED_ACCESS)
await self.CommissionToStageSendCompleteAndCleanup(kArmFailsafe, chip.native.ErrorSDKPart.IM_GLOBAL_STATUS, 0x7e)
# kConfigRegulatory, expect General error 0x7e (UNSUPPORTED_ACCESS)
await self.CommissionToStageSendCompleteAndCleanup(kConfigRegulatory, chip.native.ErrorSDKPart.IM_GLOBAL_STATUS, 0x7e)
# kSendPAICertificateRequest, expect General error 0x7e (UNSUPPORTED_ACCESS)
await self.CommissionToStageSendCompleteAndCleanup(kSendPAICertificateRequest, chip.native.ErrorSDKPart.IM_GLOBAL_STATUS, 0x7e)
# kSendDACCertificateRequest, expect General error 0x7e (UNSUPPORTED_ACCESS)
await self.CommissionToStageSendCompleteAndCleanup(kSendDACCertificateRequest, chip.native.ErrorSDKPart.IM_GLOBAL_STATUS, 0x7e)
# kSendAttestationRequest, expect General error 0x7e (UNSUPPORTED_ACCESS)
await self.CommissionToStageSendCompleteAndCleanup(kSendAttestationRequest, chip.native.ErrorSDKPart.IM_GLOBAL_STATUS, 0x7e)
# kSendOpCertSigningRequest, expect General error 0x7e (UNSUPPORTED_ACCESS)
await self.CommissionToStageSendCompleteAndCleanup(kSendOpCertSigningRequest, chip.native.ErrorSDKPart.IM_GLOBAL_STATUS, 0x7e)
# kSendTrustedRootCert, expect General error 0x7e (UNSUPPORTED_ACCESS)
await self.CommissionToStageSendCompleteAndCleanup(kSendTrustedRootCert, chip.native.ErrorSDKPart.IM_GLOBAL_STATUS, 0x7e)
# kSendNOC, expect cluster error InvalidAuthentication
await self.CommissionToStageSendCompleteAndCleanup(kSendNOC, chip.native.ErrorSDKPart.IM_CLUSTER_STATUS, 0x02)
logging.info('Step 15 - TH1 opens a commissioning window')
params = self.OpenCommissioningWindow()
logging.info('Step 16 - TH2 fully commissions the DUT')
self.th2.ResetTestCommissioner()
errcode = self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.discriminator)
logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(errcode.is_success, errcode))
logging.info('Step 17 - TH1 sends an arm failsafe')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=900, breadcrumb=0)
# This will throw if not successful
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
logging.info('Step 18 - TH1 reads the location capability')
attr = Clusters.GeneralCommissioning.Attributes.LocationCapability
cap = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
if cap == Clusters.GeneralCommissioning.Enums.RegulatoryLocationTypeEnum.kIndoor:
newloc = Clusters.GeneralCommissioning.Enums.RegulatoryLocationTypeEnum.kOutdoor
elif cap == Clusters.GeneralCommissioning.Enums.RegulatoryLocationTypeEnum.kOutdoor:
newloc = Clusters.GeneralCommissioning.Enums.RegulatoryLocationTypeEnum.kIndoor
else:
newloc = Clusters.GeneralCommissioning.Enums.RegulatoryLocationTypeEnum.extend_enum_if_value_doesnt_exist(3)
logging.info('Step 19 Send SetRgulatoryConfig with incorrect location newloc = {}'.format(newloc))
cmd = Clusters.GeneralCommissioning.Commands.SetRegulatoryConfig(
newRegulatoryConfig=newloc, countryCode="XX", breadcrumb=0)
ret = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
asserts.assert_true(ret.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kValueOutsideRange)
logging.info('Step 20 - TH2 sends CommissioningComplete')
cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete()
resp = await self.th2.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
asserts.assert_true(isinstance(resp, Clusters.GeneralCommissioning.Commands.CommissioningCompleteResponse),
'Incorrect response type from command')
asserts.assert_true(
resp.errorCode == Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kInvalidAuthentication, 'Incorrect error code')
logging.info('Step 21 - TH1 sends an arm failsafe with timeout==0')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=0)
# This will throw if not successful
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
logging.info('Step 22 - TH1 sends CommissioningComplete')
cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete()
resp = await self.th2.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
asserts.assert_true(isinstance(resp, Clusters.GeneralCommissioning.Commands.CommissioningCompleteResponse),
'Incorrect response type from command')
asserts.assert_true(resp.errorCode == Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kNoFailSafe,
'Incorrect error code')
logging.info('Step 23 - TH2 reads fabric index')
attr = Clusters.OperationalCredentials.Attributes.CurrentFabricIndex
th2FabricIndex = await self.read_single_attribute(dev_ctrl=self.th2, node_id=self.dut_node_id, endpoint=0, attribute=attr)
logging.info('Step 24 - TH1 removes TH2')
cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(fabricIndex=th2FabricIndex)
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
if __name__ == "__main__":
default_matter_test_main()