| # Copyright (c) 2023 Nordic Semiconductor ASA |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| from __future__ import annotations |
| |
| import logging |
| import re |
| import shlex |
| |
| from subprocess import check_output, getstatusoutput |
| from pathlib import Path |
| from dataclasses import dataclass |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class MCUmgrException(Exception): |
| """General MCUmgr exception.""" |
| |
| |
| @dataclass |
| class MCUmgrImage: |
| image: int |
| slot: int |
| version: str = '' |
| flags: str = '' |
| hash: str = '' |
| |
| |
| class MCUmgr: |
| """Sample wrapper for mcumgr command-line tool""" |
| mcumgr_exec = 'mcumgr' |
| |
| def __init__(self, connection_options: str): |
| self.conn_opts = connection_options |
| |
| @classmethod |
| def create_for_serial(cls, serial_port: str) -> MCUmgr: |
| return cls(connection_options=f'--conntype serial --connstring={serial_port}') |
| |
| @classmethod |
| def is_available(cls) -> bool: |
| exitcode, output = getstatusoutput(f'{cls.mcumgr_exec} version') |
| if exitcode != 0: |
| logger.warning(f'mcumgr tool not available: {output}') |
| return False |
| return True |
| |
| def run_command(self, cmd: str) -> str: |
| command = f'{self.mcumgr_exec} {self.conn_opts} {cmd}' |
| logger.info(f'CMD: {command}') |
| return check_output(shlex.split(command), text=True) |
| |
| def reset_device(self): |
| self.run_command('reset') |
| |
| def image_upload(self, image: Path | str, slot: int | None = None, timeout: int = 30): |
| command = f'-t {timeout} image upload {image}' |
| if slot is not None: |
| command += f' -e -n {slot}' |
| self.run_command(command) |
| logger.info('Image successfully uploaded') |
| |
| def get_image_list(self) -> list[MCUmgrImage]: |
| output = self.run_command('image list') |
| return self._parse_image_list(output) |
| |
| @staticmethod |
| def _parse_image_list(cmd_output: str) -> list[MCUmgrImage]: |
| image_list = [] |
| re_image = re.compile(r'image=(\d+)\s+slot=(\d+)') |
| re_version = re.compile(r'version:\s+(\S+)') |
| re_flags = re.compile(r'flags:\s+(.+)') |
| re_hash = re.compile(r'hash:\s+(\w+)') |
| for line in cmd_output.splitlines(): |
| if m := re_image.search(line): |
| image_list.append( |
| MCUmgrImage( |
| image=int(m.group(1)), |
| slot=int(m.group(2)) |
| ) |
| ) |
| elif image_list: |
| if m := re_version.search(line): |
| image_list[-1].version = m.group(1) |
| elif m := re_flags.search(line): |
| image_list[-1].flags = m.group(1) |
| elif m := re_hash.search(line): |
| image_list[-1].hash = m.group(1) |
| return image_list |
| |
| def get_hash_to_test(self) -> str: |
| image_list = self.get_image_list() |
| for image in image_list: |
| if 'active' not in image.flags: |
| return image.hash |
| logger.warning(f'Images returned by mcumgr (no not active):\n{image_list}') |
| raise MCUmgrException('No not active image found') |
| |
| def get_hash_to_confirm(self): |
| image_list = self.get_image_list() |
| for image in image_list: |
| if 'confirmed' not in image.flags: |
| return image.hash |
| logger.warning(f'Images returned by mcumgr (no not confirmed):\n{image_list}') |
| raise MCUmgrException('No not confirmed image found') |
| |
| def image_test(self, hash: str | None = None): |
| if not hash: |
| hash = self.get_hash_to_test() |
| self.run_command(f'image test {hash}') |
| |
| def image_confirm(self, hash: str | None = None): |
| if not hash: |
| hash = self.get_hash_to_confirm() |
| self.run_command(f'image confirm {hash}') |