Skip to content

Commit 4d0a1f0

Browse files
authored
[Python] Call SDK asyncio friendly (#32764)
* [Python] Rename CallAsync to CallAsyncWithCallback CallAsync continuously calls a callback function during the wait for the call. Rename the function to reflect that fact. This frees up CallAsync for an asyncio friendly implementation. * [Python] Implement asyncio variant of CallAsync Call Matter SDK in a asyncio friendly way. During posting of the task onto the CHIP mainloop, it makes sure that the asyncio loop is not blocked. * [Python] Use CallAsync where appropriate * Rename AsyncSimpleCallableHandle to AsyncioCallableHandle * Rename CallAsyncWithCallback to CallAsyncWithCompleteCallback Also add a comment that the function needs to be released by registering a callback and setting the complete event. * Add comments about lock
1 parent 3e6657c commit 4d0a1f0

File tree

3 files changed

+83
-29
lines changed

3 files changed

+83
-29
lines changed

src/controller/python/chip/ChipDeviceCtrl.py

+23-20
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def __init__(self, deviceProxy: ctypes.c_void_p, dmLib=None):
186186
def __del__(self):
187187
if (self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None):
188188
# This destructor is called from any threading context, including on the Matter threading context.
189-
# So, we cannot call chipStack.Call or chipStack.CallAsync which waits for the posted work to
189+
# So, we cannot call chipStack.Call or chipStack.CallAsyncWithCompleteCallback which waits for the posted work to
190190
# actually be executed. Instead, we just post/schedule the work and move on.
191191
builtins.chipStack.PostTaskOnChipThread(lambda: self._dmLib.pychip_FreeOperationalDeviceProxy(self._deviceProxy))
192192

@@ -447,7 +447,7 @@ def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError:
447447

448448
self.state = DCState.COMMISSIONING
449449
self._enablePairingCompeleteCallback(True)
450-
self._ChipStack.CallAsync(
450+
self._ChipStack.CallAsyncWithCompleteCallback(
451451
lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
452452
self.devCtrl, discriminator, setupPinCode, nodeid)
453453
).raise_on_error()
@@ -459,7 +459,7 @@ def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError:
459459
def UnpairDevice(self, nodeid: int):
460460
self.CheckIsActive()
461461

462-
return self._ChipStack.CallAsync(
462+
return self._ChipStack.CallAsyncWithCompleteCallback(
463463
lambda: self._dmLib.pychip_DeviceController_UnpairDevice(
464464
self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct)
465465
).raise_on_error()
@@ -498,7 +498,7 @@ def EstablishPASESessionBLE(self, setupPinCode: int, discriminator: int, nodeid:
498498

499499
self.state = DCState.RENDEZVOUS_ONGOING
500500
self._enablePairingCompeleteCallback(True)
501-
return self._ChipStack.CallAsync(
501+
return self._ChipStack.CallAsyncWithCompleteCallback(
502502
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE(
503503
self.devCtrl, setupPinCode, discriminator, nodeid)
504504
)
@@ -508,7 +508,7 @@ def EstablishPASESessionIP(self, ipaddr: str, setupPinCode: int, nodeid: int, po
508508

509509
self.state = DCState.RENDEZVOUS_ONGOING
510510
self._enablePairingCompeleteCallback(True)
511-
return self._ChipStack.CallAsync(
511+
return self._ChipStack.CallAsyncWithCompleteCallback(
512512
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP(
513513
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port)
514514
)
@@ -518,7 +518,7 @@ def EstablishPASESession(self, setUpCode: str, nodeid: int):
518518

519519
self.state = DCState.RENDEZVOUS_ONGOING
520520
self._enablePairingCompeleteCallback(True)
521-
return self._ChipStack.CallAsync(
521+
return self._ChipStack.CallAsyncWithCompleteCallback(
522522
lambda: self._dmLib.pychip_DeviceController_EstablishPASESession(
523523
self.devCtrl, setUpCode.encode("utf-8"), nodeid)
524524
)
@@ -737,7 +737,7 @@ def OpenCommissioningWindow(self, nodeid: int, timeout: int, iteration: int,
737737
Returns CommissioningParameters
738738
'''
739739
self.CheckIsActive()
740-
self._ChipStack.CallAsync(
740+
self._ChipStack.CallAsyncWithCompleteCallback(
741741
lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
742742
self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option)
743743
).raise_on_error()
@@ -858,7 +858,7 @@ async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: in
858858

859859
if allowPASE:
860860
returnDevice = c_void_p(None)
861-
res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
861+
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
862862
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
863863
if res.is_success:
864864
logging.info('Using PASE connection')
@@ -888,11 +888,12 @@ def deviceAvailable(self, device, err):
888888

889889
closure = DeviceAvailableClosure(eventLoop, future)
890890
ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure))
891-
self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
891+
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
892892
self.devCtrl, nodeid, ctypes.py_object(closure), _DeviceAvailableCallback),
893-
timeoutMs).raise_on_error()
893+
timeoutMs)
894+
res.raise_on_error()
894895

895-
# The callback might have been received synchronously (during self._ChipStack.Call()).
896+
# The callback might have been received synchronously (during self._ChipStack.CallAsync()).
896897
# In that case the Future has already been set it will return immediately
897898
if timeoutMs is not None:
898899
timeout = float(timeoutMs) / 1000
@@ -1020,13 +1021,14 @@ async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.
10201021
future = eventLoop.create_future()
10211022

10221023
device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs)
1023-
ClusterCommand.SendCommand(
1024+
res = await ClusterCommand.SendCommand(
10241025
future, eventLoop, responseType, device.deviceProxy, ClusterCommand.CommandPath(
10251026
EndpointId=endpoint,
10261027
ClusterId=payload.cluster_id,
10271028
CommandId=payload.command_id,
10281029
), payload, timedRequestTimeoutMs=timedRequestTimeoutMs,
1029-
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
1030+
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse)
1031+
res.raise_on_error()
10301032
return await future
10311033

10321034
async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterCommand.InvokeRequestInfo],
@@ -1062,10 +1064,11 @@ async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterComm
10621064

10631065
device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs)
10641066

1065-
ClusterCommand.SendBatchCommands(
1067+
res = await ClusterCommand.SendBatchCommands(
10661068
future, eventLoop, device.deviceProxy, commands,
10671069
timedRequestTimeoutMs=timedRequestTimeoutMs,
1068-
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
1070+
interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse)
1071+
res.raise_on_error()
10691072
return await future
10701073

10711074
def SendGroupCommand(self, groupid: int, payload: ClusterObjects.ClusterCommand, busyWaitMs: typing.Union[None, int] = None):
@@ -1895,7 +1898,7 @@ def Commission(self, nodeid) -> PyChipError:
18951898
self._ChipStack.commissioningCompleteEvent.clear()
18961899
self.state = DCState.COMMISSIONING
18971900

1898-
self._ChipStack.CallAsync(
1901+
self._ChipStack.CallAsyncWithCompleteCallback(
18991902
lambda: self._dmLib.pychip_DeviceController_Commission(
19001903
self.devCtrl, nodeid)
19011904
)
@@ -2011,7 +2014,7 @@ def CommissionOnNetwork(self, nodeId: int, setupPinCode: int,
20112014
self._ChipStack.commissioningCompleteEvent.clear()
20122015

20132016
self._enablePairingCompeleteCallback(True)
2014-
self._ChipStack.CallAsync(
2017+
self._ChipStack.CallAsyncWithCompleteCallback(
20152018
lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission(
20162019
self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec)
20172020
)
@@ -2035,7 +2038,7 @@ def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: Disc
20352038
self._ChipStack.commissioningCompleteEvent.clear()
20362039

20372040
self._enablePairingCompeleteCallback(True)
2038-
self._ChipStack.CallAsync(
2041+
self._ChipStack.CallAsyncWithCompleteCallback(
20392042
lambda: self._dmLib.pychip_DeviceController_ConnectWithCode(
20402043
self.devCtrl, setupPayload, nodeid, discoveryType.value)
20412044
)
@@ -2055,7 +2058,7 @@ def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int) -> PyChipErr
20552058
self._ChipStack.commissioningCompleteEvent.clear()
20562059

20572060
self._enablePairingCompeleteCallback(True)
2058-
self._ChipStack.CallAsync(
2061+
self._ChipStack.CallAsyncWithCompleteCallback(
20592062
lambda: self._dmLib.pychip_DeviceController_ConnectIP(
20602063
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
20612064
)
@@ -2069,7 +2072,7 @@ def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRRespons
20692072
The NOC chain will be provided in TLV cert format."""
20702073
self.CheckIsActive()
20712074

2072-
return self._ChipStack.CallAsync(
2075+
return self._ChipStack.CallAsyncWithCompleteCallback(
20732076
lambda: self._dmLib.pychip_DeviceController_IssueNOCChain(
20742077
self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId)
20752078
)

src/controller/python/chip/ChipStack.py

+52-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from __future__ import absolute_import, print_function
2828

29+
import asyncio
2930
import builtins
3031
import logging
3132
import os
@@ -164,6 +165,35 @@ def Wait(self, timeoutMs: int = None):
164165
return self._res
165166

166167

168+
class AsyncioCallableHandle:
169+
"""Class which handles Matter SDK Calls asyncio friendly"""
170+
171+
def __init__(self, callback):
172+
self._callback = callback
173+
self._loop = asyncio.get_event_loop()
174+
self._future = self._loop.create_future()
175+
self._result = None
176+
self._exception = None
177+
178+
@property
179+
def future(self):
180+
return self._future
181+
182+
def _done(self):
183+
if self._exception:
184+
self._future.set_exception(self._exception)
185+
else:
186+
self._future.set_result(self._result)
187+
188+
def __call__(self):
189+
try:
190+
self._result = self._callback()
191+
except Exception as ex:
192+
self._exception = ex
193+
self._loop.call_soon_threadsafe(self._done)
194+
pythonapi.Py_DecRef(py_object(self))
195+
196+
167197
_CompleteFunct = CFUNCTYPE(None, c_void_p, c_void_p)
168198
_ErrorFunct = CFUNCTYPE(None, c_void_p, c_void_p,
169199
c_ulong, POINTER(DeviceStatusStruct))
@@ -178,6 +208,7 @@ def __init__(self, persistentStoragePath: str, installDefaultLogHandler=True,
178208
bluetoothAdapter=None, enableServerInteractions=True):
179209
builtins.enableDebugMode = False
180210

211+
# TODO: Probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
181212
self.networkLock = Lock()
182213
self.completeEvent = Event()
183214
self.commissioningCompleteEvent = Event()
@@ -318,6 +349,7 @@ def setLogFunct(self, logFunct):
318349
logFunct = 0
319350
if not isinstance(logFunct, _LogMessageFunct):
320351
logFunct = _LogMessageFunct(logFunct)
352+
# TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
321353
with self.networkLock:
322354
# NOTE: ChipStack must hold a reference to the CFUNCTYPE object while it is
323355
# set. Otherwise it may get garbage collected, and logging calls from the
@@ -360,21 +392,40 @@ def Call(self, callFunct, timeoutMs: int = None):
360392
# throw error if op in progress
361393
self.callbackRes = None
362394
self.completeEvent.clear()
395+
# TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
363396
with self.networkLock:
364397
res = self.PostTaskOnChipThread(callFunct).Wait(timeoutMs)
365398
self.completeEvent.set()
366399
if res == 0 and self.callbackRes is not None:
367400
return self.callbackRes
368401
return res
369402

370-
def CallAsync(self, callFunct):
403+
async def CallAsync(self, callFunct, timeoutMs: int = None):
404+
'''Run a Python function on CHIP stack, and wait for the response.
405+
This function will post a task on CHIP mainloop and waits for the call response in a asyncio friendly manner.
406+
'''
407+
callObj = AsyncioCallableHandle(callFunct)
408+
pythonapi.Py_IncRef(py_object(callObj))
409+
410+
res = self._ChipStackLib.pychip_DeviceController_PostTaskOnChipThread(
411+
self.cbHandleChipThreadRun, py_object(callObj))
412+
413+
if not res.is_success:
414+
pythonapi.Py_DecRef(py_object(callObj))
415+
raise res.to_exception()
416+
417+
return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None)
418+
419+
def CallAsyncWithCompleteCallback(self, callFunct):
371420
'''Run a Python function on CHIP stack, and wait for the application specific response.
372421
This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics.
373422
Calling this function on CHIP on CHIP mainloop thread will cause deadlock.
423+
Make sure to register the necessary callbacks which release the function by setting the completeEvent.
374424
'''
375425
# throw error if op in progress
376426
self.callbackRes = None
377427
self.completeEvent.clear()
428+
# TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
378429
with self.networkLock:
379430
res = self.PostTaskOnChipThread(callFunct).Wait()
380431

src/controller/python/chip/clusters/Command.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,9 @@ def TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke(future: Future, eventLo
291291
))
292292

293293

294-
def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand,
295-
timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None, busyWaitMs: Union[None, int] = None,
296-
suppressResponse: Union[None, bool] = None) -> PyChipError:
294+
async def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand,
295+
timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None,
296+
busyWaitMs: Union[None, int] = None, suppressResponse: Union[None, bool] = None) -> PyChipError:
297297
''' Send a cluster-object encapsulated command to a device and does the following:
298298
- On receipt of a successful data response, returns the cluster-object equivalent through the provided future.
299299
- None (on a successful response containing no data)
@@ -316,7 +316,7 @@ def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPa
316316

317317
payloadTLV = payload.ToTLV()
318318
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
319-
return builtins.chipStack.Call(
319+
return await builtins.chipStack.CallAsync(
320320
lambda: handle.pychip_CommandSender_SendCommand(
321321
ctypes.py_object(transaction), device,
322322
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId,
@@ -353,9 +353,9 @@ def _BuildPyInvokeRequestData(commands: List[InvokeRequestInfo], timedRequestTim
353353
return pyBatchCommandsData
354354

355355

356-
def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo],
357-
timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None, busyWaitMs: Optional[int] = None,
358-
suppressResponse: Optional[bool] = None) -> PyChipError:
356+
async def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo],
357+
timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None,
358+
busyWaitMs: Optional[int] = None, suppressResponse: Optional[bool] = None) -> PyChipError:
359359
''' Initiates an InvokeInteraction with the batch commands provided.
360360
361361
Arguments:
@@ -388,7 +388,7 @@ def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRe
388388
transaction = AsyncBatchCommandsTransaction(future, eventLoop, responseTypes)
389389
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
390390

391-
return builtins.chipStack.Call(
391+
return await builtins.chipStack.CallAsync(
392392
lambda: handle.pychip_CommandSender_SendBatchCommands(
393393
py_object(transaction), device,
394394
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs),

0 commit comments

Comments
 (0)