# blob: 43317c1dc6158a0ff924d7ea1f43525522cb9c4e [file] [log] [blame]
#
# Copyright (c) 2021 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import os
import string
import sys
import textwrap
import threading
from cmd import Cmd
from chip.server import GetLibraryHandle, PostAttributeChangeCallback
from dali.address import Broadcast
from dali.driver.hid import tridonic
from dali.gear.general import DAPC, Off, RecallMaxLevel
# DALI driver handle; stays None until daliworker() creates the driver.
dev = None
# asyncio event loop for DALI traffic; stays None until daliworker() creates it.
dali_loop = None
async def dali_on(is_on: bool):
    """Send an on or off command to the DALI broadcast address.

    Waits for the module-level driver ``dev`` to report that it is connected,
    then sends ``RecallMaxLevel`` when ``is_on`` is truthy and ``Off``
    otherwise, both addressed to ``Broadcast()``.
    """
    await dev.connected.wait()
    command_type = RecallMaxLevel if is_on else Off
    await dev.send(command_type(Broadcast()))
async def dali_level(level: int):
    """Send a ``DAPC`` command carrying ``level`` to the DALI broadcast address.

    Waits for the module-level driver ``dev`` to report that it is connected
    before sending.
    """
    await dev.connected.wait()
    command = DAPC(Broadcast(), level)
    await dev.send(command)
def daliworker():
    """Thread entry point: create the DALI event loop and driver, then run the loop.

    Publishes the new loop and the ``tridonic`` driver through the
    module-level ``dali_loop`` and ``dev`` globals, starts the driver's
    connection attempt, and then blocks in ``run_forever()`` until the loop
    is stopped.
    """
    global dali_loop, dev

    # The loop is published first; other threads use it to submit coroutines.
    dali_loop = asyncio.new_event_loop()
    dev = tridonic("/dev/dali/daliusb-*", glob=True, loop=dali_loop)
    dev.connect()

    # Make the loop current for this thread, then serve it until stopped.
    asyncio.set_event_loop(dali_loop)
    dali_loop.run_forever()
class LightingMgrCmd(Cmd):
    """Interactive shell for the CHIP lighting example, built on ``cmd.Cmd``.

    Command names may be typed with "-" in them; the dashes are removed
    before dispatch, so ``some-command`` is handled by ``do_somecommand``.
    When the ``readline`` module is available, history is loaded from and
    saved to ``~/.chip-lighting-history``.
    """

    # Commands offered by tab completion and listed by a bare ``help``.
    command_names = [
        "help"
    ]

    def __init__(self, rendezvousAddr=None, controllerNodeId=0, bluetoothAdapter=None):
        """Configure the prompt, command-name characters and readline history.

        The three parameters are accepted but not used by this initializer.
        """
        self.lastNetworkId = None

        Cmd.__init__(self)

        # Allow "-" inside command names. This is set on the instance so the
        # stdlib ``Cmd`` class (and any other shell built on it) is not
        # modified as a side effect.
        self.identchars = string.ascii_letters + string.digits + "-"

        if sys.stdin.isatty():
            self.prompt = "chip-lighting > "
        else:
            # Input is piped/redirected: read from stdin directly, no prompt.
            self.use_rawinput = 0
            self.prompt = ""

        LightingMgrCmd.command_names.sort()

        self.historyFileName = os.path.expanduser("~/.chip-lighting-history")

        try:
            import readline
        except ImportError:
            # No line-editing support available; run without history.
            pass
        else:
            # ``__doc__`` can be None for some readline implementations.
            if "libedit" in (readline.__doc__ or ""):
                readline.parse_and_bind("bind ^I rl_complete")
            readline.set_completer_delims(" ")
            try:
                readline.read_history_file(self.historyFileName)
            except IOError:
                # Best effort: a missing or unreadable history file is fine.
                pass

    def parseline(self, line):
        """Parse ``line`` like ``Cmd.parseline`` and strip "-" from the command.

        Returns the ``(cmd, arg, line)`` tuple; when a command was found,
        ``line`` is rebuilt from the dash-less command and its argument.
        """
        cmd, arg, line = Cmd.parseline(self, line)
        if cmd:
            cmd = self.shortCommandName(cmd)
            line = cmd + " " + arg
        return cmd, arg, line

    def completenames(self, text, *ignored):
        """Return completions (with a trailing space) for a command prefix.

        A name matches if either its full form or its dash-less form starts
        with ``text``.
        """
        return [
            name + " "
            for name in LightingMgrCmd.command_names
            if name.startswith(text) or self.shortCommandName(name).startswith(text)
        ]

    def shortCommandName(self, cmd):
        """Return ``cmd`` with every "-" removed."""
        return cmd.replace("-", "")

    def precmd(self, line):
        """Echo the command when input is not interactive, then pass it on."""
        if not self.use_rawinput and line != "EOF" and line != "":
            print(">>> " + line)
        return line

    def postcmd(self, stop, line):
        """Restore the interactive prompt after a command; return ``stop``."""
        if not stop and self.use_rawinput:
            self.prompt = "chip-lighting > "
        return stop

    def postloop(self):
        """Save the readline history when the command loop ends."""
        try:
            import readline
        except ImportError:
            return
        try:
            readline.write_history_file(self.historyFileName)
        except IOError:
            # Best effort: failing to persist history must not break exit.
            pass

    # NOTE: the docstring below is printed to the user by ``help help``;
    # it is runtime text, not developer documentation.
    def do_help(self, line):
        """
        help
        Print the help
        """
        if not line:
            self.print_topics(
                "\nAvailable commands (type help <name> for more information):",
                LightingMgrCmd.command_names,
                15,
                80,
            )
            return

        cmd, _arg, _parsed = self.parseline(line)
        doc = None
        # ``parseline`` yields cmd=None for input such as "!" or blanks;
        # treat that as "no such command" instead of building "do_" + None.
        if cmd:
            try:
                doc = getattr(self, "do_" + cmd).__doc__
            except AttributeError:
                doc = None

        if doc:
            self.stdout.write("%s\n" % textwrap.dedent(doc))
        else:
            self.stdout.write("No help on %s\n" % (line))
def _submit_to_dali(coro_func, *args):
    """Run ``coro_func(*args)`` on the DALI event loop and wait for it to finish.

    If the worker thread has not created the loop yet (``dali_loop`` is still
    None), the request is reported and dropped. The check happens before the
    coroutine object is created so nothing is left un-awaited.
    """
    loop = dali_loop
    if loop is None:
        print("[PY] [ERR] DALI event loop not started, dropping command")
        return
    future = asyncio.run_coroutine_threadsafe(coro_func(*args), loop)
    # Block the calling thread until the DALI command has been sent; any
    # exception raised by the coroutine is re-raised here.
    future.result()


@PostAttributeChangeCallback
def attributeChangeCallback(
    endpoint: int,
    clusterId: int,
    attributeId: int,
    xx_type: int,
    size: int,
    value: bytes,
):
    """Forward attribute changes on endpoint 1 to the DALI bus.

    Cluster 6 / attribute 0 switches the light on when ``value`` is the
    single byte 1 and off for any other value. Cluster 8 / attribute 0 sets
    the level from ``value[0]`` when ``value`` is two bytes long. Other
    cluster/attribute pairs on endpoint 1 are ignored silently; other
    endpoints are reported as unhandled.

    NOTE(review): 6 and 8 are presumably the On/Off and Level Control
    cluster ids -- confirm against the CHIP cluster definitions.
    """
    if endpoint != 1:
        print("[PY] [ERR] unhandled endpoint {} ".format(endpoint))
        return

    if attributeId != 0:
        # Only attribute 0 of the handled clusters drives the light.
        return

    if clusterId == 6:
        is_on = len(value) == 1 and value[0] == 1
        _submit_to_dali(dali_on, is_on)
    elif clusterId == 8:
        if len(value) == 2:
            _submit_to_dali(dali_level, value[0])
        else:
            print("[PY] no level")
class Lighting:
    """Holds the CHIP library handle for the lighting device."""

    def __init__(self):
        """Obtain the library handle, passing ``attributeChangeCallback`` to it."""
        handle = GetLibraryHandle(attributeChangeCallback)
        # Kept on the instance so the handle lives as long as this object.
        self.chipLib = handle
if __name__ == "__main__":
    # Script entry point: create the CHIP library handle, start the DALI
    # worker thread, then run the interactive shell until it exits.
    lighting = Lighting()

    lightMgrCmd = LightingMgrCmd()
    print("Chip Lighting Device Shell")
    print()

    print("Starting DALI async")
    # daemon=True so that a worker stuck in run_forever() can never keep the
    # process alive after the shell has exited.
    t = threading.Thread(target=daliworker, daemon=True)
    t.start()

    try:
        lightMgrCmd.cmdloop()
    except KeyboardInterrupt:
        print("\nQuitting")
    finally:
        # Ask the DALI loop to stop so the worker thread can return, and give
        # it a moment to do so. Without this, sys.exit() would wait forever
        # on the worker thread during interpreter shutdown.
        if dali_loop is not None:
            dali_loop.call_soon_threadsafe(dali_loop.stop)
            t.join(timeout=1.0)

    sys.exit(0)