blob: a6423f6e4077528a27272d948f9e54e53b3ec213 [file] [log] [blame]
'''
{{> header}}
'''
import ctypes
from chip.ChipStack import *
from chip.exceptions import *
'''
TODO(#4511): This file only sends cluster commands, should add more functions.
'''
__all__ = ["ChipClusters"]
class ChipClusters:
SUCCESS_DELEGATE = ctypes.CFUNCTYPE(None)
FAILURE_DELEGATE = ctypes.CFUNCTYPE(None, ctypes.c_uint8)
def __init__(self, chipstack):
self._ChipStack = chipstack
def ListClusterCommands(self):
return {
{{#chip_client_clusters}}
"{{asCamelCased name false}}": {
{{#chip_server_cluster_commands}}
"{{asCamelCased name false}}": {
{{#chip_server_cluster_command_arguments}}
"{{asCamelCased label}}": "{{#if (isCharString type)}}str{{else}}{{asPythonType chipType}}{{/if}}",
{{/chip_server_cluster_command_arguments}}
},
{{/chip_server_cluster_commands}}
},
{{/chip_client_clusters}}
}
def ListClusterAttributes(self):
return {
{{#chip_client_clusters}}
"{{asCamelCased name false}}": [
{{#chip_server_cluster_attributes}}
{{#unless isList}}
"{{asCamelCased name false}}",
{{/unless}}
{{/chip_server_cluster_attributes}}
],
{{/chip_client_clusters}}
}
def SendCommand(self, device: ctypes.c_void_p, cluster: str, command: str, endpoint: int, groupid: int, args, imEnabled):
func = getattr(self, "Cluster{}_Command{}".format(cluster, command), None)
if not func:
raise UnknownCommand(cluster, command)
funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync
funcCaller(lambda: func(device, endpoint, groupid, **args))
def ReadAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, groupid: int, imEnabled):
func = getattr(self, "Cluster{}_ReadAttribute{}".format(cluster, attribute), None)
if not func:
raise UnknownAttribute(cluster, attribute)
funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync
funcCaller(lambda: func(device, endpoint, groupid))
def ConfigureAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, minInterval: int, maxInterval: int, change: int, imEnabled):
func = getattr(self, "Cluster{}_ConfigureAttribute{}".format(cluster, attribute), None)
if not func:
raise UnknownAttribute(cluster, attribute)
funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync
funcCaller(lambda: func(device, endpoint, minInterval, maxInterval, change))
# Cluster commands
{{#chip_client_clusters}}
{{#chip_server_cluster_commands}}
def Cluster{{asCamelCased clusterName false}}_Command{{asCamelCased name false}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int{{#chip_server_cluster_command_arguments}}, {{asCamelCased label}}: {{asPythonType chipType}}{{/chip_server_cluster_command_arguments}}):
{{#chip_server_cluster_command_arguments}}
{{#if (isCharString type)}}
{{asCamelCased label}} = {{asCamelCased label}}.encode("utf-8") + b'\x00'
{{/if}}
{{/chip_server_cluster_command_arguments}}
return self._chipLib.chip_ime_AppendCommand_{{asCamelCased clusterName false}}_{{asCamelCased name false}}(
device, ZCLendpoint, ZCLgroupid{{#chip_server_cluster_command_arguments}}, {{asCamelCased label}}{{#if (isString type)}}, len({{asCamelCased label}}){{/if}}{{/chip_server_cluster_command_arguments}}
)
{{/chip_server_cluster_commands}}
{{/chip_client_clusters}}
# Cluster attributes
{{#chip_client_clusters}}
{{#chip_server_cluster_attributes}}
{{#unless isList}}
def Cluster{{asCamelCased parent.name false}}_ReadAttribute{{asCamelCased name false}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int):
return self._chipLib.chip_ime_ReadAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}(device, ZCLendpoint, ZCLgroupid)
{{#if (isReportableAttribute)}}
def Cluster{{asCamelCased parent.name false}}_ConfigureAttribute{{asCamelCased name false}}(self, device: ctypes.c_void_p, ZCLendpoint: int, minInterval: int, maxInterval: int, change: int):
return self._chipLib.chip_ime_ConfigureAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}(device, ZCLendpoint, minInterval, maxInterval, change)
{{/if}}
{{/unless}}
{{/chip_server_cluster_attributes}}
{{/chip_client_clusters}}
# Init native functions
def InitLib(self, chipLib):
self._chipLib = chipLib
# Response delegate setters
self._chipLib.chip_ime_SetSuccessResponseDelegate.argtypes = [ChipClusters.SUCCESS_DELEGATE]
self._chipLib.chip_ime_SetSuccessResponseDelegate.restype = None
self._chipLib.chip_ime_SetFailureResponseDelegate.argtypes = [ChipClusters.FAILURE_DELEGATE]
self._chipLib.chip_ime_SetFailureResponseDelegate.res = None
{{#chip_client_clusters}}
# Cluster {{asCamelCased name false}}
{{#chip_server_cluster_commands}}
# Cluster {{asCamelCased clusterName false}} Command {{asCamelCased name false}}
self._chipLib.chip_ime_AppendCommand_{{asCamelCased clusterName false}}_{{asCamelCased name false}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16{{#chip_server_cluster_command_arguments}}{{#if (isString type)}}, ctypes.c_char_p, ctypes.c_uint32{{else}}, ctypes.{{asPythonCType chipType}}{{/if}}{{/chip_server_cluster_command_arguments}}]
self._chipLib.chip_ime_AppendCommand_{{asCamelCased clusterName false}}_{{asCamelCased name false}}.restype = ctypes.c_uint32
{{/chip_server_cluster_commands}}
{{#chip_server_cluster_attributes}}
{{#unless isList}}
# Cluster {{asCamelCased parent.name false}} ReadAttribute {{asCamelCased name false}}
self._chipLib.chip_ime_ReadAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16]
self._chipLib.chip_ime_ReadAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}.restype = ctypes.c_uint32
{{#if (isReportableAttribute)}}
# Cluster {{asCamelCased parent.name false}} ConfigureAttribute {{asCamelCased name false}}
self._chipLib.chip_ime_ConfigureAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16, ctypes.c_uint16{{#unless (isDiscreteType)}}, ctypes.{{asPythonCType chipType}}{{/unless}}]
self._chipLib.chip_ime_ConfigureAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}.restype = ctypes.c_uint32
{{/if}}
{{/unless}}
{{/chip_server_cluster_attributes}}
{{/chip_client_clusters}}
# Init response delegates
def HandleSuccess():
self._ChipStack.callbackRes = 0
self._ChipStack.completeEvent.set()
def HandleFailure(status):
self._ChipStack.callbackRes = status
self._ChipStack.completeEvent.set()
self._chipLib.chip_ime_SetSuccessResponseDelegate(ChipClusters.SUCCESS_DELEGATE(HandleSuccess))
self._chipLib.chip_ime_SetFailureResponseDelegate(ChipClusters.FAILURE_DELEGATE(HandleFailure))