blob: c33d0836bbf5cfaf7744b64c7133cd26d17b4595 [file] [log] [blame]
#
# Copyright (c) 2021 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from asyncio.futures import Future
import ctypes
from dataclasses import dataclass
from typing import Type, Union, List, Any
from ctypes import CFUNCTYPE, c_char_p, c_size_t, c_void_p, c_uint32, c_uint16, py_object
from .ClusterObjects import ClusterAttributeDescriptor
import chip.exceptions
import chip.interaction_model
@dataclass
class AttributePath:
EndpointId: int
ClusterId: int
AttributeId: int
@dataclass
class AttributeStatus:
Path: AttributePath
Status: Union[chip.interaction_model.Status, int]
AttributeWriteResult = AttributeStatus
@dataclass
class AttributeDescriptorWithEndpoint:
EndpointId: int
Attribute: ClusterAttributeDescriptor
@dataclass
class AttributeWriteRequest(AttributeDescriptorWithEndpoint):
Data: Any
AttributeReadRequest = AttributeDescriptorWithEndpoint
@dataclass
class AttributeReadResult(AttributeStatus):
Data: Any = None
class AsyncWriteTransaction:
def __init__(self, future: Future, eventLoop):
self._event_loop = eventLoop
self._future = future
self._res = []
def _handleResponse(self, path: AttributePath, status: int):
try:
imStatus = chip.interaction_model.Status(status)
self._res.append(AttributeWriteResult(Path=path, Status=imStatus))
except:
self._res.append(AttributeWriteResult(Path=path, Status=status))
def handleResponse(self, path: AttributePath, status: int):
self._event_loop.call_soon_threadsafe(
self._handleResponse, path, status)
def _handleError(self, chipError: int):
self._future.set_exception(
chip.exceptions.ChipStackError(chipError))
def handleError(self, chipError: int):
self._event_loop.call_soon_threadsafe(
self._handleError, chipError
)
def _handleDone(self):
if not self._future.done():
self._future.set_result(self._res)
def handleDone(self):
self._event_loop.call_soon_threadsafe(self._handleDone)
_OnWriteResponseCallbackFunct = CFUNCTYPE(
None, py_object, c_uint16, c_uint32, c_uint32, c_uint16)
_OnWriteErrorCallbackFunct = CFUNCTYPE(
None, py_object, c_uint32)
_OnWriteDoneCallbackFunct = CFUNCTYPE(
None, py_object)
@_OnWriteResponseCallbackFunct
def _OnWriteResponseCallback(closure, endpoint: int, cluster: int, attribute: int, status):
closure.handleResponse(AttributePath(endpoint, cluster, attribute), status)
@_OnWriteErrorCallbackFunct
def _OnWriteErrorCallback(closure, chiperror: int):
closure.handleError(chiperror)
@_OnWriteDoneCallbackFunct
def _OnWriteDoneCallback(closure):
closure.handleDone()
ctypes.pythonapi.Py_DecRef(ctypes.py_object(closure))
def WriteAttributes(future: Future, eventLoop, device, attributes: List[AttributeWriteRequest]) -> int:
handle = chip.native.GetLibraryHandle()
transaction = AsyncWriteTransaction(future, eventLoop)
writeargs = []
for attr in attributes:
path = chip.interaction_model.AttributePathStruct.parse(
b'\x00' * chip.interaction_model.AttributePathStruct.sizeof())
path.EndpointId = attr.EndpointId
path.ClusterId = attr.Attribute.cluster_id
path.AttributeId = attr.Attribute.attribute_id
path = chip.interaction_model.AttributePathStruct.build(path)
tlv = attr.Attribute.ToTLV(None, attr.Data)
writeargs.append(ctypes.c_char_p(path))
writeargs.append(ctypes.c_char_p(bytes(tlv)))
writeargs.append(ctypes.c_int(len(tlv)))
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
res = handle.pychip_WriteClient_WriteAttributes(
ctypes.py_object(transaction), device, ctypes.c_size_t(len(attributes)), *writeargs)
if res != 0:
ctypes.pythonapi.Py_DecRef(ctypes.py_object(transaction))
return res
def Init():
handle = chip.native.GetLibraryHandle()
# Uses one of the type decorators as an indicator for everything being
# initialized.
if not handle.pychip_WriteClient_InitCallbacks.argtypes:
setter = chip.native.NativeLibraryHandleMethodArguments(handle)
handle.pychip_WriteClient_WriteAttributes.restype = c_uint32
setter.Set('pychip_WriteClient_InitCallbacks', None, [
_OnWriteResponseCallbackFunct, _OnWriteErrorCallbackFunct, _OnWriteDoneCallbackFunct])
handle.pychip_WriteClient_InitCallbacks(
_OnWriteResponseCallback, _OnWriteErrorCallback, _OnWriteDoneCallback)