| ''' |
| {{> header}} |
| ''' |
| |
| import ctypes |
| from chip.ChipStack import * |
| from chip.exceptions import * |
| |
| # Public API of the generated module: a star-import exposes only ChipClusters. |
| __all__ = ["ChipClusters"] |
| |
| class ChipClusters: |
| """Cluster metadata tables plus per-cluster command/attribute wrappers around the native CHIP library. |
| |
| Most of the class body is generated by the template loops that follow. The Cluster* wrappers |
| call through self._chipLib, which is only assigned by InitLib, so InitLib has to run first. |
| """ |
| # ctypes prototype for the native success callback: no arguments, no return value. |
| SUCCESS_DELEGATE = ctypes.CFUNCTYPE(None) |
| # ctypes prototype for the native failure callback: one uint8 status argument, no return value. |
| FAILURE_DELEGATE = ctypes.CFUNCTYPE(None, ctypes.c_uint8) |
| |
| # One _<CLUSTER>_CLUSTER_INFO dict is emitted per client cluster. Each holds the cluster name and id, |
| # a "commands" table keyed by command id (each entry maps argument name -> type-name string), and an |
| # "attributes" table keyed by attribute id. Char-string arguments/attributes are reported as "str". |
| # The "reportable" and "writable" keys are only emitted when true, so they may be absent from an entry. |
| {{#chip_client_clusters}} |
| _{{asDelimitedMacro name}}_CLUSTER_INFO = { |
| "clusterName": "{{asUpperCamelCase name}}", |
| "clusterId": {{asHex code 8}}, |
| "commands": { |
| {{#chip_cluster_commands}} |
| {{asHex code 8}}: { |
| "commandId": {{asHex code 8}}, |
| "commandName": "{{asUpperCamelCase name}}", |
| "args": { |
| {{#chip_cluster_command_arguments_with_structs_expanded}} |
| "{{asLowerCamelCase label}}": "{{#if (isCharString type)}}str{{else}}{{asPythonType chipType}}{{/if}}", |
| {{/chip_cluster_command_arguments_with_structs_expanded}} |
| }, |
| }, |
| {{/chip_cluster_commands}} |
| }, |
| "attributes": { |
| {{#chip_server_cluster_attributes}} |
| {{asHex code 8}}: { |
| "attributeName": "{{asUpperCamelCase name}}", |
| "attributeId": {{asHex code 8}}, |
| "type": "{{#if (isCharString type)}}str{{else}}{{asPythonType chipType}}{{/if}}", |
| {{#if isReportableAttribute}} |
| "reportable": True, |
| {{/if}} |
| {{#if isWritableAttribute}} |
| "writable": True, |
| {{/if}} |
| }, |
| {{/chip_server_cluster_attributes}} |
| }, |
| } |
| {{/chip_client_clusters}} |
| |
| # Lookup table from numeric cluster id to that cluster's _<CLUSTER>_CLUSTER_INFO dict. |
| # The values are the same dict objects referenced by _CLUSTER_NAME_DICT, not copies. |
| _CLUSTER_ID_DICT = { |
| {{#chip_client_clusters}} |
| {{asHex code 8}}: _{{asDelimitedMacro name}}_CLUSTER_INFO, |
| {{/chip_client_clusters}} |
| } |
| |
| # Lookup table from UpperCamelCase cluster name to that cluster's _<CLUSTER>_CLUSTER_INFO dict. |
| # The values are the same dict objects referenced by _CLUSTER_ID_DICT, not copies. |
| _CLUSTER_NAME_DICT = { |
| {{#chip_client_clusters}} |
| "{{asUpperCamelCase name}}": _{{asDelimitedMacro name}}_CLUSTER_INFO, |
| {{/chip_client_clusters}} |
| } |
| |
| def __init__(self, chipstack): |
| """Remember the ChipStack object used to run native calls and to receive their completion results. |
| |
| The native library handle (self._chipLib) is not set here; InitLib assigns it. |
| """ |
| self._ChipStack = chipstack |
| |
| def GetClusterInfoById(self, cluster_id: int): |
| data = ChipClusters._CLUSTER_ID_DICT.get(cluster_id, None) |
| if not data: |
| raise UnknownCluster(cluster_id) |
| return data |
| |
| def ListClusterInfo(self): |
| """Return the cluster-name -> cluster-info table. |
| |
| This is the shared class-level _CLUSTER_NAME_DICT itself, not a copy, so mutating the |
| result mutates the table seen by every other caller. |
| """ |
| return ChipClusters._CLUSTER_NAME_DICT |
| |
| def ListClusterCommands(self): |
| return { clusterName: { |
| command["commandName"]: command["args"] for command in clusterInfo["commands"].values() |
| } for clusterName, clusterInfo in ChipClusters._CLUSTER_NAME_DICT.items() } |
| |
| def ListClusterAttributes(self): |
| return { clusterName: { |
| attribute["attributeName"]: attribute for attribute in clusterInfo["attributes"].values() |
| } for clusterName, clusterInfo in ChipClusters._CLUSTER_NAME_DICT.items() } |
| |
| def SendCommand(self, device: ctypes.c_void_p, cluster: str, command: str, endpoint: int, groupid: int, args, imEnabled): |
| func = getattr(self, "Cluster{}_Command{}".format(cluster, command), None) |
| if not func: |
| raise UnknownCommand(cluster, command) |
| funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync |
| res = funcCaller(lambda: func(device, endpoint, groupid, **args)) |
| if res != 0: |
| raise self._ChipStack.ErrorToException(res) |
| |
| def ReadAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, groupid: int, imEnabled): |
| func = getattr(self, "Cluster{}_ReadAttribute{}".format(cluster, attribute), None) |
| if not func: |
| raise UnknownAttribute(cluster, attribute) |
| funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync |
| res = funcCaller(lambda: func(device, endpoint, groupid)) |
| if res != 0: |
| raise self._ChipStack.ErrorToException(res) |
| |
| def SubscribeAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, minInterval: int, maxInterval: int, imEnabled): |
| func = getattr(self, "Cluster{}_SubscribeAttribute{}".format(cluster, attribute), None) |
| if not func: |
| raise UnknownAttribute(cluster, attribute) |
| funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync |
| funcCaller(lambda: func(device, endpoint, minInterval, maxInterval)) |
| |
| def WriteAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, groupid: int, value, imEnabled): |
| func = getattr(self, "Cluster{}_WriteAttribute{}".format(cluster, attribute), None) |
| if not func: |
| raise UnknownAttribute(cluster, attribute) |
| funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync |
| res = funcCaller(lambda: func(device, endpoint, groupid, value)) |
| if res != 0: |
| raise self._ChipStack.ErrorToException(res) |
| |
| # Cluster commands |
| |
| # One Cluster<Cluster>_Command<Command> method is generated per client-cluster command; each forwards |
| # to the matching chip_ime_AppendCommand_* native function and returns its result. |
| # Char-string arguments are UTF-8 encoded and NUL-terminated first, and every argument matched by |
| # isString is followed by len(argument) in the native call. |
| # NOTE(review): if isString also matches char strings, that length includes the appended NUL - confirm. |
| # NOTE(review): the signature annotates char-string arguments with asPythonType(chipType) although the |
| # body calls .encode() on them (the "args" metadata reports them as "str") - confirm the annotation. |
| {{#chip_client_clusters}} |
| {{#chip_cluster_commands}} |
| def Cluster{{asUpperCamelCase clusterName}}_Command{{asUpperCamelCase name}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int{{#chip_cluster_command_arguments_with_structs_expanded}}, {{asLowerCamelCase label}}: {{asPythonType chipType}}{{/chip_cluster_command_arguments_with_structs_expanded}}): |
| {{#chip_cluster_command_arguments_with_structs_expanded}} |
| {{#if (isCharString type)}} |
| {{asLowerCamelCase label}} = {{asLowerCamelCase label}}.encode("utf-8") + b'\x00' |
| {{/if}} |
| {{/chip_cluster_command_arguments_with_structs_expanded}} |
| return self._chipLib.chip_ime_AppendCommand_{{asUpperCamelCase clusterName}}_{{asUpperCamelCase name}}( |
| device, ZCLendpoint, ZCLgroupid{{#chip_cluster_command_arguments_with_structs_expanded}}, {{asLowerCamelCase label}}{{#if (isString type)}}, len({{asLowerCamelCase label}}){{/if}}{{/chip_cluster_command_arguments_with_structs_expanded}} |
| ) |
| {{/chip_cluster_commands}} |
| {{/chip_client_clusters}} |
| |
| # Cluster attributes |
| |
| # Per server attribute of each client cluster: a Cluster<Cluster>_ReadAttribute<Attr> method is always |
| # generated, a _SubscribeAttribute one only for reportable attributes, and a _WriteAttribute one only |
| # for writable attributes. Each forwards to the matching chip_ime_* native function and returns its result. |
| # NOTE(review): WriteAttribute UTF-8 encodes char-string values without the trailing NUL that the |
| # command wrappers append - confirm which form the native side expects. |
| {{#chip_client_clusters}} |
| {{#chip_server_cluster_attributes}} |
| def Cluster{{asUpperCamelCase parent.name}}_ReadAttribute{{asUpperCamelCase name}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int): |
| return self._chipLib.chip_ime_ReadAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}(device, ZCLendpoint, ZCLgroupid) |
| {{#if isReportableAttribute}} |
| def Cluster{{asUpperCamelCase parent.name}}_SubscribeAttribute{{asUpperCamelCase name}}(self, device: ctypes.c_void_p, ZCLendpoint: int, minInterval: int, maxInterval: int): |
| return self._chipLib.chip_ime_SubscribeAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}(device, ZCLendpoint, minInterval, maxInterval) |
| {{/if}} |
| {{#if isWritableAttribute}} |
| def Cluster{{asUpperCamelCase parent.name}}_WriteAttribute{{asUpperCamelCase name}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int, value: {{ asPythonType chipType }}): |
| {{#if (isCharString type)}} |
| value = value.encode("utf-8") |
| {{/if}} |
| return self._chipLib.chip_ime_WriteAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}(device, ZCLendpoint, ZCLgroupid, value{{#if (isString type)}}, len(value){{/if}}) |
| {{/if}} |
| {{/chip_server_cluster_attributes}} |
| {{/chip_client_clusters}} |
| |
| # Init native functions |
| |
| def InitLib(self, chipLib): |
| self._chipLib = chipLib |
| # Response delegate setters |
| self._chipLib.chip_ime_SetSuccessResponseDelegate.argtypes = [ChipClusters.SUCCESS_DELEGATE] |
| self._chipLib.chip_ime_SetSuccessResponseDelegate.restype = None |
| self._chipLib.chip_ime_SetFailureResponseDelegate.argtypes = [ChipClusters.FAILURE_DELEGATE] |
| self._chipLib.chip_ime_SetFailureResponseDelegate.res = None |
| {{#chip_client_clusters}} |
| # Cluster {{asUpperCamelCase name}} |
| {{#chip_cluster_commands}} |
| # Cluster {{asUpperCamelCase clusterName}} Command {{asUpperCamelCase name}} |
| self._chipLib.chip_ime_AppendCommand_{{asUpperCamelCase clusterName}}_{{asUpperCamelCase name}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16{{#chip_cluster_command_arguments_with_structs_expanded}}{{#if (isString type)}}, ctypes.c_char_p, ctypes.c_uint32{{else}}, ctypes.{{asPythonCType chipType}}{{/if}}{{/chip_cluster_command_arguments_with_structs_expanded}}] |
| self._chipLib.chip_ime_AppendCommand_{{asUpperCamelCase clusterName}}_{{asUpperCamelCase name}}.restype = ctypes.c_uint32 |
| {{/chip_cluster_commands}} |
| {{#chip_server_cluster_attributes}} |
| # Cluster {{asUpperCamelCase parent.name}} ReadAttribute {{asUpperCamelCase name}} |
| self._chipLib.chip_ime_ReadAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16] |
| self._chipLib.chip_ime_ReadAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.restype = ctypes.c_uint32 |
| {{#if isReportableAttribute}} |
| # Cluster {{asUpperCamelCase parent.name}} SubscribeAttribute {{asUpperCamelCase name}} |
| self._chipLib.chip_ime_SubscribeAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16, ctypes.c_uint16] |
| self._chipLib.chip_ime_SubscribeAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.restype = ctypes.c_uint32 |
| {{/if}} |
| {{#if isWritableAttribute}} |
| # Cluster {{asUpperCamelCase parent.name}} WriteAttribute {{asUpperCamelCase name}} |
| self._chipLib.chip_ime_WriteAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16, {{#if (isString type)}}ctypes.c_char_p, ctypes.c_uint32{{else}}ctypes.{{asPythonCType chipType}}{{/if}}] |
| self._chipLib.chip_ime_WriteAttribute_{{asUpperCamelCase parent.name}}_{{asUpperCamelCase name}}.restype = ctypes.c_uint32 |
| {{/if}} |
| {{/chip_server_cluster_attributes}} |
| {{/chip_client_clusters}} |
| # Init response delegates |
| def HandleSuccess(): |
| self._ChipStack.callbackRes = 0 |
| self._ChipStack.completeEvent.set() |
| |
| def HandleFailure(status): |
| self._ChipStack.callbackRes = status |
| self._ChipStack.completeEvent.set() |
| |
| self._HandleSuccess = ChipClusters.SUCCESS_DELEGATE(HandleSuccess) |
| self._HandleFailure = ChipClusters.FAILURE_DELEGATE(HandleFailure) |
| self._chipLib.chip_ime_SetSuccessResponseDelegate(self._HandleSuccess) |
| self._chipLib.chip_ime_SetFailureResponseDelegate(self._HandleFailure) |