[Python] Call SDK asyncio friendly (#32764)
* [Python] Rename CallAsync to CallAsyncWithCallback
CallAsync continuously calls a callback function during the wait
for the call. Rename the function to reflect that fact.
This frees up CallAsync for an asyncio friendly implementation.
* [Python] Implement asyncio variant of CallAsync
Call Matter SDK in a asyncio friendly way. During posting of the task
onto the CHIP mainloop, it makes sure that the asyncio loop is not
blocked.
* [Python] Use CallAsync where appropriate
* Rename AsyncSimpleCallableHandle to AsyncioCallableHandle
* Rename CallAsyncWithCallback to CallAsyncWithCompleteCallback
Also add a comment that the function needs to be released by registering
a callback and setting the complete event.
* Add comments about lock
diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py
index 4e57e3d..d63a377 100644
--- a/src/controller/python/chip/ChipDeviceCtrl.py
+++ b/src/controller/python/chip/ChipDeviceCtrl.py
@@ -186,7 +186,7 @@
def __del__(self):
if (self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None):
# This destructor is called from any threading context, including on the Matter threading context.
- # So, we cannot call chipStack.Call or chipStack.CallAsync which waits for the posted work to
+ # So, we cannot call chipStack.Call or chipStack.CallAsyncWithCompleteCallback which waits for the posted work to
# actually be executed. Instead, we just post/schedule the work and move on.
builtins.chipStack.PostTaskOnChipThread(lambda: self._dmLib.pychip_FreeOperationalDeviceProxy(self._deviceProxy))
@@ -447,7 +447,7 @@
self.state = DCState.COMMISSIONING
self._enablePairingCompeleteCallback(True)
- self._ChipStack.CallAsync(
+ self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
self.devCtrl, discriminator, setupPinCode, nodeid)
).raise_on_error()
@@ -459,7 +459,7 @@
def UnpairDevice(self, nodeid: int):
self.CheckIsActive()
- return self._ChipStack.CallAsync(
+ return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_UnpairDevice(
self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct)
).raise_on_error()
@@ -498,7 +498,7 @@
self.state = DCState.RENDEZVOUS_ONGOING
self._enablePairingCompeleteCallback(True)
- return self._ChipStack.CallAsync(
+ return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE(
self.devCtrl, setupPinCode, discriminator, nodeid)
)
@@ -508,7 +508,7 @@
self.state = DCState.RENDEZVOUS_ONGOING
self._enablePairingCompeleteCallback(True)
- return self._ChipStack.CallAsync(
+ return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port)
)
@@ -518,7 +518,7 @@
self.state = DCState.RENDEZVOUS_ONGOING
self._enablePairingCompeleteCallback(True)
- return self._ChipStack.CallAsync(
+ return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_EstablishPASESession(
self.devCtrl, setUpCode.encode("utf-8"), nodeid)
)
@@ -737,7 +737,7 @@
Returns CommissioningParameters
'''
self.CheckIsActive()
- self._ChipStack.CallAsync(
+ self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option)
).raise_on_error()
@@ -858,7 +858,7 @@
if allowPASE:
returnDevice = c_void_p(None)
- res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
+ res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
logging.info('Using PASE connection')
@@ -888,11 +888,12 @@
closure = DeviceAvailableClosure(eventLoop, future)
ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure))
- self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
+ res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
self.devCtrl, nodeid, ctypes.py_object(closure), _DeviceAvailableCallback),
- timeoutMs).raise_on_error()
+ timeoutMs)
+ res.raise_on_error()
- # The callback might have been received synchronously (during self._ChipStack.Call()).
+ # The callback might have been received synchronously (during self._ChipStack.CallAsync()).
# In that case the Future has already been set it will return immediately
if timeoutMs is not None:
timeout = float(timeoutMs) / 1000
@@ -1020,13 +1021,14 @@
future = eventLoop.create_future()
device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs)
- ClusterCommand.SendCommand(
+ res = await ClusterCommand.SendCommand(
future, eventLoop, responseType, device.deviceProxy, ClusterCommand.CommandPath(
EndpointId=endpoint,
ClusterId=payload.cluster_id,
CommandId=payload.command_id,
), payload, timedRequestTimeoutMs=timedRequestTimeoutMs,
- interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
+ interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse)
+ res.raise_on_error()
return await future
async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterCommand.InvokeRequestInfo],
@@ -1062,10 +1064,11 @@
device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs)
- ClusterCommand.SendBatchCommands(
+ res = await ClusterCommand.SendBatchCommands(
future, eventLoop, device.deviceProxy, commands,
timedRequestTimeoutMs=timedRequestTimeoutMs,
- interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
+ interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse)
+ res.raise_on_error()
return await future
def SendGroupCommand(self, groupid: int, payload: ClusterObjects.ClusterCommand, busyWaitMs: typing.Union[None, int] = None):
@@ -1895,7 +1898,7 @@
self._ChipStack.commissioningCompleteEvent.clear()
self.state = DCState.COMMISSIONING
- self._ChipStack.CallAsync(
+ self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_Commission(
self.devCtrl, nodeid)
)
@@ -2011,7 +2014,7 @@
self._ChipStack.commissioningCompleteEvent.clear()
self._enablePairingCompeleteCallback(True)
- self._ChipStack.CallAsync(
+ self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission(
self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec)
)
@@ -2035,7 +2038,7 @@
self._ChipStack.commissioningCompleteEvent.clear()
self._enablePairingCompeleteCallback(True)
- self._ChipStack.CallAsync(
+ self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_ConnectWithCode(
self.devCtrl, setupPayload, nodeid, discoveryType.value)
)
@@ -2055,7 +2058,7 @@
self._ChipStack.commissioningCompleteEvent.clear()
self._enablePairingCompeleteCallback(True)
- self._ChipStack.CallAsync(
+ self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_ConnectIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
)
@@ -2069,7 +2072,7 @@
The NOC chain will be provided in TLV cert format."""
self.CheckIsActive()
- return self._ChipStack.CallAsync(
+ return self._ChipStack.CallAsyncWithCompleteCallback(
lambda: self._dmLib.pychip_DeviceController_IssueNOCChain(
self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId)
)
diff --git a/src/controller/python/chip/ChipStack.py b/src/controller/python/chip/ChipStack.py
index 6df7e41..35f9e24 100644
--- a/src/controller/python/chip/ChipStack.py
+++ b/src/controller/python/chip/ChipStack.py
@@ -26,6 +26,7 @@
from __future__ import absolute_import, print_function
+import asyncio
import builtins
import logging
import os
@@ -164,6 +165,35 @@
return self._res
+class AsyncioCallableHandle:
+ """Class which handles Matter SDK Calls asyncio friendly"""
+
+ def __init__(self, callback):
+ self._callback = callback
+ self._loop = asyncio.get_event_loop()
+ self._future = self._loop.create_future()
+ self._result = None
+ self._exception = None
+
+ @property
+ def future(self):
+ return self._future
+
+ def _done(self):
+ if self._exception:
+ self._future.set_exception(self._exception)
+ else:
+ self._future.set_result(self._result)
+
+ def __call__(self):
+ try:
+ self._result = self._callback()
+ except Exception as ex:
+ self._exception = ex
+ self._loop.call_soon_threadsafe(self._done)
+ pythonapi.Py_DecRef(py_object(self))
+
+
_CompleteFunct = CFUNCTYPE(None, c_void_p, c_void_p)
_ErrorFunct = CFUNCTYPE(None, c_void_p, c_void_p,
c_ulong, POINTER(DeviceStatusStruct))
@@ -178,6 +208,7 @@
bluetoothAdapter=None, enableServerInteractions=True):
builtins.enableDebugMode = False
+ # TODO: Probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
self.networkLock = Lock()
self.completeEvent = Event()
self.commissioningCompleteEvent = Event()
@@ -318,6 +349,7 @@
logFunct = 0
if not isinstance(logFunct, _LogMessageFunct):
logFunct = _LogMessageFunct(logFunct)
+ # TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
with self.networkLock:
# NOTE: ChipStack must hold a reference to the CFUNCTYPE object while it is
# set. Otherwise it may get garbage collected, and logging calls from the
@@ -360,6 +392,7 @@
# throw error if op in progress
self.callbackRes = None
self.completeEvent.clear()
+ # TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
with self.networkLock:
res = self.PostTaskOnChipThread(callFunct).Wait(timeoutMs)
self.completeEvent.set()
@@ -367,14 +400,32 @@
return self.callbackRes
return res
- def CallAsync(self, callFunct):
+ async def CallAsync(self, callFunct, timeoutMs: int = None):
+ '''Run a Python function on CHIP stack, and wait for the response.
+ This function will post a task on CHIP mainloop and waits for the call response in a asyncio friendly manner.
+ '''
+ callObj = AsyncioCallableHandle(callFunct)
+ pythonapi.Py_IncRef(py_object(callObj))
+
+ res = self._ChipStackLib.pychip_DeviceController_PostTaskOnChipThread(
+ self.cbHandleChipThreadRun, py_object(callObj))
+
+ if not res.is_success:
+ pythonapi.Py_DecRef(py_object(callObj))
+ raise res.to_exception()
+
+ return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None)
+
+ def CallAsyncWithCompleteCallback(self, callFunct):
'''Run a Python function on CHIP stack, and wait for the application specific response.
This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics.
Calling this function on CHIP on CHIP mainloop thread will cause deadlock.
+ Make sure to register the necessary callbacks which release the function by setting the completeEvent.
'''
# throw error if op in progress
self.callbackRes = None
self.completeEvent.clear()
+ # TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
with self.networkLock:
res = self.PostTaskOnChipThread(callFunct).Wait()
diff --git a/src/controller/python/chip/clusters/Command.py b/src/controller/python/chip/clusters/Command.py
index 89aae53..6ef25cb 100644
--- a/src/controller/python/chip/clusters/Command.py
+++ b/src/controller/python/chip/clusters/Command.py
@@ -291,9 +291,9 @@
))
-def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand,
- timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None, busyWaitMs: Union[None, int] = None,
- suppressResponse: Union[None, bool] = None) -> PyChipError:
+async def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand,
+ timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None,
+ busyWaitMs: Union[None, int] = None, suppressResponse: Union[None, bool] = None) -> PyChipError:
''' Send a cluster-object encapsulated command to a device and does the following:
- On receipt of a successful data response, returns the cluster-object equivalent through the provided future.
- None (on a successful response containing no data)
@@ -316,7 +316,7 @@
payloadTLV = payload.ToTLV()
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
- return builtins.chipStack.Call(
+ return await builtins.chipStack.CallAsync(
lambda: handle.pychip_CommandSender_SendCommand(
ctypes.py_object(transaction), device,
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId,
@@ -353,9 +353,9 @@
return pyBatchCommandsData
-def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo],
- timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None, busyWaitMs: Optional[int] = None,
- suppressResponse: Optional[bool] = None) -> PyChipError:
+async def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo],
+ timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None,
+ busyWaitMs: Optional[int] = None, suppressResponse: Optional[bool] = None) -> PyChipError:
''' Initiates an InvokeInteraction with the batch commands provided.
Arguments:
@@ -388,7 +388,7 @@
transaction = AsyncBatchCommandsTransaction(future, eventLoop, responseTypes)
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
- return builtins.chipStack.Call(
+ return await builtins.chipStack.CallAsync(
lambda: handle.pychip_CommandSender_SendBatchCommands(
py_object(transaction), device,
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs),