blob: 99433d7ed88660f59a1f9193f4ca464874e32117 [file] [log] [blame]
#
# Copyright (c) 2021 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import filecmp
import logging
import os
import subprocess
import sys
import threading
from xmlrpc.server import SimpleXMLRPCServer
_DEFAULT_CHIP_ROOT = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..', '..'))
IP = '127.0.0.1'
PORT = 9000
if sys.platform == 'linux':
IP = '10.10.10.5'
class AppsRegister:
_instance = None
__accessories = {}
def init(self):
self.__startXMLRPCServer()
def uninit(self):
self.__stopXMLRPCServer()
@property
def accessories(self):
"""List of registered accessory applications."""
return self.__accessories.values()
def add(self, name, accessory):
self.__accessories[name] = accessory
def remove(self, name):
self.__accessories.pop(name)
def removeAll(self):
self.__accessories = {}
def get(self, name):
return self.__accessories[name]
def kill(self, name):
accessory = self.__accessories[name]
if accessory:
accessory.kill()
def killAll(self):
for accessory in self.__accessories.values():
accessory.kill()
def start(self, name, args):
accessory = self.__accessories[name]
if accessory:
# The args param comes directly from the sys.argv[2:] of Start.py and should contain a list of strings in
# key-value pair, e.g. [option1, value1, option2, value2, ...]
options = self.__createCommandLineOptions(args)
return accessory.start(options)
return False
def stop(self, name):
accessory = self.__accessories[name]
if accessory:
return accessory.stop()
return False
def reboot(self, name):
accessory = self.__accessories[name]
if accessory:
return accessory.stop() and accessory.start()
return False
def factoryResetAll(self):
for accessory in self.__accessories.values():
accessory.factoryReset()
def factoryReset(self, name):
accessory = self.__accessories[name]
if accessory:
return accessory.factoryReset()
return False
def waitForMessage(self, name, message, timeoutInSeconds=10):
accessory = self.__accessories[name]
if accessory:
# The message param comes directly from the sys.argv[2:] of WaitForMessage.py and should contain a list of strings that
# comprise the entire message to wait for
return accessory.waitForMessage(' '.join(message), timeoutInSeconds)
return False
def createOtaImage(self, otaImageFilePath, rawImageFilePath, rawImageContent, vid='0xDEAD', pid='0xBEEF'):
# Write the raw image content
with open(rawImageFilePath, 'w') as rawFile:
rawFile.write(rawImageContent)
# Add an OTA header to the raw file
otaImageTool = _DEFAULT_CHIP_ROOT + '/src/app/ota_image_tool.py'
cmd = [otaImageTool, 'create', '-v', vid, '-p', pid, '-vn', '2',
'-vs', "2.0", '-da', 'sha256', rawImageFilePath, otaImageFilePath]
s = subprocess.Popen(cmd)
s.wait()
if s.returncode != 0:
raise Exception('Cannot create OTA image file')
return True
def compareFiles(self, file1, file2):
if filecmp.cmp(file1, file2, shallow=False) is False:
raise Exception('Files %s and %s do not match' % (file1, file2))
return True
def createFile(self, filePath, fileContent):
with open(filePath, 'w') as rawFile:
rawFile.write(fileContent)
return True
def deleteFile(self, filePath):
if os.path.exists(filePath):
os.remove(filePath)
return True
def __startXMLRPCServer(self):
self.server = SimpleXMLRPCServer((IP, PORT))
self.server.register_function(self.start, 'start')
self.server.register_function(self.stop, 'stop')
self.server.register_function(self.reboot, 'reboot')
self.server.register_function(self.factoryReset, 'factoryReset')
self.server.register_function(self.waitForMessage, 'waitForMessage')
self.server.register_function(self.compareFiles, 'compareFiles')
self.server.register_function(self.createOtaImage, 'createOtaImage')
self.server.register_function(self.createFile, 'createFile')
self.server.register_function(self.deleteFile, 'deleteFile')
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.server_thread.start()
def __stopXMLRPCServer(self):
self.server.shutdown()
def __createCommandLineOptions(self, args):
if not args:
return {}
# args should contain a list of strings in key-value pair, e.g. [option1, value1, option2, value2, ...]
if (len(args) % 2) != 0:
logging.warning("Unexpected command line options %r - not key/value pairs (odd length)" % (args,))
return {}
# Create a dictionary from the key-value pair list
options = {args[i]: args[i+1] for i in range(0, len(args), 2)}
return options