diff --git a/src/controller/python/test/test_scripts/base.py b/src/controller/python/test/test_scripts/base.py index 6d4017a5b70ed8..9fc9300f4c8116 100644 --- a/src/controller/python/test/test_scripts/base.py +++ b/src/controller/python/test/test_scripts/base.py @@ -697,13 +697,13 @@ async def TestCaseEviction(self, nodeid: int): # on the sub we established previously. Since it was just marked defunct, it should return back to being # active and a report should get delivered. # - sawValueChange = False + sawValueChangeEvent = asyncio.Event() + loop = asyncio.get_running_loop() def OnValueChange(path: Attribute.TypedAttributePath, transaction: Attribute.SubscriptionTransaction) -> None: - nonlocal sawValueChange self.logger.info("Saw value change!") if (path.AttributeType == Clusters.UnitTesting.Attributes.Int8u and path.Path.EndpointId == 1): - sawValueChange = True + loop.call_soon_threadsafe(sawValueChangeEvent.set) self.logger.info("Testing CASE defunct logic") @@ -720,14 +720,15 @@ def OnValueChange(path: Attribute.TypedAttributePath, transaction: Attribute.Sub # was received. # await self.devCtrl2.WriteAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.Int8u(4))]) - time.sleep(2) - sub.Shutdown() - - if sawValueChange is False: + try: + await asyncio.wait_for(sawValueChangeEvent.wait(), 2) + except TimeoutError: self.logger.error( "Didn't see value change in time, likely because sub got terminated due to unexpected session eviction!") return False + finally: + sub.Shutdown() # # In this test, we're going to setup a subscription on fabric1 through devCtl, then, constantly keep @@ -739,7 +740,7 @@ def OnValueChange(path: Attribute.TypedAttributePath, transaction: Attribute.Sub # self.logger.info("Testing fabric-isolated CASE eviction") - sawValueChange = False + sawValueChangeEvent.clear() sub = await self.devCtrl.ReadAttribute(nodeid, [(Clusters.UnitTesting.Attributes.Int8u)], reportInterval=(0, 1)) sub.SetAttributeUpdateCallback(OnValueChange) @@ -752,13 +753,14 @@ def OnValueChange(path: Attribute.TypedAttributePath, transaction: Attribute.Sub # was received. Use a different value from before, so there is an actual change. # await self.devCtrl2.WriteAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.Int8u(5))]) - time.sleep(2) - - sub.Shutdown() - if sawValueChange is False: + try: + await asyncio.wait_for(sawValueChangeEvent.wait(), 2) + except TimeoutError: self.logger.error("Didn't see value change in time, likely because sub got terminated due to other fabric (fabric1)") return False + finally: + sub.Shutdown() # # Do the same test again, but reversing the roles of fabric1 and fabric2. And again @@ -766,7 +768,7 @@ def OnValueChange(path: Attribute.TypedAttributePath, transaction: Attribute.Sub # self.logger.info("Testing fabric-isolated CASE eviction (reverse)") - sawValueChange = False + sawValueChangeEvent.clear() sub = await self.devCtrl2.ReadAttribute(nodeid, [(Clusters.UnitTesting.Attributes.Int8u)], reportInterval=(0, 1)) sub.SetAttributeUpdateCallback(OnValueChange) @@ -775,13 +777,13 @@ def OnValueChange(path: Attribute.TypedAttributePath, transaction: Attribute.Sub await self.devCtrl.ReadAttribute(nodeid, [(Clusters.BasicInformation.Attributes.ClusterRevision)]) await self.devCtrl.WriteAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.Int8u(6))]) - time.sleep(2) - - sub.Shutdown() - - if sawValueChange is False: + try: + await asyncio.wait_for(sawValueChangeEvent.wait(), 2) + except TimeoutError: self.logger.error("Didn't see value change in time, likely because sub got terminated due to other fabric (fabric2)") return False + finally: + sub.Shutdown() return True @@ -1199,121 +1201,114 @@ def TestReadBasicAttributes(self, nodeid: int, endpoint: int, group: int): return False return True - def TestWriteBasicAttributes(self, nodeid: int, endpoint: int, group: int): + async def TestWriteBasicAttributes(self, nodeid: int, endpoint: int): @ dataclass class AttributeWriteRequest: - cluster: str - attribute: str + cluster: Clusters.ClusterObjects.Cluster + attribute: Clusters.ClusterObjects.ClusterAttributeDescriptor value: Any expected_status: IM.Status = IM.Status.Success requests = [ - AttributeWriteRequest("BasicInformation", "NodeLabel", "Test"), - AttributeWriteRequest("BasicInformation", "Location", + AttributeWriteRequest(Clusters.BasicInformation, Clusters.BasicInformation.Attributes.NodeLabel, "Test"), + AttributeWriteRequest(Clusters.BasicInformation, Clusters.BasicInformation.Attributes.Location, "a pretty loooooooooooooog string", IM.Status.ConstraintError), ] - failed_zcl = [] + failed_attribute_write = [] for req in requests: try: try: - self.devCtrl.ZCLWriteAttribute(cluster=req.cluster, - attribute=req.attribute, - nodeid=nodeid, - endpoint=endpoint, - groupid=group, - value=req.value) + await self.devCtrl.WriteAttribute(nodeid, [(endpoint, req.attribute, 0)]) if req.expected_status != IM.Status.Success: raise AssertionError( - f"Write attribute {req.cluster}.{req.attribute} expects failure but got success response") + f"Write attribute {req.attribute.__qualname__} expects failure but got success response") except Exception as ex: if req.expected_status != IM.Status.Success: continue else: raise ex - res = self.devCtrl.ZCLReadAttribute( - cluster=req.cluster, attribute=req.attribute, nodeid=nodeid, endpoint=endpoint, groupid=group) - TestResult(f"Read attribute {req.cluster}.{req.attribute}", res).assertValueEqual( - req.value) + + res = await self.devCtrl.ReadAttribute(nodeid, [(endpoint, req.attribute)]) + val = res[endpoint][req.cluster][req.attribute] + if val != req.value: + raise Exception( + f"Read attribute {req.attribute.__qualname__}: expected value {req.value}, got {val}") except Exception as ex: - failed_zcl.append(str(ex)) - if failed_zcl: - self.logger.exception(f"Following attributes failed: {failed_zcl}") + failed_attribute_write.append(str(ex)) + if failed_attribute_write: + self.logger.exception(f"Following attributes failed: {failed_attribute_write}") return False return True - def TestSubscription(self, nodeid: int, endpoint: int): + async def TestSubscription(self, nodeid: int, endpoint: int): desiredPath = None receivedUpdate = 0 - updateLock = threading.Lock() - updateCv = threading.Condition(updateLock) + updateEvent = asyncio.Event() + loop = asyncio.get_running_loop() def OnValueChange(path: Attribute.TypedAttributePath, transaction: Attribute.SubscriptionTransaction) -> None: - nonlocal desiredPath, updateCv, updateLock, receivedUpdate + nonlocal desiredPath, updateEvent, receivedUpdate if path.Path != desiredPath: return data = transaction.GetAttribute(path) logger.info( f"Received report from server: path: {path.Path}, value: {data}") - with updateLock: - receivedUpdate += 1 - updateCv.notify_all() - - class _conductAttributeChange(threading.Thread): - def __init__(self, devCtrl: ChipDeviceCtrl.ChipDeviceController, nodeid: int, endpoint: int): - super(_conductAttributeChange, self).__init__() - self.nodeid = nodeid - self.endpoint = endpoint - self.devCtrl = devCtrl - - def run(self): - for i in range(5): - time.sleep(3) - self.devCtrl.ZCLSend( - "OnOff", "Toggle", self.nodeid, self.endpoint, 0, {}) + receivedUpdate += 1 + loop.call_soon_threadsafe(updateEvent.set) + + async def _conductAttributeChange(devCtrl: ChipDeviceCtrl.ChipDeviceController, nodeid: int, endpoint: int): + for i in range(5): + await asyncio.sleep(3) + await self.devCtrl.SendCommand(nodeid, endpoint, Clusters.OnOff.Commands.Toggle()) try: desiredPath = Clusters.Attribute.AttributePath( EndpointId=1, ClusterId=6, AttributeId=0) # OnOff Cluster, OnOff Attribute - subscription = self.devCtrl.ZCLSubscribeAttribute( - "OnOff", "OnOff", nodeid, endpoint, 1, 10) + subscription = await self.devCtrl.ReadAttribute(nodeid, [(endpoint, Clusters.OnOff.Attributes.OnOff)], None, False, reportInterval=(1, 10), + keepSubscriptions=False, autoResubscribe=True) subscription.SetAttributeUpdateCallback(OnValueChange) - changeThread = _conductAttributeChange( - self.devCtrl, nodeid, endpoint) # Reset the number of subscriptions received as subscribing causes a callback. - changeThread.start() - with updateCv: - while receivedUpdate < 5: - # We should observe 5 attribute changes - # The changing thread will change the value after 3 seconds. If we're waiting more than 10, assume something - # is really wrong and bail out here with some information. - if not updateCv.wait(10.0): - self.logger.error( - "Failed to receive subscription update") - break - - # thread changes 5 times, and sleeps for 3 seconds in between. - # Add an additional 3 seconds of slack. Timeout is in seconds. - changeThread.join(18.0) + taskAttributeChange = loop.create_task(_conductAttributeChange(self.devCtrl, nodeid, endpoint)) - # - # Clean-up by shutting down the sub. Otherwise, we're going to get callbacks through - # OnValueChange on what will soon become an invalid - # execution context above. - # - subscription.Shutdown() + while receivedUpdate < 5: + # We should observe 5 attribute changes + # The changing thread will change the value after 3 seconds. If we're waiting more than 10, assume something + # is really wrong and bail out here with some information. + try: + await asyncio.wait_for(updateEvent.wait(), 10) + updateEvent.clear() + except TimeoutError: + self.logger.error( + "Failed to receive subscription update") + break - if changeThread.is_alive(): - # Thread join timed out - self.logger.error("Failed to join change thread") - return False + # At this point the task should really have done the three attribute, + # otherwise something is wrong. Wait for just 1s in case of a race + # condition between the last attribute update and the callback. + try: + await asyncio.wait_for(taskAttributeChange, 1) + except asyncio.TimeoutError: + # If attribute change task did not finish something is wrong. Cancel + # the task. + taskAttributeChange.cancel() + # This will throw a asyncio.CancelledError and makes sure the test + # is declared failed. + await taskAttributeChange return True if receivedUpdate == 5 else False except Exception as ex: self.logger.exception(f"Failed to finish API test: {ex}") return False + finally: + # + # Clean-up by shutting down the sub. Otherwise, we're going to get callbacks through + # OnValueChange on what will soon become an invalid + # execution context above. + # + subscription.Shutdown() return True @@ -1347,7 +1342,7 @@ def TestFabricScopedCommandDuringPase(self, nodeid: int): return status == IM.Status.UnsupportedAccess - def TestSubscriptionResumption(self, nodeid: int, endpoint: int, remote_ip: str, ssh_port: int, remote_server_app: str): + async def TestSubscriptionResumption(self, nodeid: int, endpoint: int, remote_ip: str, ssh_port: int, remote_server_app: str): ''' This test validates that the device can resume the subscriptions after restarting. It is executed in Linux Cirque tests and the steps of this test are: @@ -1356,42 +1351,40 @@ def TestSubscriptionResumption(self, nodeid: int, endpoint: int, remote_ip: str, 3. Validate that the controller can receive a report from the remote server app ''' desiredPath = None - receivedUpdate = False - updateLock = threading.Lock() - updateCv = threading.Condition(updateLock) + updateEvent = asyncio.Event() + loop = asyncio.get_running_loop() def OnValueReport(path: Attribute.TypedAttributePath, transaction: Attribute.SubscriptionTransaction) -> None: - nonlocal desiredPath, updateCv, updateLock, receivedUpdate + nonlocal desiredPath, updateEvent, receivedUpdate if path.Path != desiredPath: return data = transaction.GetAttribute(path) logger.info( f"Received report from server: path: {path.Path}, value: {data}") - with updateLock: - receivedUpdate = True - updateCv.notify_all() + loop.call_soon_threadsafe(updateEvent.set) try: desiredPath = Clusters.Attribute.AttributePath( EndpointId=0, ClusterId=0x28, AttributeId=5) # BasicInformation Cluster, NodeLabel Attribute - subscription = self.devCtrl.ZCLSubscribeAttribute( - "BasicInformation", "NodeLabel", nodeid, endpoint, 1, 50, keepSubscriptions=True, autoResubscribe=False) + subscription = await self.devCtrl.ReadAttribute(nodeid, [(endpoint, Clusters.BasicInformation.Attributes.NodeLabel)], None, False, reportInterval=(1, 50), + keepSubscriptions=True, autoResubscribe=False) subscription.SetAttributeUpdateCallback(OnValueReport) - self.logger.info("Restart remote deivce") + self.logger.info("Restart remote device") restartRemoteThread = restartRemoteDevice( remote_ip, ssh_port, "root", "admin", remote_server_app, "--thread --discriminator 3840") restartRemoteThread.start() # After device restarts, the attribute will be set dirty so the subscription can receive # the update - with updateCv: - while receivedUpdate is False: - if not updateCv.wait(10.0): - self.logger.error( - "Failed to receive subscription resumption report") - break + receivedUpdate = False + try: + await asyncio.wait_for(updateEvent.wait(), 10) + receivedUpdate = True + except TimeoutError: + self.logger.error( + "Failed to receive subscription resumption report") restartRemoteThread.join(10.0) @@ -1438,25 +1431,26 @@ def OnValueReport(path: Attribute.TypedAttributePath, transaction: Attribute.Sub controller 1 in container 1 while the Step2 is executed in controller 2 in container 2 ''' - def TestSubscriptionResumptionCapacityStep1(self, nodeid: int, endpoint: int, passcode: int, subscription_capacity: int): + async def TestSubscriptionResumptionCapacityStep1(self, nodeid: int, endpoint: int, passcode: int, subscription_capacity: int): try: # BasicInformation Cluster, NodeLabel Attribute for i in range(subscription_capacity): - self.devCtrl.ZCLSubscribeAttribute( - "BasicInformation", "NodeLabel", nodeid, endpoint, 1, 50, keepSubscriptions=True, autoResubscribe=False) + await self.devCtrl.ReadAttribute(nodeid, [(endpoint, Clusters.BasicInformation.Attributes.NodeLabel)], None, + False, reportInterval=(1, 50), + keepSubscriptions=True, autoResubscribe=False) logger.info("Send OpenCommissioningWindow command on fist controller") discriminator = 3840 salt = secrets.token_bytes(16) iterations = 2000 verifier = GenerateVerifier(passcode, salt, iterations) - asyncio.run(self.devCtrl.SendCommand( + await self.devCtrl.SendCommand( nodeid, 0, Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow( commissioningTimeout=180, PAKEPasscodeVerifier=verifier, discriminator=discriminator, iterations=iterations, - salt=salt), timedRequestTimeoutMs=10000)) + salt=salt), timedRequestTimeoutMs=10000) return True except Exception as ex: @@ -1465,8 +1459,8 @@ def TestSubscriptionResumptionCapacityStep1(self, nodeid: int, endpoint: int, pa return True - def TestSubscriptionResumptionCapacityStep2(self, nodeid: int, endpoint: int, remote_ip: str, ssh_port: int, - remote_server_app: str, subscription_capacity: int): + async def TestSubscriptionResumptionCapacityStep2(self, nodeid: int, endpoint: int, remote_ip: str, ssh_port: int, + remote_server_app: str, subscription_capacity: int): try: self.logger.info("Restart remote deivce") extra_agrs = f"--thread --discriminator 3840 --subscription-capacity {subscription_capacity}" @@ -1480,8 +1474,9 @@ def TestSubscriptionResumptionCapacityStep2(self, nodeid: int, endpoint: int, re self.logger.info("Send a new subscription request from the second controller") # Close previous session so that the second controller will res-establish the session with the remote device self.devCtrl.CloseSession(nodeid) - self.devCtrl.ZCLSubscribeAttribute( - "BasicInformation", "NodeLabel", nodeid, endpoint, 1, 50, keepSubscriptions=True, autoResubscribe=False) + await self.devCtrl.ReadAttribute(nodeid, [(endpoint, Clusters.BasicInformation.Attributes.NodeLabel)], None, + False, reportInterval=(1, 50), + keepSubscriptions=True, autoResubscribe=False) if restartRemoteThread.is_alive(): # Thread join timed out diff --git a/src/controller/python/test/test_scripts/mobile-device-test.py b/src/controller/python/test/test_scripts/mobile-device-test.py index 9ceaa35d24c291..33ae713fe02cb2 100755 --- a/src/controller/python/test/test_scripts/mobile-device-test.py +++ b/src/controller/python/test/test_scripts/mobile-device-test.py @@ -129,9 +129,8 @@ def TestDatamodel(test: BaseTestHelper, device_nodeid: int): "Failed to test Read Basic Attributes") logger.info("Testing attribute writing") - FailIfNot(test.TestWriteBasicAttributes(nodeid=device_nodeid, - endpoint=ENDPOINT_ID, - group=GROUP_ID), + FailIfNot(asyncio.run(test.TestWriteBasicAttributes(nodeid=device_nodeid, + endpoint=ENDPOINT_ID)), "Failed to test Write Basic Attributes") logger.info("Testing attribute reading basic again") @@ -141,11 +140,11 @@ def TestDatamodel(test: BaseTestHelper, device_nodeid: int): "Failed to test Read Basic Attributes") logger.info("Testing subscription") - FailIfNot(test.TestSubscription(nodeid=device_nodeid, endpoint=LIGHTING_ENDPOINT_ID), + FailIfNot(asyncio.run(test.TestSubscription(nodeid=device_nodeid, endpoint=LIGHTING_ENDPOINT_ID)), "Failed to subscribe attributes.") logger.info("Testing another subscription that kills previous subscriptions") - FailIfNot(test.TestSubscription(nodeid=device_nodeid, endpoint=LIGHTING_ENDPOINT_ID), + FailIfNot(asyncio.run(test.TestSubscription(nodeid=device_nodeid, endpoint=LIGHTING_ENDPOINT_ID)), "Failed to subscribe attributes.") logger.info("Testing re-subscription") diff --git a/src/controller/python/test/test_scripts/subscription_resumption_capacity_test_ctrl1.py b/src/controller/python/test/test_scripts/subscription_resumption_capacity_test_ctrl1.py index 19065b8a35396a..e02564e293c04a 100755 --- a/src/controller/python/test/test_scripts/subscription_resumption_capacity_test_ctrl1.py +++ b/src/controller/python/test/test_scripts/subscription_resumption_capacity_test_ctrl1.py @@ -19,6 +19,7 @@ # Commissioning test. +import asyncio import os import sys from optparse import OptionParser @@ -113,8 +114,8 @@ def main(): "Failed on on-network commissioing") FailIfNot( - test.TestSubscriptionResumptionCapacityStep1( - options.nodeid, TEST_ENDPOINT_ID, options.setuppin, options.subscriptionCapacity), + asyncio.run(test.TestSubscriptionResumptionCapacityStep1( + options.nodeid, TEST_ENDPOINT_ID, options.setuppin, options.subscriptionCapacity)), "Failed on step 1 of testing subscription resumption capacity") timeoutTicker.stop() diff --git a/src/controller/python/test/test_scripts/subscription_resumption_capacity_test_ctrl2.py b/src/controller/python/test/test_scripts/subscription_resumption_capacity_test_ctrl2.py index 2f3058afcd3bca..ac449a9f5478ac 100755 --- a/src/controller/python/test/test_scripts/subscription_resumption_capacity_test_ctrl2.py +++ b/src/controller/python/test/test_scripts/subscription_resumption_capacity_test_ctrl2.py @@ -19,6 +19,7 @@ # Commissioning test. +import asyncio import os import sys from optparse import OptionParser @@ -125,8 +126,9 @@ def main(): "Failed on on-network commissioing") FailIfNot( - test.TestSubscriptionResumptionCapacityStep2(options.nodeid, TEST_ENDPOINT_ID, options.deviceAddress, - TEST_SSH_PORT, options.remoteServerApp, options.subscriptionCapacity), + asyncio.run( + test.TestSubscriptionResumptionCapacityStep2(options.nodeid, TEST_ENDPOINT_ID, options.deviceAddress, + TEST_SSH_PORT, options.remoteServerApp, options.subscriptionCapacity)), "Failed on testing subscription resumption capacity") timeoutTicker.stop() diff --git a/src/controller/python/test/test_scripts/subscription_resumption_test.py b/src/controller/python/test/test_scripts/subscription_resumption_test.py index 8b2000fb070cd7..79edf6a2898d0e 100755 --- a/src/controller/python/test/test_scripts/subscription_resumption_test.py +++ b/src/controller/python/test/test_scripts/subscription_resumption_test.py @@ -19,6 +19,7 @@ # Commissioning test. +import asyncio import os import sys from optparse import OptionParser @@ -115,8 +116,8 @@ def main(): "Failed on on-network commissioing") FailIfNot( - test.TestSubscriptionResumption(options.nodeid, TEST_ENDPOINT_ID, options.deviceAddress, - TEST_SSH_PORT, options.remoteServerApp), "Failed to resume subscription") + asyncio.run(test.TestSubscriptionResumption(options.nodeid, TEST_ENDPOINT_ID, options.deviceAddress, + TEST_SSH_PORT, options.remoteServerApp)), "Failed to resume subscription") timeoutTicker.stop() diff --git a/src/controller/python/test/test_scripts/subscription_resumption_timeout_test.py b/src/controller/python/test/test_scripts/subscription_resumption_timeout_test.py index 1f6411f63699c3..4932e5b4cc0582 100755 --- a/src/controller/python/test/test_scripts/subscription_resumption_timeout_test.py +++ b/src/controller/python/test/test_scripts/subscription_resumption_timeout_test.py @@ -19,11 +19,13 @@ # Commissioning test. +import asyncio import os import sys from optparse import OptionParser from base import BaseTestHelper, FailIfNot, TestFail, TestTimeout, logger +from chip import clusters as Clusters TEST_DISCRIMINATOR = 3840 TEST_SETUPPIN = 20202021 @@ -101,10 +103,12 @@ def main(): FailIfNot( test.TestOnNetworkCommissioning(options.discriminator, options.setuppin, options.nodeid, options.deviceAddress), - "Failed on on-network commissioing") + "Failed on on-network commissioning") + try: - test.devCtrl.ZCLSubscribeAttribute("BasicInformation", "NodeLabel", options.nodeid, TEST_ENDPOINT_ID, 1, 2, - keepSubscriptions=True, autoResubscribe=False) + asyncio.run(test.devCtrl.ReadAttribute(options.nodeid, + [(TEST_ENDPOINT_ID, Clusters.BasicInformation.Attributes.NodeLabel)], + None, False, reportInterval=(1, 2), keepSubscriptions=True, autoResubscribe=False)) except Exception as ex: TestFail(f"Failed to subscribe attribute: {ex}")