@@ -1243,79 +1243,67 @@ class AttributeWriteRequest:
1243
1243
return False
1244
1244
return True
1245
1245
1246
- def TestSubscription (self , nodeid : int , endpoint : int ):
1246
+ async def TestSubscription (self , nodeid : int , endpoint : int ):
1247
1247
desiredPath = None
1248
1248
receivedUpdate = 0
1249
- updateLock = threading . Lock ()
1250
- updateCv = threading . Condition ( updateLock )
1249
+ updateEvent = asyncio . Event ()
1250
+ loop = asyncio . get_running_loop ( )
1251
1251
1252
1252
def OnValueChange (path : Attribute .TypedAttributePath , transaction : Attribute .SubscriptionTransaction ) -> None :
1253
- nonlocal desiredPath , updateCv , updateLock , receivedUpdate
1253
+ nonlocal desiredPath , updateEvent , receivedUpdate
1254
1254
if path .Path != desiredPath :
1255
1255
return
1256
1256
1257
1257
data = transaction .GetAttribute (path )
1258
1258
logger .info (
1259
1259
f"Received report from server: path: { path .Path } , value: { data } " )
1260
- with updateLock :
1261
- receivedUpdate += 1
1262
- updateCv .notify_all ()
1260
+ receivedUpdate += 1
1261
+ loop .call_soon_threadsafe (updateEvent .set )
1263
1262
1264
- class _conductAttributeChange (threading .Thread ):
1265
- def __init__ (self , devCtrl : ChipDeviceCtrl .ChipDeviceController , nodeid : int , endpoint : int ):
1266
- super (_conductAttributeChange , self ).__init__ ()
1267
- self .nodeid = nodeid
1268
- self .endpoint = endpoint
1269
- self .devCtrl = devCtrl
1270
-
1271
- def run (self ):
1272
- for i in range (5 ):
1273
- time .sleep (3 )
1274
- self .devCtrl .ZCLSend (
1275
- "OnOff" , "Toggle" , self .nodeid , self .endpoint , 0 , {})
1263
+ async def _conductAttributeChange (devCtrl : ChipDeviceCtrl .ChipDeviceController , nodeid : int , endpoint : int ):
1264
+ for i in range (5 ):
1265
+ await asyncio .sleep (3 )
1266
+ await self .devCtrl .SendCommand (nodeid , endpoint , Clusters .OnOff .Commands .Toggle ())
1276
1267
1277
1268
try :
1278
1269
desiredPath = Clusters .Attribute .AttributePath (
1279
1270
EndpointId = 1 , ClusterId = 6 , AttributeId = 0 )
1280
1271
# OnOff Cluster, OnOff Attribute
1281
- subscription = self .devCtrl .ZCLSubscribeAttribute (
1282
- "OnOff" , "OnOff" , nodeid , endpoint , 1 , 10 )
1272
+ subscription = await self .devCtrl .ReadAttribute ( nodeid , [( endpoint , Clusters . OnOff . Attributes . OnOff )], None , False , reportInterval = ( 1 , 10 ),
1273
+ keepSubscriptions = False , autoResubscribe = True )
1283
1274
subscription .SetAttributeUpdateCallback (OnValueChange )
1284
- changeThread = _conductAttributeChange (
1285
- self .devCtrl , nodeid , endpoint )
1286
1275
# Reset the number of subscriptions received as subscribing causes a callback.
1287
- changeThread .start ()
1288
- with updateCv :
1289
- while receivedUpdate < 5 :
1290
- # We should observe 5 attribute changes
1291
- # The changing thread will change the value after 3 seconds. If we're waiting more than 10, assume something
1292
- # is really wrong and bail out here with some information.
1293
- if not updateCv .wait (10.0 ):
1294
- self .logger .error (
1295
- "Failed to receive subscription update" )
1296
- break
1276
+ taskAttributeChange = loop .create_task (_conductAttributeChange (self .devCtrl , nodeid , endpoint ))
1277
+
1278
+ while receivedUpdate < 5 :
1279
+ # We should observe 5 attribute changes
1280
+ # The changing thread will change the value after 3 seconds. If we're waiting more than 10, assume something
1281
+ # is really wrong and bail out here with some information.
1282
+ try :
1283
+ await asyncio .wait_for (updateEvent .wait (), 10 )
1284
+ updateEvent .clear ()
1285
+ except TimeoutError :
1286
+ self .logger .error (
1287
+ "Failed to receive subscription update" )
1288
+ break
1297
1289
1298
1290
# thread changes 5 times, and sleeps for 3 seconds in between.
1299
1291
# Add an additional 3 seconds of slack. Timeout is in seconds.
1300
- changeThread . join ( 18.0 )
1292
+ await asyncio . wait_for ( taskAttributeChange , 3 )
1301
1293
1294
+ return True if receivedUpdate == 5 else False
1295
+
1296
+ except Exception as ex :
1297
+ self .logger .exception (f"Failed to finish API test: { ex } " )
1298
+ return False
1299
+ finally :
1302
1300
#
1303
1301
# Clean-up by shutting down the sub. Otherwise, we're going to get callbacks through
1304
1302
# OnValueChange on what will soon become an invalid
1305
1303
# execution context above.
1306
1304
#
1307
1305
subscription .Shutdown ()
1308
1306
1309
- if changeThread .is_alive ():
1310
- # Thread join timed out
1311
- self .logger .error ("Failed to join change thread" )
1312
- return False
1313
-
1314
- return True if receivedUpdate == 5 else False
1315
-
1316
- except Exception as ex :
1317
- self .logger .exception (f"Failed to finish API test: { ex } " )
1318
- return False
1319
1307
1320
1308
return True
1321
1309
0 commit comments