[Python] Call SDK asyncio friendly (#32764)

* [Python] Rename CallAsync to CallAsyncWithCallback

CallAsync continuously calls a callback function during the wait
for the call. Rename the function to reflect that fact.

This frees up CallAsync for an asyncio friendly implementation.

* [Python] Implement asyncio variant of CallAsync

Call Matter SDK in a asyncio friendly way. During posting of the task
onto the CHIP mainloop, it makes sure that the asyncio loop is not
blocked.

* [Python] Use CallAsync where appropriate

* Rename AsyncSimpleCallableHandle to AsyncioCallableHandle

* Rename CallAsyncWithCallback to CallAsyncWithCompleteCallback

Also add a comment that the function needs to be released by registering
a callback and setting the complete event.

* Add comments about lock
diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py
index 4e57e3d..d63a377 100644
--- a/src/controller/python/chip/ChipDeviceCtrl.py
+++ b/src/controller/python/chip/ChipDeviceCtrl.py
@@ -186,7 +186,7 @@
     def __del__(self):
         if (self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None):
             # This destructor is called from any threading context, including on the Matter threading context.
-            # So, we cannot call chipStack.Call or chipStack.CallAsync which waits for the posted work to
+            # So, we cannot call chipStack.Call or chipStack.CallAsyncWithCompleteCallback which waits for the posted work to
             # actually be executed. Instead, we just post/schedule the work and move on.
             builtins.chipStack.PostTaskOnChipThread(lambda: self._dmLib.pychip_FreeOperationalDeviceProxy(self._deviceProxy))
 
@@ -447,7 +447,7 @@
 
         self.state = DCState.COMMISSIONING
         self._enablePairingCompeleteCallback(True)
-        self._ChipStack.CallAsync(
+        self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
                 self.devCtrl, discriminator, setupPinCode, nodeid)
         ).raise_on_error()
@@ -459,7 +459,7 @@
     def UnpairDevice(self, nodeid: int):
         self.CheckIsActive()
 
-        return self._ChipStack.CallAsync(
+        return self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_UnpairDevice(
                 self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct)
         ).raise_on_error()
@@ -498,7 +498,7 @@
 
         self.state = DCState.RENDEZVOUS_ONGOING
         self._enablePairingCompeleteCallback(True)
-        return self._ChipStack.CallAsync(
+        return self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE(
                 self.devCtrl, setupPinCode, discriminator, nodeid)
         )
@@ -508,7 +508,7 @@
 
         self.state = DCState.RENDEZVOUS_ONGOING
         self._enablePairingCompeleteCallback(True)
-        return self._ChipStack.CallAsync(
+        return self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP(
                 self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port)
         )
@@ -518,7 +518,7 @@
 
         self.state = DCState.RENDEZVOUS_ONGOING
         self._enablePairingCompeleteCallback(True)
-        return self._ChipStack.CallAsync(
+        return self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_EstablishPASESession(
                 self.devCtrl, setUpCode.encode("utf-8"), nodeid)
         )
@@ -737,7 +737,7 @@
             Returns CommissioningParameters
         '''
         self.CheckIsActive()
-        self._ChipStack.CallAsync(
+        self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
                 self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option)
         ).raise_on_error()
@@ -858,7 +858,7 @@
 
         if allowPASE:
             returnDevice = c_void_p(None)
-            res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
+            res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
                 self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
             if res.is_success:
                 logging.info('Using PASE connection')
@@ -888,11 +888,12 @@
 
         closure = DeviceAvailableClosure(eventLoop, future)
         ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure))
-        self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
+        res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
             self.devCtrl, nodeid, ctypes.py_object(closure), _DeviceAvailableCallback),
-            timeoutMs).raise_on_error()
+            timeoutMs)
+        res.raise_on_error()
 
-        # The callback might have been received synchronously (during self._ChipStack.Call()).
+        # The callback might have been received synchronously (during self._ChipStack.CallAsync()).
         # In that case the Future has already been set it will return immediately
         if timeoutMs is not None:
             timeout = float(timeoutMs) / 1000
@@ -1020,13 +1021,14 @@
         future = eventLoop.create_future()
 
         device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs)
-        ClusterCommand.SendCommand(
+        res = await ClusterCommand.SendCommand(
             future, eventLoop, responseType, device.deviceProxy, ClusterCommand.CommandPath(
                 EndpointId=endpoint,
                 ClusterId=payload.cluster_id,
                 CommandId=payload.command_id,
             ), payload, timedRequestTimeoutMs=timedRequestTimeoutMs,
-            interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
+            interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse)
+        res.raise_on_error()
         return await future
 
     async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterCommand.InvokeRequestInfo],
@@ -1062,10 +1064,11 @@
 
         device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs)
 
-        ClusterCommand.SendBatchCommands(
+        res = await ClusterCommand.SendBatchCommands(
             future, eventLoop, device.deviceProxy, commands,
             timedRequestTimeoutMs=timedRequestTimeoutMs,
-            interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse).raise_on_error()
+            interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse)
+        res.raise_on_error()
         return await future
 
     def SendGroupCommand(self, groupid: int, payload: ClusterObjects.ClusterCommand, busyWaitMs: typing.Union[None, int] = None):
@@ -1895,7 +1898,7 @@
         self._ChipStack.commissioningCompleteEvent.clear()
         self.state = DCState.COMMISSIONING
 
-        self._ChipStack.CallAsync(
+        self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_Commission(
                 self.devCtrl, nodeid)
         )
@@ -2011,7 +2014,7 @@
         self._ChipStack.commissioningCompleteEvent.clear()
 
         self._enablePairingCompeleteCallback(True)
-        self._ChipStack.CallAsync(
+        self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission(
                 self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec)
         )
@@ -2035,7 +2038,7 @@
         self._ChipStack.commissioningCompleteEvent.clear()
 
         self._enablePairingCompeleteCallback(True)
-        self._ChipStack.CallAsync(
+        self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_ConnectWithCode(
                 self.devCtrl, setupPayload, nodeid, discoveryType.value)
         )
@@ -2055,7 +2058,7 @@
         self._ChipStack.commissioningCompleteEvent.clear()
 
         self._enablePairingCompeleteCallback(True)
-        self._ChipStack.CallAsync(
+        self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_ConnectIP(
                 self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
         )
@@ -2069,7 +2072,7 @@
         The NOC chain will be provided in TLV cert format."""
         self.CheckIsActive()
 
-        return self._ChipStack.CallAsync(
+        return self._ChipStack.CallAsyncWithCompleteCallback(
             lambda: self._dmLib.pychip_DeviceController_IssueNOCChain(
                 self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId)
         )
diff --git a/src/controller/python/chip/ChipStack.py b/src/controller/python/chip/ChipStack.py
index 6df7e41..35f9e24 100644
--- a/src/controller/python/chip/ChipStack.py
+++ b/src/controller/python/chip/ChipStack.py
@@ -26,6 +26,7 @@
 
 from __future__ import absolute_import, print_function
 
+import asyncio
 import builtins
 import logging
 import os
@@ -164,6 +165,35 @@
             return self._res
 
 
+class AsyncioCallableHandle:
+    """Class which handles Matter SDK Calls asyncio friendly"""
+
+    def __init__(self, callback):
+        self._callback = callback
+        self._loop = asyncio.get_event_loop()
+        self._future = self._loop.create_future()
+        self._result = None
+        self._exception = None
+
+    @property
+    def future(self):
+        return self._future
+
+    def _done(self):
+        if self._exception:
+            self._future.set_exception(self._exception)
+        else:
+            self._future.set_result(self._result)
+
+    def __call__(self):
+        try:
+            self._result = self._callback()
+        except Exception as ex:
+            self._exception = ex
+        self._loop.call_soon_threadsafe(self._done)
+        pythonapi.Py_DecRef(py_object(self))
+
+
 _CompleteFunct = CFUNCTYPE(None, c_void_p, c_void_p)
 _ErrorFunct = CFUNCTYPE(None, c_void_p, c_void_p,
                         c_ulong, POINTER(DeviceStatusStruct))
@@ -178,6 +208,7 @@
                  bluetoothAdapter=None, enableServerInteractions=True):
         builtins.enableDebugMode = False
 
+        # TODO: Probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
         self.networkLock = Lock()
         self.completeEvent = Event()
         self.commissioningCompleteEvent = Event()
@@ -318,6 +349,7 @@
             logFunct = 0
         if not isinstance(logFunct, _LogMessageFunct):
             logFunct = _LogMessageFunct(logFunct)
+        # TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
         with self.networkLock:
             # NOTE: ChipStack must hold a reference to the CFUNCTYPE object while it is
             # set. Otherwise it may get garbage collected, and logging calls from the
@@ -360,6 +392,7 @@
         # throw error if op in progress
         self.callbackRes = None
         self.completeEvent.clear()
+        # TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
         with self.networkLock:
             res = self.PostTaskOnChipThread(callFunct).Wait(timeoutMs)
         self.completeEvent.set()
@@ -367,14 +400,32 @@
             return self.callbackRes
         return res
 
-    def CallAsync(self, callFunct):
+    async def CallAsync(self, callFunct, timeoutMs: int = None):
+        '''Run a Python function on CHIP stack, and wait for the response.
+        This function will post a task on CHIP mainloop and waits for the call response in a asyncio friendly manner.
+        '''
+        callObj = AsyncioCallableHandle(callFunct)
+        pythonapi.Py_IncRef(py_object(callObj))
+
+        res = self._ChipStackLib.pychip_DeviceController_PostTaskOnChipThread(
+            self.cbHandleChipThreadRun, py_object(callObj))
+
+        if not res.is_success:
+            pythonapi.Py_DecRef(py_object(callObj))
+            raise res.to_exception()
+
+        return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None)
+
+    def CallAsyncWithCompleteCallback(self, callFunct):
         '''Run a Python function on CHIP stack, and wait for the application specific response.
         This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics.
         Calling this function on CHIP on CHIP mainloop thread will cause deadlock.
+        Make sure to register the necessary callbacks which release the function by setting the completeEvent.
         '''
         # throw error if op in progress
         self.callbackRes = None
         self.completeEvent.clear()
+        # TODO: Lock probably no longer necessary, see https://github.com/project-chip/connectedhomeip/issues/33321.
         with self.networkLock:
             res = self.PostTaskOnChipThread(callFunct).Wait()
 
diff --git a/src/controller/python/chip/clusters/Command.py b/src/controller/python/chip/clusters/Command.py
index 89aae53..6ef25cb 100644
--- a/src/controller/python/chip/clusters/Command.py
+++ b/src/controller/python/chip/clusters/Command.py
@@ -291,9 +291,9 @@
         ))
 
 
-def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand,
-                timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None, busyWaitMs: Union[None, int] = None,
-                suppressResponse: Union[None, bool] = None) -> PyChipError:
+async def SendCommand(future: Future, eventLoop, responseType: Type, device, commandPath: CommandPath, payload: ClusterCommand,
+                      timedRequestTimeoutMs: Union[None, int] = None, interactionTimeoutMs: Union[None, int] = None,
+                      busyWaitMs: Union[None, int] = None, suppressResponse: Union[None, bool] = None) -> PyChipError:
     ''' Send a cluster-object encapsulated command to a device and does the following:
             - On receipt of a successful data response, returns the cluster-object equivalent through the provided future.
             - None (on a successful response containing no data)
@@ -316,7 +316,7 @@
 
     payloadTLV = payload.ToTLV()
     ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
-    return builtins.chipStack.Call(
+    return await builtins.chipStack.CallAsync(
         lambda: handle.pychip_CommandSender_SendCommand(
             ctypes.py_object(transaction), device,
             c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId,
@@ -353,9 +353,9 @@
     return pyBatchCommandsData
 
 
-def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo],
-                      timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None, busyWaitMs: Optional[int] = None,
-                      suppressResponse: Optional[bool] = None) -> PyChipError:
+async def SendBatchCommands(future: Future, eventLoop, device, commands: List[InvokeRequestInfo],
+                            timedRequestTimeoutMs: Optional[int] = None, interactionTimeoutMs: Optional[int] = None,
+                            busyWaitMs: Optional[int] = None, suppressResponse: Optional[bool] = None) -> PyChipError:
     ''' Initiates an InvokeInteraction with the batch commands provided.
 
     Arguments:
@@ -388,7 +388,7 @@
     transaction = AsyncBatchCommandsTransaction(future, eventLoop, responseTypes)
     ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
 
-    return builtins.chipStack.Call(
+    return await builtins.chipStack.CallAsync(
         lambda: handle.pychip_CommandSender_SendBatchCommands(
             py_object(transaction), device,
             c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs),