blob: 038c16a4fc8de155e5312b8e37071ac4fa521de8 [file] [log] [blame]
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import re
import shlex
from subprocess import check_output, getstatusoutput
from pathlib import Path
from dataclasses import dataclass
logger = logging.getLogger(__name__)
class MCUmgrException(Exception):
"""General MCUmgr exception."""
@dataclass
class MCUmgrImage:
image: int
slot: int
version: str = ''
flags: str = ''
hash: str = ''
class MCUmgr:
"""Sample wrapper for mcumgr command-line tool"""
mcumgr_exec = 'mcumgr'
def __init__(self, connection_options: str):
self.conn_opts = connection_options
@classmethod
def create_for_serial(cls, serial_port: str) -> MCUmgr:
return cls(connection_options=f'--conntype serial --connstring={serial_port}')
@classmethod
def is_available(cls) -> bool:
exitcode, output = getstatusoutput(f'{cls.mcumgr_exec} version')
if exitcode != 0:
logger.warning(f'mcumgr tool not available: {output}')
return False
return True
def run_command(self, cmd: str) -> str:
command = f'{self.mcumgr_exec} {self.conn_opts} {cmd}'
logger.info(f'CMD: {command}')
return check_output(shlex.split(command), text=True)
def reset_device(self):
self.run_command('reset')
def image_upload(self, image: Path | str, slot: int | None = None, timeout: int = 30):
command = f'-t {timeout} image upload {image}'
if slot is not None:
command += f' -e -n {slot}'
self.run_command(command)
logger.info('Image successfully uploaded')
def get_image_list(self) -> list[MCUmgrImage]:
output = self.run_command('image list')
return self._parse_image_list(output)
@staticmethod
def _parse_image_list(cmd_output: str) -> list[MCUmgrImage]:
image_list = []
re_image = re.compile(r'image=(\d+)\s+slot=(\d+)')
re_version = re.compile(r'version:\s+(\S+)')
re_flags = re.compile(r'flags:\s+(.+)')
re_hash = re.compile(r'hash:\s+(\w+)')
for line in cmd_output.splitlines():
if m := re_image.search(line):
image_list.append(
MCUmgrImage(
image=int(m.group(1)),
slot=int(m.group(2))
)
)
elif image_list:
if m := re_version.search(line):
image_list[-1].version = m.group(1)
elif m := re_flags.search(line):
image_list[-1].flags = m.group(1)
elif m := re_hash.search(line):
image_list[-1].hash = m.group(1)
return image_list
def get_hash_to_test(self) -> str:
image_list = self.get_image_list()
for image in image_list:
if 'active' not in image.flags:
return image.hash
logger.warning(f'Images returned by mcumgr (no not active):\n{image_list}')
raise MCUmgrException('No not active image found')
def get_hash_to_confirm(self):
image_list = self.get_image_list()
for image in image_list:
if 'confirmed' not in image.flags:
return image.hash
logger.warning(f'Images returned by mcumgr (no not confirmed):\n{image_list}')
raise MCUmgrException('No not confirmed image found')
def image_test(self, hash: str | None = None):
if not hash:
hash = self.get_hash_to_test()
self.run_command(f'image test {hash}')
def image_confirm(self, hash: str | None = None):
if not hash:
hash = self.get_hash_to_confirm()
self.run_command(f'image confirm {hash}')