| ''' |
| {{> header}} |
| ''' |
| |
| import ctypes |
| from chip.ChipStack import * |
| from chip.exceptions import * |
| |
| ''' |
| TODO(#4511): This file only sends cluster commands, should add more functions. |
| ''' |
| |
| __all__ = ["ChipClusters"] |
| |
| class ChipClusters: |
| SUCCESS_DELEGATE = ctypes.CFUNCTYPE(None) |
| FAILURE_DELEGATE = ctypes.CFUNCTYPE(None, ctypes.c_uint8) |
| |
| def __init__(self, chipstack): |
| self._ChipStack = chipstack |
| |
| def ListClusterCommands(self): |
| return { |
| {{#chip_client_clusters}} |
| "{{asCamelCased name false}}": { |
| {{#chip_server_cluster_commands}} |
| "{{asCamelCased name false}}": { |
| {{#chip_server_cluster_command_arguments}} |
| "{{asCamelCased label}}": "{{#if (isCharString type)}}str{{else}}{{asPythonType chipType}}{{/if}}", |
| {{/chip_server_cluster_command_arguments}} |
| }, |
| {{/chip_server_cluster_commands}} |
| }, |
| {{/chip_client_clusters}} |
| } |
| |
| def ListClusterAttributes(self): |
| return { |
| {{#chip_client_clusters}} |
| "{{asCamelCased name false}}": [ |
| {{#chip_server_cluster_attributes}} |
| {{#unless isList}} |
| "{{asCamelCased name false}}", |
| {{/unless}} |
| {{/chip_server_cluster_attributes}} |
| ], |
| {{/chip_client_clusters}} |
| } |
| |
| def SendCommand(self, device: ctypes.c_void_p, cluster: str, command: str, endpoint: int, groupid: int, args, imEnabled): |
| func = getattr(self, "Cluster{}_Command{}".format(cluster, command), None) |
| if not func: |
| raise UnknownCommand(cluster, command) |
| funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync |
| funcCaller(lambda: func(device, endpoint, groupid, **args)) |
| |
| def ReadAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, groupid: int, imEnabled): |
| func = getattr(self, "Cluster{}_ReadAttribute{}".format(cluster, attribute), None) |
| if not func: |
| raise UnknownAttribute(cluster, attribute) |
| funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync |
| funcCaller(lambda: func(device, endpoint, groupid)) |
| |
| def ConfigureAttribute(self, device: ctypes.c_void_p, cluster: str, attribute: str, endpoint: int, minInterval: int, maxInterval: int, change: int, imEnabled): |
| func = getattr(self, "Cluster{}_ConfigureAttribute{}".format(cluster, attribute), None) |
| if not func: |
| raise UnknownAttribute(cluster, attribute) |
| funcCaller = self._ChipStack.Call if imEnabled else self._ChipStack.CallAsync |
| funcCaller(lambda: func(device, endpoint, minInterval, maxInterval, change)) |
| |
| # Cluster commands |
| |
| {{#chip_client_clusters}} |
| {{#chip_server_cluster_commands}} |
| def Cluster{{asCamelCased clusterName false}}_Command{{asCamelCased name false}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int{{#chip_server_cluster_command_arguments}}, {{asCamelCased label}}: {{asPythonType chipType}}{{/chip_server_cluster_command_arguments}}): |
| {{#chip_server_cluster_command_arguments}} |
| {{#if (isCharString type)}} |
| {{asCamelCased label}} = {{asCamelCased label}}.encode("utf-8") + b'\x00' |
| {{/if}} |
| {{/chip_server_cluster_command_arguments}} |
| return self._chipLib.chip_ime_AppendCommand_{{asCamelCased clusterName false}}_{{asCamelCased name false}}( |
| device, ZCLendpoint, ZCLgroupid{{#chip_server_cluster_command_arguments}}, {{asCamelCased label}}{{#if (isString type)}}, len({{asCamelCased label}}){{/if}}{{/chip_server_cluster_command_arguments}} |
| ) |
| {{/chip_server_cluster_commands}} |
| {{/chip_client_clusters}} |
| |
| # Cluster attributes |
| |
| {{#chip_client_clusters}} |
| {{#chip_server_cluster_attributes}} |
| {{#unless isList}} |
| def Cluster{{asCamelCased parent.name false}}_ReadAttribute{{asCamelCased name false}}(self, device: ctypes.c_void_p, ZCLendpoint: int, ZCLgroupid: int): |
| return self._chipLib.chip_ime_ReadAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}(device, ZCLendpoint, ZCLgroupid) |
| {{#if (isReportableAttribute)}} |
| def Cluster{{asCamelCased parent.name false}}_ConfigureAttribute{{asCamelCased name false}}(self, device: ctypes.c_void_p, ZCLendpoint: int, minInterval: int, maxInterval: int, change: int): |
| return self._chipLib.chip_ime_ConfigureAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}(device, ZCLendpoint, minInterval, maxInterval, change) |
| {{/if}} |
| {{/unless}} |
| {{/chip_server_cluster_attributes}} |
| {{/chip_client_clusters}} |
| |
| # Init native functions |
| |
| def InitLib(self, chipLib): |
| self._chipLib = chipLib |
| # Response delegate setters |
| self._chipLib.chip_ime_SetSuccessResponseDelegate.argtypes = [ChipClusters.SUCCESS_DELEGATE] |
| self._chipLib.chip_ime_SetSuccessResponseDelegate.restype = None |
| self._chipLib.chip_ime_SetFailureResponseDelegate.argtypes = [ChipClusters.FAILURE_DELEGATE] |
| self._chipLib.chip_ime_SetFailureResponseDelegate.res = None |
| {{#chip_client_clusters}} |
| # Cluster {{asCamelCased name false}} |
| {{#chip_server_cluster_commands}} |
| # Cluster {{asCamelCased clusterName false}} Command {{asCamelCased name false}} |
| self._chipLib.chip_ime_AppendCommand_{{asCamelCased clusterName false}}_{{asCamelCased name false}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16{{#chip_server_cluster_command_arguments}}{{#if (isString type)}}, ctypes.c_char_p, ctypes.c_uint32{{else}}, ctypes.{{asPythonCType chipType}}{{/if}}{{/chip_server_cluster_command_arguments}}] |
| self._chipLib.chip_ime_AppendCommand_{{asCamelCased clusterName false}}_{{asCamelCased name false}}.restype = ctypes.c_uint32 |
| {{/chip_server_cluster_commands}} |
| {{#chip_server_cluster_attributes}} |
| {{#unless isList}} |
| # Cluster {{asCamelCased parent.name false}} ReadAttribute {{asCamelCased name false}} |
| self._chipLib.chip_ime_ReadAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16] |
| self._chipLib.chip_ime_ReadAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}.restype = ctypes.c_uint32 |
| {{#if (isReportableAttribute)}} |
| # Cluster {{asCamelCased parent.name false}} ConfigureAttribute {{asCamelCased name false}} |
| self._chipLib.chip_ime_ConfigureAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}.argtypes = [ctypes.c_void_p, ctypes.c_uint8, ctypes.c_uint16, ctypes.c_uint16{{#unless (isDiscreteType)}}, ctypes.{{asPythonCType chipType}}{{/unless}}] |
| self._chipLib.chip_ime_ConfigureAttribute_{{asCamelCased parent.name false}}_{{asCamelCased name false}}.restype = ctypes.c_uint32 |
| {{/if}} |
| {{/unless}} |
| {{/chip_server_cluster_attributes}} |
| {{/chip_client_clusters}} |
| # Init response delegates |
| def HandleSuccess(): |
| self._ChipStack.callbackRes = 0 |
| self._ChipStack.completeEvent.set() |
| def HandleFailure(status): |
| self._ChipStack.callbackRes = status |
| self._ChipStack.completeEvent.set() |
| self._chipLib.chip_ime_SetSuccessResponseDelegate(ChipClusters.SUCCESS_DELEGATE(HandleSuccess)) |
| self._chipLib.chip_ime_SetFailureResponseDelegate(ChipClusters.FAILURE_DELEGATE(HandleFailure)) |