blob: 08042b9b865376ea4b426fce0e7dde73153a34b5 [file] [log] [blame]
'''
{{> header}}
'''
import ctypes
from chip.ChipStack import *
from chip.exceptions import *
__all__ = ["ChipClusters"]
class ChipClusters:
SUCCESS_DELEGATE = ctypes.CFUNCTYPE(None)
FAILURE_DELEGATE = ctypes.CFUNCTYPE(None, ctypes.c_uint8)
{{#chip_client_clusters}}
_{{asDelimitedMacro name}}_CLUSTER_INFO = {
"clusterName": "{{asUpperCamelCase name}}",
"clusterId": {{asHex code 8}},
"commands": {
{{#chip_cluster_commands}}
{{asHex code 8}}: {
"commandId": {{asHex code 8}},
"commandName": "{{asUpperCamelCase name}}",
"args": {
{{#chip_cluster_command_arguments_with_structs_expanded}}
"{{asLowerCamelCase label}}": "{{#if (isCharString type)}}str{{else}}{{asPythonType chipType}}{{/if}}",
{{/chip_cluster_command_arguments_with_structs_expanded}}
},
},
{{/chip_cluster_commands}}
},
"attributes": {
{{#chip_server_cluster_attributes}}
{{asHex code 8}}: {
"attributeName": "{{asUpperCamelCase name}}",
"attributeId": {{asHex code 8}},
"type": "{{#if (isCharString type)}}str{{else}}{{asPythonType chipType}}{{/if}}",
{{#if isReportableAttribute}}
"reportable": True,
{{/if}}
{{#if isWritableAttribute}}
"writable": True,
{{/if}}
},
{{/chip_server_cluster_attributes}}
},
}
{{/chip_client_clusters}}
_CLUSTER_ID_DICT = {
{{#chip_client_clusters}}
{{asHex code 8}}: _{{asDelimitedMacro name}}_CLUSTER_INFO,
{{/chip_client_clusters}}
}
_CLUSTER_NAME_DICT = {
{{#chip_client_clusters}}
"{{asUpperCamelCase name}}": _{{asDelimitedMacro name}}_CLUSTER_INFO,
{{/chip_client_clusters}}
}
def __init__(self, chipstack):
self._ChipStack = chipstack
def GetClusterInfoById(self, cluster_id: int):
data = ChipClusters._CLUSTER_ID_DICT.get(cluster_id, None)
if not data:
raise UnknownCluster(cluster_id)
return data
def ListClusterInfo(self):
return ChipClusters._CLUSTER_NAME_DICT
def ListClusterCommands(self):
return { clusterName: {
command["commandName"]: command["args"] for command in clusterInfo["commands"].values()
} for clusterName, clusterInfo in ChipClusters._CLUSTER_NAME_DICT.items() }
def ListClusterAttributes(self):
return { clusterName: {
attribute["attributeName"]: attribute for attribute in clusterInfo["attributes"].values()
} for clusterName, clusterInfo in ChipClusters._CLUSTER_NAME_DICT.items() }
def SendCommand(self, device: ctypes.c_void_p, cluster: str, command: str, endpoint: int, groupid: int, args, imEnabled):
func = getattr(self, "Cluster{}_Command{}".format(cluster, command), None)
if not func:
raise UnknownCommand(cluster, command)
funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync
res = funcCaller(lambda: func(device, endpoint, groupid, **args))
if res != 0:
raise self._ChipStack.ErrorToException(res)
def ReadAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, groupid: int, imEnabled):
func = getattr(self, "Cluster{}_ReadAttribute{}".format(cluster, attribute), None)
if not func:
raise UnknownAttribute(cluster, attribute)
funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync
res = funcCaller(lambda: func(device, endpoint, groupid))
if res != 0:
raise self._ChipStack.ErrorToException(res)
def SubscribeAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, minInterval: int, maxInterval: int, imEnabled):
func = getattr(self, "Cluster{}_SubscribeAttribute{}".format(cluster, attribute), None)
if not func:
raise UnknownAttribute(cluster, attribute)
funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync
funcCaller(lambda: func(device, endpoint, minInterval, maxInterval))
def WriteAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, groupid: int, value, imEnabled):
func = getattr(self, "Cluster{}_WriteAttribute{}".format(cluster, attribute), None)
if not func:
raise UnknownAttribute(cluster, attribute)
funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync
res = funcCaller(lambda: func(device, endpoint, groupid, value))
if res != 0:
raise self._ChipStack.ErrorToException(res)
# Cluster commands
{{#chip_client_clusters}}
{{#chip_cluster_commands}}
def Cluster{{asUpperCamelCase clusterName}}_Command{{asUpperCamelCase name}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int{{#chip_cluster_command_arguments_with_structs_expanded}}, {{asLowerCamelCase label}}: {{asPythonType chipType}}{{/chip_cluster_command_arguments_with_structs_expanded}}):
{{#chip_cluster_command_arguments_with_structs_expanded}}
{{#if (isCharString type)}}
{{asLowerCamelCase label}} = {{asLowerCamelCase label}}.encode("utf-8") + b'\x00'
{{/if}}
{{/chip_cluster_command_arguments_with_structs_expanded}}
return self._chipLib.chip_ime_AppendCommand_{{asUpperCamelCase clusterName}}_{{asUpperCamelCase name}}(
device, ZCLendpoint, ZCLgroupid{{#chip_cluster_command_arguments_with_structs_expanded}}, {{asLowerCamelCase label}}{{#if (isString type)}}, len({{asLowerCamelCase label}}){{/if}}{{/chip_cluster_command_arguments_with_structs_expanded}}
)
{{/chip_cluster_commands}}
{{/chip_client_clusters}}
# Cluster attributes
{{#chip_client_clusters}}
{{#chip_server_cluster_attributes}}
def Cluster{{asUpperCamelCase parent.name}}_ReadAttribute{{asUpperCamelCase name}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int):
return self._chipLib.chip_ime_ReadAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}(device, ZCLendpoint, ZCLgroupid)
{{#if isReportableAttribute}}
def Cluster{{asUpperCamelCase parent.name}}_SubscribeAttribute{{asUpperCamelCase name}}(self, device: ctypes.c_void_p, ZCLendpoint: int, minInterval: int, maxInterval: int):
return self._chipLib.chip_ime_SubscribeAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}(device, ZCLendpoint, minInterval, maxInterval)
{{/if}}
{{#if isWritableAttribute}}
def Cluster{{asUpperCamelCase parent.name}}_WriteAttribute{{asUpperCamelCase name}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int, value: {{ asPythonType chipType }}):
{{#if (isCharString type)}}
value = value.encode("utf-8")
{{/if}}
return self._chipLib.chip_ime_WriteAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}(device, ZCLendpoint, ZCLgroupid, value{{#if (isString type)}}, len(value){{/if}})
{{/if}}
{{/chip_server_cluster_attributes}}
{{/chip_client_clusters}}
# Init native functions
def InitLib(self, chipLib):
self._chipLib = chipLib
# Response delegate setters
self._chipLib.chip_ime_SetSuccessResponseDelegate.argtypes = [ChipClusters.SUCCESS_DELEGATE]
self._chipLib.chip_ime_SetSuccessResponseDelegate.restype = None
self._chipLib.chip_ime_SetFailureResponseDelegate.argtypes = [ChipClusters.FAILURE_DELEGATE]
self._chipLib.chip_ime_SetFailureResponseDelegate.res = None
{{#chip_client_clusters}}
# Cluster {{asUpperCamelCase name}}
{{#chip_cluster_commands}}
# Cluster {{asUpperCamelCase clusterName}} Command {{asUpperCamelCase name}}
self._chipLib.chip_ime_AppendCommand_{{asUpperCamelCase clusterName}}_{{asUpperCamelCase name}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16{{#chip_cluster_command_arguments_with_structs_expanded}}{{#if (isString type)}}, ctypes.c_char_p, ctypes.c_uint32{{else}}, ctypes.{{asPythonCType chipType}}{{/if}}{{/chip_cluster_command_arguments_with_structs_expanded}}]
self._chipLib.chip_ime_AppendCommand_{{asUpperCamelCase clusterName}}_{{asUpperCamelCase name}}.restype = ctypes.c_uint32
{{/chip_cluster_commands}}
{{#chip_server_cluster_attributes}}
# Cluster {{asUpperCamelCase parent.name}} ReadAttribute {{asUpperCamelCase name}}
self._chipLib.chip_ime_ReadAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16]
self._chipLib.chip_ime_ReadAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.restype = ctypes.c_uint32
{{#if isReportableAttribute}}
# Cluster {{asUpperCamelCase parent.name}} SubscribeAttribute {{asUpperCamelCase name}}
self._chipLib.chip_ime_SubscribeAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16, ctypes.c_uint16]
self._chipLib.chip_ime_SubscribeAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.restype = ctypes.c_uint32
{{/if}}
{{#if isWritableAttribute}}
# Cluster {{asUpperCamelCase parent.name}} WriteAttribute {{asUpperCamelCase name}}
self._chipLib.chip_ime_WriteAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16, {{#if (isString type)}}ctypes.c_char_p, ctypes.c_uint32{{else}}ctypes.{{asPythonCType chipType}}{{/if}}]
self._chipLib.chip_ime_WriteAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.restype = ctypes.c_uint32
{{/if}}
{{/chip_server_cluster_attributes}}
{{/chip_client_clusters}}
# Init response delegates
def HandleSuccess():
self._ChipStack.callbackRes = 0
self._ChipStack.completeEvent.set()
def HandleFailure(status):
self._ChipStack.callbackRes = status
self._ChipStack.completeEvent.set()
self._HandleSuccess = ChipClusters.SUCCESS_DELEGATE(HandleSuccess)
self._HandleFailure = ChipClusters.FAILURE_DELEGATE(HandleFailure)
self._chipLib.chip_ime_SetSuccessResponseDelegate(self._HandleSuccess)
self._chipLib.chip_ime_SetFailureResponseDelegate(self._HandleFailure)