Skip to content

Commit f9b0f60

Browse files
restyled-commitscecille
authored andcommitted
Restyled by autopep8
1 parent 2e056c3 commit f9b0f60

File tree

3 files changed

+67
-44
lines changed

3 files changed

+67
-44
lines changed

src/python_testing/TC_CCTRL_2_2.py

+22-15
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ async def setup_class(self):
5252
super().setup_class()
5353
# TODO: This needs to come from an arg and needs to be something available on the TH
5454
# TODO: confirm whether we can open processes like this on the TH
55-
app = os.path.join(pathlib.Path(__file__).resolve().parent, '..','..','out', 'linux-x64-all-clusters-no-ble', 'chip-all-clusters-app')
55+
app = os.path.join(pathlib.Path(__file__).resolve().parent, '..', '..', 'out',
56+
'linux-x64-all-clusters-no-ble', 'chip-all-clusters-app')
5657

5758
self.kvs = f'kvs_{str(uuid.uuid4())}'
5859
self.port = 5543
@@ -69,7 +70,6 @@ async def setup_class(self):
6970

7071
logging.info("Commissioning from separate fabric")
7172

72-
7373
# Create a second controller on a new fabric to communicate to the server
7474
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
7575
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
@@ -88,7 +88,6 @@ def teardown_class(self):
8888
os.remove(self.kvs)
8989
super().teardown_class()
9090

91-
9291
def steps_TC_CCTRL_2_2(self) -> list[TestStep]:
9392
steps = [TestStep(1, "Get number of fabrics from TH_SERVER", is_commissioning=True),
9493
TestStep(2, "Reading Attribute VendorId from TH_SERVER"),
@@ -138,7 +137,8 @@ async def test_TC_CCTRL_2_2(self):
138137

139138
self.step(5)
140139
ipaddr = ipaddress.IPv6Address('::1')
141-
cmd = Clusters.CommissionerControl.Commands.CommissionNode(requestId=1, responseTimeoutSeconds=30, ipAddress=ipaddr.packed, port=self.port)
140+
cmd = Clusters.CommissionerControl.Commands.CommissionNode(
141+
requestId=1, responseTimeoutSeconds=30, ipAddress=ipaddr.packed, port=self.port)
142142
try:
143143
await self.send_single_cmd(cmd)
144144
asserts.fail("Unexpected success on CommissionNode")
@@ -158,7 +158,8 @@ async def test_TC_CCTRL_2_2(self):
158158

159159
self.step(8)
160160
good_request_id = 0x1234567887654321
161-
cmd = Clusters.CommissionerControl.Commands.RequestCommissioningApproval(requestId=good_request_id, vendorId=th_server_vid, productId=th_server_pid)
161+
cmd = Clusters.CommissionerControl.Commands.RequestCommissioningApproval(
162+
requestId=good_request_id, vendorId=th_server_vid, productId=th_server_pid)
162163
try:
163164
await self.send_single_cmd(cmd=cmd, node_id=pase_nodeid)
164165
asserts.fail("Unexpected success on RequestCommissioningApproval over PASE")
@@ -182,7 +183,8 @@ async def test_TC_CCTRL_2_2(self):
182183
# There should be nothing on the TH that uses VID 0x6006 as this is a real vendor ID.
183184
not_th_server_vid = 0x6006
184185
asserts.assert_not_equal(not_th_server_vid, th_server_vid, "Test implementation assumption incorrect")
185-
cmd = Clusters.CommissionerControl.Commands.RequestCommissioningApproval(requestId=good_request_id, vendorId=not_th_server_vid, productId=th_server_pid)
186+
cmd = Clusters.CommissionerControl.Commands.RequestCommissioningApproval(
187+
requestId=good_request_id, vendorId=not_th_server_vid, productId=th_server_pid)
186188
# If no exception is raised, this is success
187189
await self.send_single_cmd(cmd)
188190

@@ -198,7 +200,8 @@ async def test_TC_CCTRL_2_2(self):
198200
new_event = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=event_path, eventNumberFilter=max(event_nums)+1)
199201
asserts.assert_equal(len(new_event), 1, "Unexpected event list len")
200202
asserts.assert_equal(new_event[0].Data.statusCode, 0, "Unexpected status code")
201-
asserts.assert_equal(new_event[0].Data.clientNodeId, self.matter_test_config.controller_node_id, "Unexpected client node id")
203+
asserts.assert_equal(new_event[0].Data.clientNodeId,
204+
self.matter_test_config.controller_node_id, "Unexpected client node id")
202205
asserts.assert_equal(new_event[0].Data.requestId, good_request_id, "Unexpected request ID")
203206

204207
self.step(14)
@@ -229,7 +232,8 @@ async def test_TC_CCTRL_2_2(self):
229232
self.step(17)
230233
cmd = Clusters.CommissionerControl.Commands.CommissionNode(requestId=good_request_id, responseTimeoutSeconds=30)
231234
resp: Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow = await self.send_single_cmd(cmd)
232-
asserts.assert_equal(type(resp), Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow, "Incorrect response type")
235+
asserts.assert_equal(type(resp), Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow,
236+
"Incorrect response type")
233237

234238
self.step(18)
235239
# min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
@@ -255,7 +259,8 @@ async def test_TC_CCTRL_2_2(self):
255259

256260
self.step(22)
257261
good_request_id = 0x1234567812345678
258-
cmd = Clusters.CommissionerControl.Commands.RequestCommissioningApproval(requestId=good_request_id, vendorId=th_server_vid, productId=th_server_pid, label="Test Ecosystem")
262+
cmd = Clusters.CommissionerControl.Commands.RequestCommissioningApproval(
263+
requestId=good_request_id, vendorId=th_server_vid, productId=th_server_pid, label="Test Ecosystem")
259264
await self.send_single_cmd(cmd)
260265

261266
self.step(23)
@@ -268,13 +273,15 @@ async def test_TC_CCTRL_2_2(self):
268273
new_event = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=event_path, eventNumberFilter=max(event_nums)+1)
269274
asserts.assert_equal(len(new_event), 1, "Unexpected event list len")
270275
asserts.assert_equal(new_event[0].Data.statusCode, 0, "Unexpected status code")
271-
asserts.assert_equal(new_event[0].Data.clientNodeId, self.matter_test_config.controller_node_id, "Unexpected client node id")
276+
asserts.assert_equal(new_event[0].Data.clientNodeId,
277+
self.matter_test_config.controller_node_id, "Unexpected client node id")
272278
asserts.assert_equal(new_event[0].Data.requestId, good_request_id, "Unexpected request ID")
273279

274280
self.step(25)
275281
cmd = Clusters.CommissionerControl.Commands.CommissionNode(requestId=good_request_id, responseTimeoutSeconds=30)
276282
resp = await self.send_single_cmd(cmd)
277-
asserts.assert_equal(type(resp), Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow, "Incorrect response type")
283+
asserts.assert_equal(type(resp), Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow,
284+
"Incorrect response type")
278285

279286
self.step(26)
280287
# min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
@@ -290,11 +297,11 @@ async def test_TC_CCTRL_2_2(self):
290297

291298
self.step(28)
292299
th_server_fabrics_new = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.Fabrics, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0)
293-
#TODO: this should be mocked too.
300+
# TODO: this should be mocked too.
294301
if not self.is_ci:
295-
asserts.assert_equal(len(th_server_fabrics) + 1, len(th_server_fabrics_new), "Unexpected number of fabrics on TH_SERVER")
296-
302+
asserts.assert_equal(len(th_server_fabrics) + 1, len(th_server_fabrics_new),
303+
"Unexpected number of fabrics on TH_SERVER")
297304

298305

299306
if __name__ == "__main__":
300-
default_matter_test_main()
307+
default_matter_test_main()

src/python_testing/test_testing/MockTestRunner.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434
class AsyncMock(MagicMock):
3535
async def __call__(self, *args, **kwargs):
3636
return super(AsyncMock, self).__call__(*args, **kwargs)
37+
38+
3739
class MockTestRunner():
3840

39-
def __init__(self, filename: str, classname: str, test: str, endpoint: int = 0, pics: dict[str, bool] = None, paa_trust_store_path = None):
41+
def __init__(self, filename: str, classname: str, test: str, endpoint: int = 0, pics: dict[str, bool] = None, paa_trust_store_path=None):
4042
self.test = test
4143
self.endpoint = endpoint
4244
self.pics = pics

src/python_testing/test_testing/test_TC_CCNTL_2_2.py

+42-28
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
invoke_call_count = 0
4040
event_call_count = 0
4141

42+
4243
def dynamic_invoke_return(*args, **argv):
4344
global invoke_call_count
4445
invoke_call_count += 1
@@ -49,54 +50,60 @@ def dynamic_invoke_return(*args, **argv):
4950
discriminator=2222, iterations=10000, salt=base64.b64encode(bytes('SaltyMcSalterson', 'utf-8')))
5051

5152
print(f'invoke call {invoke_call_count}')
52-
if invoke_call_count == 1: # Commission node with no prior request, return failure - step 5
53+
if invoke_call_count == 1: # Commission node with no prior request, return failure - step 5
5354
raise InteractionModelError(status=Status.Failure)
54-
elif invoke_call_count == 2: # Commission node over pase - return unsupported access - step 7
55+
elif invoke_call_count == 2: # Commission node over pase - return unsupported access - step 7
5556
raise InteractionModelError(status=Status.UnsupportedAccess)
56-
elif invoke_call_count == 3: # request commissioning approval over pase - return unsupported access - step 8
57+
elif invoke_call_count == 3: # request commissioning approval over pase - return unsupported access - step 8
5758
raise InteractionModelError(status=Status.UnsupportedAccess)
58-
elif invoke_call_count == 4: # good RevokeCommissioning over CASE with bad vid - step 9
59+
elif invoke_call_count == 4: # good RevokeCommissioning over CASE with bad vid - step 9
5960
return None
60-
elif invoke_call_count == 5: # good RequestCommissioningApproval over CASE with bad vid - step 10
61+
elif invoke_call_count == 5: # good RequestCommissioningApproval over CASE with bad vid - step 10
6162
return None
62-
elif invoke_call_count == 6: # CommissionNode with bad request id - step 14
63+
elif invoke_call_count == 6: # CommissionNode with bad request id - step 14
6364
raise InteractionModelError(status=Status.Failure)
64-
elif invoke_call_count == 7: # CommissionNode with bad timeout (low) - step 15
65+
elif invoke_call_count == 7: # CommissionNode with bad timeout (low) - step 15
6566
raise InteractionModelError(status=Status.Failure)
66-
elif invoke_call_count == 8: # CommissionNode with bad timeout (high) - step 16
67+
elif invoke_call_count == 8: # CommissionNode with bad timeout (high) - step 16
6768
raise InteractionModelError(status=Status.Failure)
68-
elif invoke_call_count == 9: # CommissionNode - step 17
69+
elif invoke_call_count == 9: # CommissionNode - step 17
6970
# passcode 20202024
7071
return reverse_open
71-
elif invoke_call_count == 10: # RequestCommissioningApproval with good vid - step 22
72+
elif invoke_call_count == 10: # RequestCommissioningApproval with good vid - step 22
7273
return None
73-
elif invoke_call_count == 11: # CommissionNode - step 25
74+
elif invoke_call_count == 11: # CommissionNode - step 25
7475
# passcode 20202024
7576
return reverse_open
7677
else:
7778
raise InteractionModelError(Status.Failure)
7879

80+
7981
def dynamic_event_return(*args, **argv):
8082
global event_call_count
8183
event_call_count += 1
8284

83-
if event_call_count == 1: # reading events, start empty - no events
85+
if event_call_count == 1: # reading events, start empty - no events
8486
return []
85-
elif event_call_count == 2: # read event with filter - expect empty
87+
elif event_call_count == 2: # read event with filter - expect empty
8688
return []
87-
elif event_call_count == 3: # returned event
88-
header = Attribute.EventHeader(EndpointId=0, ClusterId=Clusters.CommissionerControl.id, EventId=Clusters.CommissionerControl.Events.CommissioningRequestResult.event_id, EventNumber=1)
89-
data = Clusters.CommissionerControl.Events.CommissioningRequestResult(requestId=0x1234567887654321, clientNodeId=112233, statusCode=0)
89+
elif event_call_count == 3: # returned event
90+
header = Attribute.EventHeader(EndpointId=0, ClusterId=Clusters.CommissionerControl.id,
91+
EventId=Clusters.CommissionerControl.Events.CommissioningRequestResult.event_id, EventNumber=1)
92+
data = Clusters.CommissionerControl.Events.CommissioningRequestResult(
93+
requestId=0x1234567887654321, clientNodeId=112233, statusCode=0)
9094
result = Attribute.EventReadResult(Header=header, Status=Status.Success, Data=data)
9195
return [result]
92-
elif event_call_count == 4: # returned event with new request
93-
header = Attribute.EventHeader(EndpointId=0, ClusterId=Clusters.CommissionerControl.id, EventId=Clusters.CommissionerControl.Events.CommissioningRequestResult.event_id, EventNumber=1)
94-
data = Clusters.CommissionerControl.Events.CommissioningRequestResult(requestId=0x1234567812345678, clientNodeId=112233, statusCode=0)
96+
elif event_call_count == 4: # returned event with new request
97+
header = Attribute.EventHeader(EndpointId=0, ClusterId=Clusters.CommissionerControl.id,
98+
EventId=Clusters.CommissionerControl.Events.CommissioningRequestResult.event_id, EventNumber=1)
99+
data = Clusters.CommissionerControl.Events.CommissioningRequestResult(
100+
requestId=0x1234567812345678, clientNodeId=112233, statusCode=0)
95101
result = Attribute.EventReadResult(Header=header, Status=Status.Success, Data=data)
96102
return [result]
97103
else:
98104
raise InteractionModelError(Status.Failure)
99105

106+
100107
def wildcard() -> Attribute.AsyncReadTransaction.ReadResponse:
101108
cc = Clusters.CommissionerControl
102109
ei = Clusters.EcosystemInformation
@@ -105,22 +112,27 @@ def wildcard() -> Attribute.AsyncReadTransaction.ReadResponse:
105112

106113
# EP1 is aggregator device type with a commissioner control cluster
107114
# children - EP2 type bridged node endpoint, ecosystem information, bridged device basic information. Should also have and admin commissioning, but I don't need it for this test.
108-
desc_ep1 = {desc.Attributes.PartsList: [2], desc.Attributes.ServerList: [cc.id], desc.Attributes.DeviceTypeList: [desc.Structs.DeviceTypeStruct(deviceType=0x000E, revision=2)]}
109-
desc_ep2 = {desc.Attributes.ServerList: [bdbi.id, ei.id], desc.Attributes.DeviceTypeList: [desc.Structs.DeviceTypeStruct(deviceType=0x0013, revision=3)]}
115+
desc_ep1 = {desc.Attributes.PartsList: [2], desc.Attributes.ServerList: [
116+
cc.id], desc.Attributes.DeviceTypeList: [desc.Structs.DeviceTypeStruct(deviceType=0x000E, revision=2)]}
117+
desc_ep2 = {desc.Attributes.ServerList: [bdbi.id, ei.id], desc.Attributes.DeviceTypeList: [
118+
desc.Structs.DeviceTypeStruct(deviceType=0x0013, revision=3)]}
110119

111120
# I'm not filling anything in here, because I don't care. I just care that the cluster exists.
112-
ei_attrs = {ei.Attributes.AttributeList:[ei.Attributes.DeviceDirectory.attribute_id, ei.Attributes.LocationDirectory.attribute_id], ei.Attributes.DeviceDirectory:[], ei.Attributes.LocationDirectory:[]}
121+
ei_attrs = {ei.Attributes.AttributeList: [ei.Attributes.DeviceDirectory.attribute_id,
122+
ei.Attributes.LocationDirectory.attribute_id], ei.Attributes.DeviceDirectory: [], ei.Attributes.LocationDirectory: []}
113123

114124
# This cluster just needs to exist, so I'm just going to throw on the mandatory items for now.
115-
bdbi_attrs = {bdbi.Attributes.AttributeList:[bdbi.Attributes.Reachable.attribute_id, bdbi.Attributes.UniqueID.attribute_id], bdbi.Attributes.Reachable:True, bdbi.Attributes.UniqueID:'something'}
125+
bdbi_attrs = {bdbi.Attributes.AttributeList: [bdbi.Attributes.Reachable.attribute_id,
126+
bdbi.Attributes.UniqueID.attribute_id], bdbi.Attributes.Reachable: True, bdbi.Attributes.UniqueID: 'something'}
116127

117-
cc_attrs = {cc.Attributes.AttributeList:[cc.Attributes.SupportedDeviceCategories], cc.Attributes.AcceptedCommandList:[cc.Commands.RequestCommissioningApproval, cc.Commands.CommissionNode],
118-
cc.Attributes.GeneratedCommandList:[cc.Commands.RequestCommissioningApproval], cc.Attributes.SupportedDeviceCategories:1}
128+
cc_attrs = {cc.Attributes.AttributeList: [cc.Attributes.SupportedDeviceCategories], cc.Attributes.AcceptedCommandList: [cc.Commands.RequestCommissioningApproval, cc.Commands.CommissionNode],
129+
cc.Attributes.GeneratedCommandList: [cc.Commands.RequestCommissioningApproval], cc.Attributes.SupportedDeviceCategories: 1}
119130

120131
resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {})
121-
resp.attributes = {1: {desc: desc_ep1, cc:cc_attrs}, 2:{desc:desc_ep2, ei:ei_attrs, bdbi:bdbi_attrs}}
132+
resp.attributes = {1: {desc: desc_ep1, cc: cc_attrs}, 2: {desc: desc_ep2, ei: ei_attrs, bdbi: bdbi_attrs}}
122133
return resp
123134

135+
124136
class MyMock(MockTestRunner):
125137
# TODO consolidate with above
126138
def run_test_with_mock(self, dynamic_invoke_return: typing.Callable, dynamic_event_return: typing.Callable, read_cache: Attribute.AsyncReadTransaction.ReadResponse, hooks=None):
@@ -135,17 +147,19 @@ def run_test_with_mock(self, dynamic_invoke_return: typing.Callable, dynamic_eve
135147

136148
return run_tests_no_exit(self.test_class, self.config, hooks, self.default_controller, self.stack)
137149

150+
138151
def main():
139-
root = os.path.abspath(os.path.join(pathlib.Path(__file__).resolve().parent, '..','..','..'))
152+
root = os.path.abspath(os.path.join(pathlib.Path(__file__).resolve().parent, '..', '..', '..'))
140153
print(f'root = {root}')
141154
paa_path = get_default_paa_trust_store(root)
142155
print(f'paa = {paa_path}')
143156

144-
pics = {"PICS_SDK_CI_ONLY": True }
157+
pics = {"PICS_SDK_CI_ONLY": True}
145158
test_runner = MyMock('TC_CCTRL_2_2', 'TC_CCTRL_2_2', 'test_TC_CCTRL_2_2', 1, paa_trust_store_path=paa_path, pics=pics)
146159

147160
test_runner.run_test_with_mock(dynamic_invoke_return, dynamic_event_return, wildcard())
148161
test_runner.Shutdown()
149162

163+
150164
if __name__ == "__main__":
151165
sys.exit(main())

0 commit comments

Comments
 (0)