scripts: add pytest plugin
Adding pytest plugin dedicated to running pytest tests in Zephyr
project. This plugin provides a dut fixture which allows to handle
bidirectional communication with the device under test. This version
of plugin can be used for tests dedicated to real hardware, QEMU and
native_posix simulator.
Co-authored-by: Lukasz Fundakowski <lukasz.fundakowski@nordicsemi.no>
Co-authored-by: Grzegorz Chwierut <grzegorz.chwierut@nordicsemi.no>
Co-authored-by: Katarzyna Giadla <katarzyna.giadla@nordicsemi.no>
Signed-off-by: Piotr Golyzniak <piotr.golyzniak@nordicsemi.no>
diff --git a/scripts/pylib/pytest-twister-harness/.gitignore b/scripts/pylib/pytest-twister-harness/.gitignore
new file mode 100644
index 0000000..3372e28
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/.gitignore
@@ -0,0 +1,62 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# Pycharm
+.idea/
+
+# VSCode
+.vscode/
diff --git a/scripts/pylib/pytest-twister-harness/README.rst b/scripts/pylib/pytest-twister-harness/README.rst
new file mode 100644
index 0000000..814282f
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/README.rst
@@ -0,0 +1,47 @@
+==============
+Pytest Twister harness
+==============
+
+Installation
+------------
+
+If you plan to use this plugin with Twister, then you don't need to install it
+separately by pip. When Twister uses this plugin for pytest tests, it updates
+`PYTHONPATH` variable, and then extends pytest command by
+`-p twister_harness.plugin` argument.
+
+
+Usage
+-----
+
+Run exemplary test shell application by Twister:
+
+.. code-block:: sh
+
+ cd ${ZEPHYR_BASE}
+
+ # native_posix & QEMU
+ ./scripts/twister -p native_posix -p qemu_x86 -T samples/subsys/testsuite/pytest/shell
+
+ # hardware
+ ./scripts/twister -p nrf52840dk_nrf52840 --device-testing --device-serial /dev/ttyACM0 -T samples/subsys/testsuite/pytest/shell
+
+or build shell application by west and call pytest directly:
+
+.. code-block:: sh
+
+ export PYTHONPATH=${ZEPHYR_BASE}/scripts/pylib/pytest-twister-harness/src:${PYTHONPATH}
+
+ cd ${ZEPHYR_BASE}/samples/subsys/testsuite/pytest/shell
+
+ # native_posix
+ west build -p -b native_posix -- -DCONFIG_NATIVE_UART_0_ON_STDINOUT=y
+ pytest --twister-harness --device-type=native --build-dir=build -p twister_harness.plugin
+
+ # QEMU
+ west build -p -b qemu_x86 -- -DQEMU_PIPE=qemu-fifo
+ pytest --twister-harness --device-type=qemu --build-dir=build -p twister_harness.plugin
+
+ # hardware
+ west build -p -b nrf52840dk_nrf52840
+ pytest --twister-harness --device-type=hardware --device-serial=/dev/ttyACM0 --build-dir=build -p twister_harness.plugin
diff --git a/scripts/pylib/pytest-twister-harness/pyproject.toml b/scripts/pylib/pytest-twister-harness/pyproject.toml
new file mode 100644
index 0000000..70119a1
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/pyproject.toml
@@ -0,0 +1,6 @@
+[build-system]
+build-backend = "setuptools.build_meta"
+requires = [
+ "setuptools >= 48.0.0",
+ "wheel",
+]
diff --git a/scripts/pylib/pytest-twister-harness/setup.cfg b/scripts/pylib/pytest-twister-harness/setup.cfg
new file mode 100644
index 0000000..35a8d93
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/setup.cfg
@@ -0,0 +1,33 @@
+[metadata]
+name = pytest-twister-harness
+version = attr: twister_harness.__version__
+description = Plugin for pytest to run tests which require interaction with real and simulated devices
+long_description = file: README.rst
+python_requires = ~=3.8
+classifiers =
+ Development Status :: 3 - Alpha
+ Intended Audience :: Developers
+ Topic :: Software Development :: Embedded Systems
+ Topic :: Software Development :: Quality Assurance
+ Operating System :: Posix :: Linux
+ Operating System :: Microsoft :: Windows
+ Programming Language :: Python :: 3.8
+ Programming Language :: Python :: 3.9
+ Programming Language :: Python :: 3.10
+ Programming Language :: Python :: 3.11
+
+[options]
+packages = find:
+package_dir =
+ =src
+install_requires =
+ psutil
+ pyserial
+ pytest>=7.0.0
+
+[options.packages.find]
+where = src
+
+[options.entry_points]
+pytest11 =
+ twister_harness = twister_harness.plugin
diff --git a/scripts/pylib/pytest-twister-harness/setup.py b/scripts/pylib/pytest-twister-harness/setup.py
new file mode 100644
index 0000000..fa79863
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/setup.py
@@ -0,0 +1,7 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import setuptools
+
+setuptools.setup()
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/__init__.py
new file mode 100644
index 0000000..356fb95
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/__init__.py
@@ -0,0 +1,5 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+__version__ = '0.0.1'
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/constants.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/constants.py
new file mode 100644
index 0000000..e1e44b3
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/constants.py
@@ -0,0 +1,8 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+QEMU_FIFO_FILE_NAME: str = 'qemu-fifo'
+END_OF_DATA = object() #: used for indicating that there will be no more data in queue
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/__init__.py
new file mode 100644
index 0000000..235c666
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/__init__.py
@@ -0,0 +1,3 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_abstract.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_abstract.py
new file mode 100644
index 0000000..13db01d
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_abstract.py
@@ -0,0 +1,94 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import abc
+import logging
+import os
+from typing import Generator
+
+from twister_harness.log_files.log_file import LogFile, NullLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+logger = logging.getLogger(__name__)
+
+
+class DeviceAbstract(abc.ABC):
+ """Class defines an interface for all devices."""
+
+ def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
+ """
+ :param device_config: device configuration
+ """
+ self.device_config: DeviceConfig = device_config
+ self.handler_log_file: LogFile = NullLogFile.create()
+ self.device_log_file: LogFile = NullLogFile.create()
+
+ def __repr__(self) -> str:
+ return f'{self.__class__.__name__}()'
+
+ @property
+ def env(self) -> dict[str, str]:
+ env = os.environ.copy()
+ return env
+
+ @abc.abstractmethod
+ def connect(self, timeout: float = 1) -> None:
+ """Connect with the device (e.g. via UART)"""
+
+ @abc.abstractmethod
+ def disconnect(self) -> None:
+ """Close a connection with the device"""
+
+ @abc.abstractmethod
+ def generate_command(self) -> None:
+ """
+ Generate command which will be used during flashing or running device.
+ """
+
+ def flash_and_run(self, timeout: float = 60.0) -> None:
+ """
+ Flash and run application on a device.
+
+ :param timeout: time out in seconds
+ """
+
+ @property
+ @abc.abstractmethod
+ def iter_stdout(self) -> Generator[str, None, None]:
+ """Iterate stdout from a device."""
+
+ @abc.abstractmethod
+ def write(self, data: bytes) -> None:
+ """Write data bytes to device"""
+
+ @abc.abstractmethod
+ def initialize_log_files(self):
+ """
+ Initialize file to store logs.
+ """
+
+ def stop(self) -> None:
+ """Stop device."""
+
+ # @abc.abstractmethod
+ # def read(self, size=1) -> None:
+ # """Read size bytes from device"""
+
+ # def read_until(self, expected, size=None):
+ # """Read until an expected bytes sequence is found"""
+ # lenterm = len(expected)
+ # line = bytearray()
+ # while True:
+ # c = self.read(1)
+ # if c:
+ # line += c
+ # if line[-lenterm:] == expected:
+ # break
+ # if size is not None and len(line) >= size:
+ # break
+ # else:
+ # break
+ # return bytes(line)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/factory.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/factory.py
new file mode 100644
index 0000000..542b161
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/factory.py
@@ -0,0 +1,49 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+from typing import Type
+
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.device.hardware_adapter import HardwareAdapter
+from twister_harness.device.qemu_adapter import QemuAdapter
+from twister_harness.device.simulator_adapter import (
+ CustomSimulatorAdapter,
+ NativeSimulatorAdapter,
+ UnitSimulatorAdapter,
+)
+from twister_harness.exceptions import TwisterHarnessException
+
+logger = logging.getLogger(__name__)
+
+
+class DeviceFactory:
+ _devices: dict[str, Type[DeviceAbstract]] = {}
+
+ @classmethod
+ def discover(cls):
+ """Return available devices."""
+
+ @classmethod
+ def register_device_class(cls, name: str, klass: Type[DeviceAbstract]):
+ if name not in cls._devices:
+ cls._devices[name] = klass
+
+ @classmethod
+ def get_device(cls, name: str) -> Type[DeviceAbstract]:
+ logger.debug('Get device type "%s"', name)
+ try:
+ return cls._devices[name]
+ except KeyError as e:
+ logger.error('There is no device with name "%s"', name)
+ raise TwisterHarnessException(f'There is no device with name "{name}"') from e
+
+
+DeviceFactory.register_device_class('custom', CustomSimulatorAdapter)
+DeviceFactory.register_device_class('native', NativeSimulatorAdapter)
+DeviceFactory.register_device_class('unit', UnitSimulatorAdapter)
+DeviceFactory.register_device_class('hardware', HardwareAdapter)
+DeviceFactory.register_device_class('qemu', QemuAdapter)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/fifo_handler.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/fifo_handler.py
new file mode 100755
index 0000000..c6bda2f
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/fifo_handler.py
@@ -0,0 +1,91 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import io
+import logging
+import os
+import threading
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+
+class FifoHandler:
+ """Creates FIFO file for reading and writing."""
+
+ def __init__(self, fifo: str | Path):
+ """
+ :param fifo: path to fifo file
+ """
+ self._fifo_in = str(fifo) + '.in'
+ self._fifo_out = str(fifo) + '.out'
+ self.file_in: io.BytesIO | None = None
+ self.file_out: io.BytesIO | None = None
+ self._threads: list[threading.Thread] = []
+
+ @staticmethod
+ def _make_fifo_file(filename: str) -> None:
+ if os.path.exists(filename):
+ os.unlink(filename)
+ os.mkfifo(filename)
+ logger.debug('Created new fifo file: %s', filename)
+
+ @property
+ def is_open(self) -> bool:
+ try:
+ return bool(
+ self.file_in is not None and self.file_out is not None
+ and self.file_in.fileno() and self.file_out.fileno()
+ )
+ except ValueError:
+ return False
+
+ def connect(self):
+ self._make_fifo_file(self._fifo_in)
+ self._make_fifo_file(self._fifo_out)
+ self._threads = [
+ threading.Thread(target=self._open_fifo_in, daemon=True),
+ threading.Thread(target=self._open_fifo_out, daemon=True)
+ ]
+ for t in self._threads:
+ t.start()
+
+ def _open_fifo_in(self):
+ self.file_in = open(self._fifo_in, 'wb', buffering=0)
+
+ def _open_fifo_out(self):
+ self.file_out = open(self._fifo_out, 'rb', buffering=0)
+
+ def disconnect(self):
+ if self.file_in is not None:
+ self.file_in.close()
+ if self.file_out is not None:
+ self.file_out.close()
+ for t in self._threads:
+ t.join(timeout=1)
+ logger.debug(f'Unlink {self._fifo_in}')
+ os.unlink(self._fifo_in)
+ logger.debug(f'Unlink {self._fifo_out}')
+ os.unlink(self._fifo_out)
+
+ def read(self, __size: int | None = None) -> bytes:
+ return self.file_out.read(__size) # type: ignore[union-attr]
+
+ def readline(self, __size: int | None = None) -> bytes:
+ line = self.file_out.readline(__size) # type: ignore[union-attr]
+ return line
+
+ def write(self, __buffer: bytes) -> int:
+ return self.file_in.write(__buffer) # type: ignore[union-attr]
+
+ def flush(self):
+ if self.file_in:
+ self.file_in.flush()
+ if self.file_out:
+ self.file_out.flush()
+
+ def fileno(self) -> int:
+ return self.file_out.fileno() # type: ignore[union-attr]
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py
new file mode 100644
index 0000000..e0d34f9
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py
@@ -0,0 +1,226 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os
+import pty
+import re
+import shutil
+import subprocess
+from datetime import datetime
+from typing import Generator
+
+import serial
+
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.helper import log_command
+from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+logger = logging.getLogger(__name__)
+
+
+class HardwareAdapter(DeviceAbstract):
+ """Adapter class for real device."""
+
+ def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
+ super().__init__(device_config, **kwargs)
+ self.connection: serial.Serial | None = None
+ self.command: list[str] = []
+ self.process_kwargs: dict = {
+ 'stdout': subprocess.PIPE,
+ 'stderr': subprocess.STDOUT,
+ 'env': self.env,
+ }
+ self.serial_pty_proc: subprocess.Popen | None = None
+
+ def connect(self, timeout: float = 1) -> None:
+ """
+ Open serial connection.
+
+ :param timeout: Read timeout value in seconds
+ """
+ if self.connection:
+ # already opened
+ return
+
+ if self.device_config.pre_script:
+ self.run_custom_script(self.device_config.pre_script, 30)
+
+ serial_name = self._open_serial_pty() or self.device_config.serial
+ logger.info('Opening serial connection for %s', serial_name)
+ try:
+ self.connection = serial.Serial(
+ serial_name,
+ baudrate=self.device_config.baud,
+ parity=serial.PARITY_NONE,
+ stopbits=serial.STOPBITS_ONE,
+ bytesize=serial.EIGHTBITS,
+ timeout=timeout
+ )
+ except serial.SerialException as e:
+ logger.exception('Cannot open connection: %s', e)
+ self._close_serial_pty()
+ raise
+
+ self.connection.flush()
+
+ def disconnect(self) -> None:
+ """Close serial connection."""
+ if self.connection:
+ serial_name = self.connection.port
+ self.connection.close()
+ self.connection = None
+ logger.info('Closed serial connection for %s', serial_name)
+ self._close_serial_pty()
+
+ def stop(self) -> None:
+ if self.device_config.post_script:
+ self.run_custom_script(self.device_config.post_script, 30)
+
+ def _open_serial_pty(self) -> str | None:
+ """Open a pty pair, run process and return tty name"""
+ if not self.device_config.serial_pty:
+ return None
+ master, slave = pty.openpty()
+ try:
+ self.serial_pty_proc = subprocess.Popen(
+ re.split(',| ', self.device_config.serial_pty),
+ stdout=master,
+ stdin=master,
+ stderr=master
+ )
+ except subprocess.CalledProcessError as e:
+ logger.exception('Failed to run subprocess %s, error %s',
+ self.device_config.serial_pty, str(e))
+ raise
+ return os.ttyname(slave)
+
+ def _close_serial_pty(self) -> None:
+ """Terminate the process opened for serial pty script"""
+ if self.serial_pty_proc:
+ self.serial_pty_proc.terminate()
+ self.serial_pty_proc.communicate()
+ logger.info('Process %s terminated', self.device_config.serial_pty)
+ self.serial_pty_proc = None
+
+ def generate_command(self) -> None:
+ """Return command to flash."""
+ west = shutil.which('west')
+ if west is None:
+ raise TwisterHarnessException('west not found')
+
+ command = [
+ west,
+ 'flash',
+ '--skip-rebuild',
+ '--build-dir', str(self.device_config.build_dir),
+ ]
+
+ command_extra_args = []
+ if self.device_config.west_flash_extra_args:
+ command_extra_args.extend(self.device_config.west_flash_extra_args)
+
+ if runner := self.device_config.runner:
+ command.extend(['--runner', runner])
+
+ if board_id := self.device_config.id:
+ if runner == 'pyocd':
+ command_extra_args.append('--board-id')
+ command_extra_args.append(board_id)
+ elif runner == 'nrfjprog':
+ command_extra_args.append('--dev-id')
+ command_extra_args.append(board_id)
+ elif runner == 'openocd' and self.device_config.product in ['STM32 STLink', 'STLINK-V3']:
+ command_extra_args.append('--cmd-pre-init')
+ command_extra_args.append(f'hla_serial {board_id}')
+ elif runner == 'openocd' and self.device_config.product == 'EDBG CMSIS-DAP':
+ command_extra_args.append('--cmd-pre-init')
+ command_extra_args.append(f'cmsis_dap_serial {board_id}')
+ elif runner == 'jlink':
+ command.append(f'--tool-opt=-SelectEmuBySN {board_id}')
+ elif runner == 'stm32cubeprogrammer':
+ command.append(f'--tool-opt=sn={board_id}')
+
+ if command_extra_args:
+ command.append('--')
+ command.extend(command_extra_args)
+ self.command = command
+
+ @staticmethod
+ def run_custom_script(script, timeout):
+ with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
+ try:
+ stdout, stderr = proc.communicate(timeout=timeout)
+ logger.debug(stdout.decode())
+ if proc.returncode != 0:
+ logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")
+
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ proc.communicate()
+ logger.error("{} timed out".format(script))
+
+ def flash_and_run(self, timeout: float = 60.0) -> None:
+ if not self.command:
+ msg = 'Flash command is empty, please verify if it was generated properly.'
+ logger.error(msg)
+ raise TwisterHarnessException(msg)
+ if self.device_config.id:
+ logger.info('Flashing device %s', self.device_config.id)
+ log_command(logger, 'Flashing command', self.command, level=logging.INFO)
+ try:
+ process = subprocess.Popen(
+ self.command,
+ **self.process_kwargs
+ )
+ except subprocess.CalledProcessError:
+ logger.error('Error while flashing device')
+ raise TwisterHarnessException('Could not flash device')
+ else:
+ stdout = stderr = None
+ try:
+ stdout, stderr = process.communicate(timeout=self.device_config.flashing_timeout)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ finally:
+ if stdout:
+ self.device_log_file.handle(data=stdout)
+ logger.debug(stdout.decode(errors='ignore'))
+ if stderr:
+ self.device_log_file.handle(data=stderr)
+ if process.returncode == 0:
+ logger.info('Flashing finished')
+ else:
+ raise TwisterHarnessException(f'Could not flash device {self.device_config.id}')
+ finally:
+ if self.device_config.post_flash_script:
+ self.run_custom_script(self.device_config.post_flash_script, 30)
+
+ @property
+ def iter_stdout(self) -> Generator[str, None, None]:
+ """Return output from serial."""
+ if not self.connection:
+ return
+ self.connection.flush()
+ self.connection.reset_input_buffer()
+ while self.connection and self.connection.is_open:
+ stream = self.connection.readline()
+ self.handler_log_file.handle(data=stream)
+ yield stream.decode(errors='ignore').strip()
+
+ def write(self, data: bytes) -> None:
+ """Write data to serial"""
+ if self.connection:
+ self.connection.write(data)
+
+ def initialize_log_files(self) -> None:
+ self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
+ self.device_log_file = DeviceLogFile.create(build_dir=self.device_config.build_dir)
+ start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
+ self.handler_log_file.handle(start_msg)
+ self.device_log_file.handle(start_msg)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/qemu_adapter.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/qemu_adapter.py
new file mode 100755
index 0000000..4c649b9
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/qemu_adapter.py
@@ -0,0 +1,196 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os
+import shutil
+import signal
+import subprocess
+import threading
+import time
+from datetime import datetime
+from pathlib import Path
+from queue import Empty, Queue
+from typing import Generator
+
+import psutil
+
+from twister_harness.constants import QEMU_FIFO_FILE_NAME
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.device.fifo_handler import FifoHandler
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.helper import log_command
+from twister_harness.log_files.log_file import HandlerLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+logger = logging.getLogger(__name__)
+
+
+class QemuAdapter(DeviceAbstract):
+ """Adapter for Qemu simulator"""
+
+ def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
+ super().__init__(device_config, **kwargs)
+ self._process: subprocess.Popen | None = None
+ self._process_ended_with_timeout: bool = False
+ self._exc: Exception | None = None #: store any exception which appeared running this thread
+ self._thread: threading.Thread | None = None
+ self._emulation_was_finished: bool = False
+ self.connection = FifoHandler(Path(self.device_config.build_dir).joinpath(QEMU_FIFO_FILE_NAME))
+ self.command: list[str] = []
+ self.timeout: float = 60 # running timeout in seconds
+ self.booting_timeout_in_ms: int = 10_000 #: wait time for booting Qemu in milliseconds
+
+ def generate_command(self) -> None:
+ """Return command to flash."""
+ if (west := shutil.which('west')) is None:
+ logger.error('west not found')
+ self.command = []
+ else:
+ self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
+
+ def connect(self, timeout: float = 1) -> None:
+ logger.debug('Opening connection')
+ self.connection.connect()
+
+ def flash_and_run(self, timeout: float = 60.0) -> None:
+ self.timeout = timeout
+ if not self.command:
+ msg = 'Run simulation command is empty, please verify if it was generated properly.'
+ logger.error(msg)
+ raise TwisterHarnessException(msg)
+
+ self._thread = threading.Thread(target=self._run_command, args=(self.timeout,), daemon=True)
+ self._thread.start()
+ # Give a time to start subprocess before test is executed
+ time.sleep(0.1)
+ # Check if subprocess (simulation) has started without errors
+ if self._exc is not None:
+ logger.error('Simulation failed due to an exception: %s', self._exc)
+ raise self._exc
+
+ def _run_command(self, timeout: float) -> None:
+ log_command(logger, 'Running command', self.command, level=logging.INFO)
+ try:
+ self._process = subprocess.Popen(
+ self.command,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=self.env
+ )
+ stdout, _ = self._process.communicate(timeout=timeout)
+ return_code: int = self._process.returncode
+ except subprocess.TimeoutExpired:
+ logger.error('Running simulation finished after timeout: %s seconds', timeout)
+ self._process_ended_with_timeout = True
+ # we don't want to raise Timeout exception, but allowed a test to parse the output
+ # and set proper status
+ except subprocess.SubprocessError as e:
+ logger.error('Running simulation failed due to subprocess error %s', e)
+ self._exc = TwisterHarnessException(e.args)
+ except FileNotFoundError as e:
+ logger.error(f'Running simulation failed due to file not found: {e.filename}')
+ self._exc = TwisterHarnessException(f'File not found: {e.filename}')
+ except Exception as e:
+ logger.error('Running simulation failed: %s', e)
+ self._exc = TwisterHarnessException(e.args)
+ else:
+ if return_code == 0:
+ logger.info('Running simulation finished with return code %s', return_code)
+ elif return_code == -15:
+ logger.info('Running simulation terminated')
+ else:
+ logger.warning('Running simulation finished with return code %s', return_code)
+ for line in stdout.decode('utf-8').split('\n'):
+ logger.info(line)
+ finally:
+ self._emulation_was_finished = True
+
+ def disconnect(self):
+ logger.debug('Closing connection')
+ self.connection.disconnect()
+
+ def stop(self) -> None:
+ """Stop device."""
+ time.sleep(0.1) # give a time to end while loop in running simulation
+ if self._process is not None and self._process.returncode is None:
+ logger.debug('Stopping all running processes for PID %s', self._process.pid)
+ # kill all child subprocesses
+ for child in psutil.Process(self._process.pid).children(recursive=True):
+ try:
+ os.kill(child.pid, signal.SIGTERM)
+ except ProcessLookupError:
+ pass
+ # kill subprocess if it is still running
+ os.kill(self._process.pid, signal.SIGTERM)
+ if self._thread is not None:
+ self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec
+ if self._exc:
+ raise self._exc
+
+ def _wait_for_fifo(self):
+ for _ in range(int(self.booting_timeout_in_ms / 10) or 1):
+ if self.connection.is_open:
+ break
+ elif self._emulation_was_finished:
+ msg = 'Problem with starting QEMU'
+ logger.error(msg)
+ raise TwisterHarnessException(msg)
+ time.sleep(0.1)
+ else:
+ msg = 'Problem with starting QEMU - fifo file was not created yet'
+ logger.error(msg)
+ raise TwisterHarnessException(msg)
+
+ @property
+ def iter_stdout(self) -> Generator[str, None, None]:
+ if not self.connection:
+ return
+ # fifo file can be not create yet, so we need to wait for a while
+ self._wait_for_fifo()
+
+ # create unblocking reading from fifo file
+ q: Queue = Queue()
+
+ def read_lines():
+ while self.connection and self.connection.is_open:
+ try:
+ line = self.connection.readline().decode('UTF-8').strip()
+ except (OSError, ValueError):
+ # file could be closed already so we should stop reading
+ break
+ if len(line) != 0:
+ q.put(line)
+
+ t = threading.Thread(target=read_lines, daemon=True)
+ t.start()
+
+ end_time = time.time() + self.timeout
+ try:
+ while True:
+ try:
+ stream = q.get(timeout=0.1)
+ self.handler_log_file.handle(data=stream + '\n')
+ yield stream
+ except Empty: # timeout appeared
+ pass
+ if time.time() > end_time:
+ break
+ except KeyboardInterrupt:
+ # let thread to finish smoothly
+ pass
+ finally:
+ t.join(1)
+
+ def write(self, data: bytes) -> None:
+ """Write data to serial"""
+ if self.connection:
+ self.connection.write(data)
+
+ def initialize_log_files(self):
+ self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
+ start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
+ self.handler_log_file.handle(start_msg)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/simulator_adapter.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/simulator_adapter.py
new file mode 100755
index 0000000..558e206
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/simulator_adapter.py
@@ -0,0 +1,220 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import abc
+import asyncio
+import asyncio.subprocess
+import logging
+import os
+import shutil
+import signal
+import subprocess
+import threading
+import time
+from asyncio.base_subprocess import BaseSubprocessTransport
+from datetime import datetime
+from functools import wraps
+from pathlib import Path
+from queue import Queue
+from typing import Generator
+
+import psutil
+
+from twister_harness.constants import END_OF_DATA
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.helper import log_command
+from twister_harness.log_files.log_file import HandlerLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+
+# Workaround for RuntimeError: Event loop is closed
+# https://pythonalgos.com/runtimeerror-event-loop-is-closed-asyncio-fix/
+def silence_event_loop_closed(func):
+ @wraps(func)
+ def wrapper(self, *args, **kwargs):
+ try:
+ return func(self, *args, **kwargs)
+ except RuntimeError as e:
+ if str(e) != 'Event loop is closed':
+ raise
+
+ return wrapper
+
+
+BaseSubprocessTransport.__del__ = silence_event_loop_closed(BaseSubprocessTransport.__del__) # type: ignore
+
+logger = logging.getLogger(__name__)
+
+
+class SimulatorAdapterBase(DeviceAbstract, abc.ABC):
+
+ def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
+ """
+ :param twister_config: twister configuration
+ """
+ super().__init__(device_config, **kwargs)
+ self._process: asyncio.subprocess.Process | None = None
+ self._process_ended_with_timeout: bool = False
+ self.queue: Queue = Queue()
+ self._stop_job: bool = False
+ self._exc: Exception | None = None #: store any exception which appeared running this thread
+ self._thread: threading.Thread | None = None
+ self.command: list[str] = []
+ self.process_kwargs: dict = {
+ 'stdout': asyncio.subprocess.PIPE,
+ 'stderr': asyncio.subprocess.STDOUT,
+ 'stdin': asyncio.subprocess.PIPE,
+ 'env': self.env,
+ }
+ self._data_to_send: bytes | None = None
+
+ def connect(self, timeout: float = 1) -> None:
+ pass # pragma: no cover
+
+ def flash_and_run(self, timeout: float = 60.0) -> None:
+ if not self.command:
+ msg = 'Run simulation command is empty, please verify if it was generated properly.'
+ logger.error(msg)
+ raise TwisterHarnessException(msg)
+ self._thread = threading.Thread(target=self._run_simulation, args=(timeout,), daemon=True)
+ self._thread.start()
+ # Give a time to start subprocess before test is executed
+ time.sleep(0.1)
+ # Check if subprocess (simulation) has started without errors
+ if self._exc is not None:
+ logger.error('Simulation failed due to an exception: %s', self._exc)
+ raise self._exc
+
+ def _run_simulation(self, timeout: float) -> None:
+ log_command(logger, 'Running command', self.command, level=logging.INFO)
+ try:
+ return_code: int = asyncio.run(self._run_command(timeout=timeout))
+ except subprocess.SubprocessError as e:
+ logger.error('Running simulation failed due to subprocess error %s', e)
+ self._exc = TwisterHarnessException(e.args)
+ except FileNotFoundError as e:
+ logger.error(f'Running simulation failed due to file not found: {e.filename}')
+ self._exc = TwisterHarnessException(f'File not found: {e.filename}')
+ except Exception as e:
+ logger.error('Running simulation failed: %s', e)
+ self._exc = TwisterHarnessException(e.args)
+ else:
+ if return_code == 0:
+ logger.info('Running simulation finished with return code %s', return_code)
+ elif return_code == -15:
+ logger.info('Running simulation stopped interrupted by user')
+ else:
+ logger.warning('Running simulation finished with return code %s', return_code)
+ finally:
+ self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
+
+ async def _run_command(self, timeout: float = 60.):
+ assert isinstance(self.command, (list, tuple, set))
+ # to avoid stupid and difficult to debug mistakes
+ # we are using asyncio to run subprocess to be able to read from stdout
+ # without blocking while loop (readline with timeout)
+ self._process = await asyncio.create_subprocess_exec(
+ *self.command,
+ **self.process_kwargs
+ )
+ logger.debug('Started subprocess with PID %s', self._process.pid)
+ end_time = time.time() + timeout
+ while not self._stop_job and not self._process.stdout.at_eof(): # type: ignore[union-attr]
+ if line := await self._read_line(timeout=0.1):
+ self.queue.put(line.decode('utf-8').strip())
+ if time.time() > end_time:
+ self._process_ended_with_timeout = True
+ logger.info(f'Finished process with PID {self._process.pid} after {timeout} seconds timeout')
+ break
+ if self._data_to_send:
+ self._process.stdin.write(self._data_to_send) # type: ignore[union-attr]
+ await self._process.stdin.drain() # type: ignore[union-attr]
+ self._data_to_send = None
+
+ self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
+ return await self._process.wait()
+
+ async def _read_line(self, timeout=0.1) -> bytes | None:
+ try:
+ return await asyncio.wait_for(self._process.stdout.readline(), timeout=timeout) # type: ignore[union-attr]
+ except asyncio.TimeoutError:
+ return None
+
+ def disconnect(self):
+ pass # pragma: no cover
+
+ def stop(self) -> None:
+ """Stop device."""
+ self._stop_job = True
+ time.sleep(0.1) # give a time to end while loop in running simulation
+ if self._process is not None and self._process.returncode is None:
+ logger.debug('Stopping all running processes for PID %s', self._process.pid)
+ # kill subprocess if it is still running
+ for child in psutil.Process(self._process.pid).children(recursive=True):
+ try:
+ os.kill(child.pid, signal.SIGTERM)
+ except ProcessLookupError:
+ pass
+ # kill subprocess if it is still running
+ os.kill(self._process.pid, signal.SIGTERM)
+ if self._thread is not None:
+ self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec
+ if self._exc:
+ raise self._exc
+
+ @property
+ def iter_stdout(self) -> Generator[str, None, None]:
+ """Return output from serial."""
+ while True:
+ stream = self.queue.get()
+ if stream == END_OF_DATA:
+ logger.debug('No more data from running process')
+ break
+ self.handler_log_file.handle(data=stream + '\n')
+ yield stream
+ self.queue.task_done()
+
+ def write(self, data: bytes) -> None:
+ """Write data to serial"""
+ while self._data_to_send:
+ # wait data will be write to self._process.stdin.write
+ time.sleep(0.1)
+ self._data_to_send = data
+
+ def initialize_log_files(self):
+ self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
+ start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
+ self.handler_log_file.handle(start_msg)
+
+
+class NativeSimulatorAdapter(SimulatorAdapterBase):
+ """Simulator adapter to run `zephyr.exe` simulation"""
+
+ def generate_command(self) -> None:
+ """Return command to run."""
+ self.command = [
+ str((Path(self.device_config.build_dir) / 'zephyr' / 'zephyr.exe').resolve())
+ ]
+
+
+class UnitSimulatorAdapter(SimulatorAdapterBase):
+ """Simulator adapter to run unit tests"""
+
+ def generate_command(self) -> None:
+ """Return command to run."""
+ self.command = [str((Path(self.device_config.build_dir) / 'testbinary').resolve())]
+
+
+class CustomSimulatorAdapter(SimulatorAdapterBase):
+
+ def generate_command(self) -> None:
+ """Return command to run."""
+ if (west := shutil.which('west')) is None:
+ logger.error('west not found')
+ self.command = []
+ else:
+ self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/exceptions.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/exceptions.py
new file mode 100644
index 0000000..4fbd3b5
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/exceptions.py
@@ -0,0 +1,6 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class TwisterHarnessException(Exception):
+ """General Twister harness exception."""
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/__init__.py
new file mode 100644
index 0000000..235c666
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/__init__.py
@@ -0,0 +1,3 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/dut.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/dut.py
new file mode 100644
index 0000000..90e82cf
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/dut.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+from typing import Generator, Type
+
+import pytest
+
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.device.factory import DeviceFactory
+from twister_harness.twister_harness_config import DeviceConfig, TwisterHarnessConfig
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture(scope='function')
+def dut(request: pytest.FixtureRequest) -> Generator[DeviceAbstract, None, None]:
+ """Return device instance."""
+ twister_harness_config: TwisterHarnessConfig = request.config.twister_harness_config # type: ignore
+ device_config: DeviceConfig = twister_harness_config.devices[0]
+ device_type = device_config.type
+
+ device_class: Type[DeviceAbstract] = DeviceFactory.get_device(device_type)
+
+ device = device_class(device_config)
+
+ try:
+ device.connect()
+ device.generate_command()
+ device.initialize_log_files()
+ device.flash_and_run()
+ device.connect()
+ yield device
+ except KeyboardInterrupt:
+ pass
+ finally: # to make sure we close all running processes after user broke execution
+ device.disconnect()
+ device.stop()
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/helper.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/helper.py
new file mode 100644
index 0000000..d60717d
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/helper.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os.path
+import platform
+import shlex
+
+_WINDOWS = platform.system() == 'Windows'
+
+logger = logging.getLogger(__name__)
+
+
+def log_command(logger: logging.Logger, msg: str, args: list, level: int = logging.DEBUG):
+ """
+ Platform-independent helper for logging subprocess invocations.
+
+ Will log a command string that can be copy/pasted into a POSIX
+ shell on POSIX platforms. This is not available on Windows, so
+ the entire args array is logged instead.
+
+ :param logger: logging.Logger to use
+ :param msg: message to associate with the command
+ :param args: argument list as passed to subprocess module
+ :param level: log level
+ """
+ msg = f'{msg}: %s'
+ if _WINDOWS:
+ logger.log(level, msg, str(args))
+ else:
+ logger.log(level, msg, shlex.join(args))
+
+
+def normalize_filename(filename: str) -> str:
+ filename = os.path.expanduser(os.path.expandvars(filename))
+ filename = os.path.normpath(os.path.abspath(filename))
+ return filename
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/log.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/log.py
new file mode 100644
index 0000000..84fa9ca
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/log.py
@@ -0,0 +1,71 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging.config
+import os
+
+import pytest
+
+
+def configure_logging(config: pytest.Config) -> None:
+ """Configure logging."""
+ output_dir = config.option.output_dir
+ os.makedirs(output_dir, exist_ok=True)
+ log_file = os.path.join(output_dir, 'twister_harness.log')
+
+ if hasattr(config, 'workerinput'):
+ worker_id = config.workerinput['workerid']
+ log_file = os.path.join(output_dir, f'twister_harness_{worker_id}.log')
+
+ log_format = '%(asctime)s:%(levelname)s:%(name)s: %(message)s'
+ log_level = config.getoption('--log-level') or config.getini('log_level') or logging.INFO
+ log_file = config.getoption('--log-file') or config.getini('log_file') or log_file
+ log_format = config.getini('log_cli_format') or log_format
+
+ default_config = {
+ 'version': 1,
+ 'disable_existing_loggers': False,
+ 'formatters': {
+ 'standard': {
+ 'format': log_format,
+ },
+ 'simply': {
+ 'format': '%(asctime)s.%(msecs)d:%(levelname)s: %(message)s',
+ 'datefmt': '%H:%M:%S'
+ }
+ },
+ 'handlers': {
+ 'file': {
+ 'class': 'logging.FileHandler',
+ 'level': 'DEBUG',
+ 'formatter': 'standard',
+ 'filters': [],
+ 'filename': log_file,
+ 'encoding': 'utf8',
+ 'mode': 'w'
+ },
+ 'console': {
+ 'class': 'logging.StreamHandler',
+ 'level': 'DEBUG',
+ 'formatter': 'simply',
+ 'filters': [],
+ }
+ },
+ 'loggers': {
+ '': {
+ 'handlers': ['console', 'file'],
+ 'level': 'WARNING',
+ 'propagate': False
+ },
+ 'twister_harness': {
+ 'handlers': ['console', 'file'],
+ 'level': log_level,
+ 'propagate': False,
+ }
+ }
+ }
+
+ logging.config.dictConfig(default_config)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/__init__.py
new file mode 100755
index 0000000..235c666
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/__init__.py
@@ -0,0 +1,3 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/log_file.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/log_file.py
new file mode 100755
index 0000000..7c4b333
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/log_file.py
@@ -0,0 +1,71 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os
+import sys
+from pathlib import Path
+
+from twister_harness.helper import normalize_filename
+
+logger = logging.getLogger(__name__)
+
+
+class LogFile:
+ """Base class for logging files."""
+ name = 'uninitialized'
+
+ def __init__(self, filename: str | Path) -> None:
+ self.default_encoding = sys.getdefaultencoding()
+ self.filename = filename
+
+ @staticmethod
+ def get_log_filename(build_dir: Path | str, name: str) -> str:
+ """
+ :param build_dir: path to building directory.
+ :param name: name of the logging file.
+ :return: path to logging file
+ """
+ if not build_dir:
+ filename = os.devnull
+ else:
+ name = name + '.log'
+ filename = os.path.join(build_dir, name)
+ filename = normalize_filename(filename=filename)
+ return filename
+
+ def handle(self, data: str | bytes) -> None:
+ """Save information to logging file."""
+ if data:
+ data = data.decode(encoding=self.default_encoding) if isinstance(data, bytes) else data
+ with open(file=self.filename, mode='a+', encoding=self.default_encoding) as log_file:
+ log_file.write(data) # type: ignore[arg-type]
+
+ @classmethod
+ def create(cls, build_dir: Path | str = '') -> LogFile:
+ filename = cls.get_log_filename(build_dir=build_dir, name=cls.name)
+ return cls(filename)
+
+
+class BuildLogFile(LogFile):
+ """Save logs from the building."""
+ name = 'build'
+
+
+class HandlerLogFile(LogFile):
+ """Save output from a device."""
+ name = 'handler'
+
+
+class DeviceLogFile(LogFile):
+ """Save errors during flashing onto device."""
+ name = 'device'
+
+
+class NullLogFile(LogFile):
+ """Placeholder for no initialized log file"""
+ def handle(self, data: str | bytes) -> None:
+ """This method does nothing."""
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py
new file mode 100644
index 0000000..a510648
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py
@@ -0,0 +1,148 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os
+from pathlib import Path
+
+import pytest
+
+from twister_harness.log import configure_logging
+from twister_harness.twister_harness_config import TwisterHarnessConfig
+
+logger = logging.getLogger(__name__)
+
+pytest_plugins = (
+ 'twister_harness.fixtures.dut'
+)
+
+
+def pytest_addoption(parser: pytest.Parser):
+ twister_harness_group = parser.getgroup('Twister harness')
+ twister_harness_group.addoption(
+ '--twister-harness',
+ action='store_true',
+ default=False,
+ help='Activate Twister harness plugin'
+ )
+ parser.addini(
+ 'twister_harness',
+ 'Activate Twister harness plugin',
+ type='bool'
+ )
+ twister_harness_group.addoption(
+ '-O',
+ '--outdir',
+ metavar='PATH',
+ dest='output_dir',
+ help='Output directory for logs. If not provided then use '
+ '--build-dir path as default.'
+ )
+ twister_harness_group.addoption(
+ '--platform',
+ help='Choose specific platform'
+ )
+ twister_harness_group.addoption(
+ '--device-type',
+ choices=('native', 'qemu', 'hardware', 'unit', 'custom'),
+ help='Choose type of device (hardware, qemu, etc.)'
+ )
+ twister_harness_group.addoption(
+ '--device-serial',
+ help='Serial device for accessing the board '
+ '(e.g., /dev/ttyACM0)'
+ )
+ twister_harness_group.addoption(
+ '--device-serial-baud',
+ type=int,
+ default=115200,
+ help='Serial device baud rate (default 115200)'
+ )
+ twister_harness_group.addoption(
+ '--runner',
+ help='use the specified west runner (pyocd, nrfjprog, etc)'
+ )
+ twister_harness_group.addoption(
+ '--device-id',
+ help='ID of connected hardware device (for example 000682459367)'
+ )
+ twister_harness_group.addoption(
+ '--device-product',
+ help='Product name of connected hardware device (for example "STM32 STLink")'
+ )
+ twister_harness_group.addoption(
+ '--device-serial-pty',
+ metavar='PATH',
+ help='Script for controlling pseudoterminal. '
+ 'E.g --device-testing --device-serial-pty=<script>'
+ )
+ twister_harness_group.addoption(
+ '--west-flash-extra-args',
+ help='Extend parameters for west flash. '
+ 'E.g. --west-flash-extra-args="--board-id=foobar,--erase" '
+ 'will translate to "west flash -- --board-id=foobar --erase"'
+ )
+ twister_harness_group.addoption(
+ '--flashing-timeout',
+ type=int,
+ default=60,
+ help='Set timeout for the device flash operation in seconds.'
+ )
+ twister_harness_group.addoption(
+ '--build-dir',
+ dest='build_dir',
+ metavar='PATH',
+ help='Directory with built application.'
+ )
+ twister_harness_group.addoption(
+ '--binary-file',
+ metavar='PATH',
+ help='Path to file which should be flashed.'
+ )
+ twister_harness_group.addoption(
+ '--pre-script',
+ metavar='PATH'
+ )
+ twister_harness_group.addoption(
+ '--post-script',
+ metavar='PATH'
+ )
+ twister_harness_group.addoption(
+ '--post-flash-script',
+ metavar='PATH'
+ )
+
+
+def pytest_configure(config: pytest.Config):
+ if config.getoption('help'):
+ return
+
+ if not (config.getoption('twister_harness') or config.getini('twister_harness')):
+ return
+
+ validate_options(config)
+
+ if config.option.output_dir is None:
+ config.option.output_dir = config.option.build_dir
+ config.option.output_dir = _normalize_path(config.option.output_dir)
+
+ # create output directory if not exists
+ os.makedirs(config.option.output_dir, exist_ok=True)
+
+ configure_logging(config)
+
+ config.twister_harness_config = TwisterHarnessConfig.create(config) # type: ignore
+
+
+def validate_options(config: pytest.Config) -> None:
+ """Verify if user provided proper options"""
+ # TBD
+
+
+def _normalize_path(path: str | Path) -> str:
+ path = os.path.expanduser(os.path.expandvars(path))
+ path = os.path.normpath(os.path.abspath(path))
+ return path
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/twister_harness_config.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/twister_harness_config.py
new file mode 100644
index 0000000..7f3251a
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/twister_harness_config.py
@@ -0,0 +1,75 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass, field
+from pathlib import Path
+
+import pytest
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class DeviceConfig:
+ platform: str = ''
+ type: str = ''
+ serial: str = ''
+ baud: int = 115200
+ runner: str = ''
+ id: str = ''
+ product: str = ''
+ serial_pty: str = ''
+ west_flash_extra_args: list[str] = field(default_factory=list, repr=False)
+ flashing_timeout: int = 60 # [s]
+ build_dir: Path | str = ''
+ binary_file: Path | str = ''
+ name: str = ''
+ pre_script: str = ''
+ post_script: str = ''
+ post_flash_script: str = ''
+
+
+@dataclass
+class TwisterHarnessConfig:
+ """Store Twister harness configuration to have easy access in test."""
+ output_dir: Path = Path('twister_harness_out')
+ devices: list[DeviceConfig] = field(default_factory=list, repr=False)
+
+ @classmethod
+ def create(cls, config: pytest.Config) -> TwisterHarnessConfig:
+ """Create new instance from pytest.Config."""
+ output_dir: Path = config.option.output_dir
+
+ devices = []
+
+ west_flash_extra_args: list[str] = []
+ if config.option.west_flash_extra_args:
+ west_flash_extra_args = [w.strip() for w in config.option.west_flash_extra_args.split(',')]
+ device_from_cli = DeviceConfig(
+ platform=config.option.platform,
+ type=config.option.device_type,
+ serial=config.option.device_serial,
+ baud=config.option.device_serial_baud,
+ runner=config.option.runner,
+ id=config.option.device_id,
+ product=config.option.device_product,
+ serial_pty=config.option.device_serial_pty,
+ west_flash_extra_args=west_flash_extra_args,
+ flashing_timeout=config.option.flashing_timeout,
+ build_dir=config.option.build_dir,
+ binary_file=config.option.binary_file,
+ pre_script=config.option.pre_script,
+ post_script=config.option.post_script,
+ post_flash_script=config.option.post_flash_script,
+ )
+
+ devices.append(device_from_cli)
+
+ return cls(
+ output_dir=output_dir,
+ devices=devices
+ )
diff --git a/scripts/pylib/pytest-twister-harness/tests/conftest.py b/scripts/pylib/pytest-twister-harness/tests/conftest.py
new file mode 100755
index 0000000..a3e7f7a
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/conftest.py
@@ -0,0 +1,23 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from pathlib import Path
+
+import pytest
+
+# pytest_plugins = ['pytester']
+
+
+@pytest.fixture
+def resources(request: pytest.FixtureRequest) -> Path:
+ """Return path to `data` folder"""
+ return Path(request.module.__file__).parent.joinpath('data')
+
+
+@pytest.fixture(scope='function')
+def copy_example(pytester) -> Path:
+ """Copy example tests to temporary directory and return path the temp directory."""
+ resources_dir = Path(__file__).parent / 'data'
+ pytester.copy_example(str(resources_dir))
+ return pytester.path
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/data/fifo_mock.py b/scripts/pylib/pytest-twister-harness/tests/device/data/fifo_mock.py
new file mode 100755
index 0000000..f269766
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/data/fifo_mock.py
@@ -0,0 +1,100 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import os
+import sys
+import threading
+import time
+from argparse import ArgumentParser
+
+content = """
+The Zen of Python, by Tim Peters
+
+Beautiful is better than ugly.
+Explicit is better than implicit.
+Simple is better than complex.
+Complex is better than complicated.
+Flat is better than nested.
+Sparse is better than dense.
+Readability counts.
+Special cases aren't special enough to break the rules.
+Although practicality beats purity.
+Errors should never pass silently.
+Unless explicitly silenced.
+In the face of ambiguity, refuse the temptation to guess.
+There should be one-- and preferably only one --obvious way to do it.
+Although that way may not be obvious at first unless you're Dutch.
+Now is better than never.
+Although never is often better than *right* now.
+If the implementation is hard to explain, it's a bad idea.
+If the implementation is easy to explain, it may be a good idea.
+Namespaces are one honking great idea -- let's do more of those!
+"""
+
+
+class FifoFile:
+ def __init__(self, filename, mode):
+ self.filename = filename
+ self.mode = mode
+ self.thread = None
+ self.file = None
+ self.logger = logging.getLogger(__name__)
+
+ def _open(self):
+ self.logger.info(f'Creating fifo file: {self.filename}')
+ end_time = time.time() + 2
+ while not os.path.exists(self.filename):
+ time.sleep(0.1)
+ if time.time() > end_time:
+ self.logger.error(f'Did not able create fifo file: {self.filename}')
+ return
+ self.file = open(self.filename, self.mode, buffering=0)
+ self.logger.info(f'File created: {self.filename}')
+
+ def open(self):
+ self.thread = threading.Thread(target=self._open(), daemon=True)
+ self.thread.start()
+
+ def write(self, data):
+ if self.file:
+ self.file.write(data)
+
+ def read(self):
+ if self.file:
+ return self.file.readline()
+
+ def close(self):
+ if self.file:
+ self.file.close()
+ self.thread.join(1)
+ self.logger.info(f'Closed file: {self.filename}')
+
+ def __enter__(self):
+ self.open()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+
+def main():
+ logging.basicConfig(level='DEBUG')
+ parser = ArgumentParser()
+ parser.add_argument('file')
+ args = parser.parse_args()
+ read_path = args.file + '.in'
+ write_path = args.file + '.out'
+ logger = logging.getLogger(__name__)
+ logger.info('Start')
+
+ with FifoFile(read_path, 'rb'), FifoFile(write_path, 'wb') as wf:
+ for line in content.split('\n'):
+ wf.write(f'{line}\n'.encode('utf-8'))
+ time.sleep(0.1)
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/data/mock_script.py b/scripts/pylib/pytest-twister-harness/tests/device/data/mock_script.py
new file mode 100755
index 0000000..631e93e
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/data/mock_script.py
@@ -0,0 +1,68 @@
+#!/usr/bin/python
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Simply mock for bash script to use with unit tests.
+"""
+import sys
+import time
+from argparse import ArgumentParser
+
+s = """
+The Zen of Python, by Tim Peters
+
+Beautiful is better than ugly.
+Explicit is better than implicit.
+Simple is better than complex.
+Complex is better than complicated.
+Flat is better than nested.
+Sparse is better than dense.
+Readability counts.
+Special cases aren't special enough to break the rules.
+Although practicality beats purity.
+Errors should never pass silently.
+Unless explicitly silenced.
+In the face of ambiguity, refuse the temptation to guess.
+There should be one-- and preferably only one --obvious way to do it.
+Although that way may not be obvious at first unless you're Dutch.
+Now is better than never.
+Although never is often better than *right* now.
+If the implementation is hard to explain, it's a bad idea.
+If the implementation is easy to explain, it may be a good idea.
+Namespaces are one honking great idea -- let's do more of those!
+"""
+
+
+def main() -> int:
+ parser = ArgumentParser()
+ parser.add_argument('--sleep', action='store', default=0, type=float)
+ parser.add_argument('--long-sleep', action='store_true')
+ parser.add_argument('--return-code', action='store', default=0, type=int)
+ parser.add_argument('--exception', action='store_true')
+
+ args = parser.parse_args()
+
+ if args.exception:
+ # simulate crashing application
+ raise Exception
+
+ if args.long_sleep:
+ # prints data and wait for certain time
+ for line in s.split('\n'):
+ print(line, flush=True)
+ time.sleep(args.sleep)
+ else:
+ # prints lines with delay
+ for line in s.split('\n'):
+ print(line, flush=True)
+ time.sleep(args.sleep)
+
+ print('End of script', flush=True)
+ print('Returns with code', args.return_code, flush=True)
+ return args.return_code
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py b/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py
new file mode 100644
index 0000000..b7e40e9
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py
@@ -0,0 +1,206 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+from pathlib import Path
+from unittest import mock
+
+import pytest
+
+from twister_harness.device.hardware_adapter import HardwareAdapter
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+
+@pytest.fixture(name='device')
+def fixture_adapter() -> HardwareAdapter:
+ device_config = DeviceConfig(
+ runner='runner',
+ build_dir=Path('build'),
+ platform='platform',
+ id='p_id',
+ )
+ return HardwareAdapter(device_config)
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_1(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = 'west'
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == ['west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'runner']
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_2(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = 'west'
+ device.device_config.runner = 'pyocd'
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [
+ 'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'pyocd', '--', '--board-id', 'p_id'
+ ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_raise_exception_if_west_is_not_installed(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = None
+ with pytest.raises(TwisterHarnessException, match='west not found'):
+ device.generate_command()
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_3(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = 'west'
+ device.device_config.runner = 'nrfjprog'
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [
+ 'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'nrfjprog', '--', '--dev-id', 'p_id'
+ ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_4(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = 'west'
+ device.device_config.runner = 'openocd'
+ device.device_config.product = 'STM32 STLink'
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [
+ 'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'openocd',
+ '--', '--cmd-pre-init', 'hla_serial p_id'
+ ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_5(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = 'west'
+ device.device_config.runner = 'openocd'
+ device.device_config.product = 'EDBG CMSIS-DAP'
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [
+ 'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'openocd',
+ '--', '--cmd-pre-init', 'cmsis_dap_serial p_id'
+ ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_6(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = 'west'
+ device.device_config.runner = 'jlink'
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [
+ 'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'jlink',
+ '--tool-opt=-SelectEmuBySN p_id'
+ ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_7(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = 'west'
+ device.device_config.runner = 'stm32cubeprogrammer'
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [
+ 'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'stm32cubeprogrammer',
+ '--tool-opt=sn=p_id'
+ ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_8(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = 'west'
+ device.device_config.runner = 'openocd'
+ device.device_config.product = 'STLINK-V3'
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [
+ 'west', 'flash', '--skip-rebuild', '--build-dir', 'build',
+ '--runner', 'openocd', '--', '--cmd-pre-init', 'hla_serial p_id'
+ ]
+
+
+def test_if_hardware_adapter_raises_exception_empty_command(device: HardwareAdapter) -> None:
+ device.command = []
+ exception_msg = 'Flash command is empty, please verify if it was generated properly.'
+ with pytest.raises(TwisterHarnessException, match=exception_msg):
+ device.flash_and_run()
+
+
+def test_handler_and_device_log_correct_initialized_on_hardware(device: HardwareAdapter, tmp_path: Path) -> None:
+ device.device_config.build_dir = tmp_path
+ device.initialize_log_files()
+ assert isinstance(device.handler_log_file, HandlerLogFile)
+ assert isinstance(device.device_log_file, DeviceLogFile)
+ assert device.handler_log_file.filename.endswith('handler.log') # type: ignore[union-attr]
+ assert device.device_log_file.filename.endswith('device.log') # type: ignore[union-attr]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.subprocess.Popen')
+def test_device_log_correct_error_handle(patched_popen, device: HardwareAdapter, tmp_path: Path) -> None:
+ popen_mock = mock.Mock()
+ popen_mock.communicate.return_value = (b'', b'flashing error')
+ patched_popen.return_value = popen_mock
+ device.device_config.build_dir = tmp_path
+ device.initialize_log_files()
+ device.command = [
+ 'west', 'flash', '--skip-rebuild', '--build-dir', str(tmp_path),
+ '--runner', 'nrfjprog', '--', '--dev-id', 'p_id'
+ ]
+ with pytest.raises(expected_exception=TwisterHarnessException, match='Could not flash device p_id'):
+ device.flash_and_run()
+ assert os.path.isfile(device.device_log_file.filename)
+ with open(device.device_log_file.filename, 'r') as file:
+ assert 'flashing error' in file.readlines()
+
+
+@mock.patch('twister_harness.device.hardware_adapter.subprocess.Popen')
+@mock.patch('twister_harness.device.hardware_adapter.serial.Serial')
+def test_if_hardware_adapter_uses_serial_pty(
+ patched_serial, patched_popen, device: HardwareAdapter, monkeypatch: pytest.MonkeyPatch
+):
+ device.device_config.serial_pty = 'script.py'
+
+ popen_mock = mock.Mock()
+ popen_mock.communicate.return_value = (b'output', b'error')
+ patched_popen.return_value = popen_mock
+
+ monkeypatch.setattr('twister_harness.device.hardware_adapter.pty.openpty', lambda: (123, 456))
+ monkeypatch.setattr('twister_harness.device.hardware_adapter.os.ttyname', lambda x: f'/pty/ttytest/{x}')
+
+ serial_mock = mock.Mock()
+ serial_mock.port = '/pty/ttytest/456'
+ patched_serial.return_value = serial_mock
+
+ device.connect()
+ assert device.connection.port == '/pty/ttytest/456' # type: ignore[union-attr]
+ assert device.serial_pty_proc
+ patched_popen.assert_called_with(
+ ['script.py'],
+ stdout=123,
+ stdin=123,
+ stderr=123
+ )
+
+ device.disconnect()
+ assert not device.connection
+ assert not device.serial_pty_proc
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_with_west_flash(patched_which, device: HardwareAdapter) -> None:
+ patched_which.return_value = 'west'
+ device.device_config.west_flash_extra_args = ['--board-id=foobar', '--erase']
+ device.device_config.runner = 'pyocd'
+ device.device_config.id = ''
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [
+ 'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'pyocd',
+ '--', '--board-id=foobar', '--erase'
+ ]
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py b/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py
new file mode 100755
index 0000000..9b47ce3
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py
@@ -0,0 +1,98 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+import subprocess
+from typing import Generator
+from unittest import mock
+from unittest.mock import patch
+
+import pytest
+
+from twister_harness.device.qemu_adapter import QemuAdapter
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.log_files.log_file import HandlerLogFile, NullLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+
+@pytest.fixture(name='device')
+def fixture_device_adapter(tmp_path) -> Generator[QemuAdapter, None, None]:
+ build_dir = tmp_path / 'build_dir'
+ adapter = QemuAdapter(DeviceConfig(build_dir=build_dir))
+ yield adapter
+ try:
+ adapter.stop() # to make sure all running processes are closed
+ except TwisterHarnessException:
+ pass
+
+
+@patch('shutil.which', return_value='/usr/bin/west')
+def test_if_generate_command_creates_proper_command(patched_which):
+ adapter = QemuAdapter(DeviceConfig(build_dir='build_dir'))
+ adapter.generate_command()
+ assert adapter.command == ['/usr/bin/west', 'build', '-d', 'build_dir', '-t', 'run']
+
+
+@patch('shutil.which', return_value=None)
+def test_if_generate_command_creates_empty_listy_if_west_is_not_installed(patched_which):
+ adapter = QemuAdapter(DeviceConfig())
+ adapter.generate_command()
+ assert adapter.command == []
+
+
+def test_if_qemu_adapter_raises_exception_for_empty_command(device) -> None:
+ device.command = []
+ exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
+ with pytest.raises(TwisterHarnessException, match=exception_msg):
+ device.flash_and_run(timeout=0.1)
+
+
+def test_if_qemu_adapter_raises_exception_file_not_found(device) -> None:
+ device.command = ['dummy']
+ with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
+ device.flash_and_run(timeout=0.1)
+ device.stop()
+ assert device._exc is not None
+ assert isinstance(device._exc, TwisterHarnessException)
+
+
+@mock.patch('subprocess.Popen', side_effect=subprocess.SubprocessError(1, 'Exception message'))
+def test_if_qemu_adapter_raises_exception_when_subprocess_raised_an_error(patched_run, device):
+ device.command = ['echo', 'TEST']
+ with pytest.raises(TwisterHarnessException, match='Exception message'):
+ device.flash_and_run(timeout=0.1)
+ device.stop()
+
+
+def test_if_qemu_adapter_runs_without_errors(resources, tmp_path) -> None:
+ fifo_file_path = str(tmp_path / 'qemu-fifo')
+ script_path = resources.joinpath('fifo_mock.py')
+ device = QemuAdapter(DeviceConfig(build_dir=str(tmp_path)))
+ device.booting_timeout_in_ms = 1000
+ device.command = ['python', str(script_path), fifo_file_path]
+ device.connect()
+ device.initialize_log_files()
+ device.flash_and_run(timeout=1)
+ lines = list(device.iter_stdout)
+ assert 'Readability counts.' in lines
+ assert os.path.isfile(device.handler_log_file.filename)
+ with open(device.handler_log_file.filename, 'r') as file:
+ file_lines = [line.strip() for line in file.readlines()]
+ assert file_lines[-2:] == lines[-2:]
+ device.disconnect()
+
+
+def test_if_qemu_adapter_finishes_after_timeout(device) -> None:
+ device.command = ['sleep', '0.3']
+ device.flash_and_run(timeout=0.1)
+ device.stop()
+ assert device._process_ended_with_timeout is True
+
+
+def test_handler_and_device_log_correct_initialized_on_qemu(device, tmp_path) -> None:
+ device.device_config.build_dir = tmp_path
+ device.initialize_log_files()
+ assert isinstance(device.handler_log_file, HandlerLogFile)
+ assert isinstance(device.device_log_file, NullLogFile)
+ assert device.handler_log_file.filename.endswith('handler.log') # type: ignore[union-attr]
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/simulator_adapter_test.py b/scripts/pylib/pytest-twister-harness/tests/device/simulator_adapter_test.py
new file mode 100755
index 0000000..c7e4003
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/simulator_adapter_test.py
@@ -0,0 +1,137 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+import subprocess
+from pathlib import Path
+from unittest import mock
+
+import pytest
+
+from twister_harness.device.simulator_adapter import (
+ CustomSimulatorAdapter,
+ NativeSimulatorAdapter,
+ UnitSimulatorAdapter,
+)
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.log_files.log_file import HandlerLogFile, NullLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+
+@pytest.fixture(name='device')
+def fixture_adapter(tmp_path) -> NativeSimulatorAdapter:
+ return NativeSimulatorAdapter(DeviceConfig(build_dir=tmp_path))
+
+
+def test_if_native_simulator_adapter_get_command_returns_proper_string(
+ device: NativeSimulatorAdapter, resources: Path
+) -> None:
+ device.device_config.build_dir = resources
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [str(resources.joinpath('zephyr', 'zephyr.exe'))]
+
+
+def test_if_native_simulator_adapter_runs_without_errors(
+ resources: Path, device: NativeSimulatorAdapter
+) -> None:
+ """
+ Run script which prints text line by line and ends without errors.
+ Verify if subprocess was ended without errors, and without timeout.
+ """
+ script_path = resources.joinpath('mock_script.py')
+ # patching original command by mock_script.py to simulate same behaviour as zephyr.exe
+ device.command = ['python3', str(script_path)]
+ device.initialize_log_files()
+ device.flash_and_run(timeout=4)
+ lines = list(device.iter_stdout) # give it time before close thread
+ device.stop()
+ assert device._process_ended_with_timeout is False
+ assert 'Readability counts.' in lines
+ assert os.path.isfile(device.handler_log_file.filename)
+ with open(device.handler_log_file.filename, 'r') as file:
+ file_lines = [line.strip() for line in file.readlines()]
+ assert file_lines[-2:] == lines[-2:]
+
+
+def test_if_native_simulator_adapter_finishes_after_timeout_while_there_is_no_data_from_subprocess(
+ resources: Path, device: NativeSimulatorAdapter
+) -> None:
+ """Test if thread finishes after timeout when there is no data on stdout, but subprocess is still running"""
+ script_path = resources.joinpath('mock_script.py')
+ device.command = ['python3', str(script_path), '--long-sleep', '--sleep=5']
+ device.initialize_log_files()
+ device.flash_and_run(timeout=0.5)
+ lines = list(device.iter_stdout)
+ device.stop()
+ assert device._process_ended_with_timeout is True
+ assert device._exc is None
+ # this message should not be printed because script has been terminated due to timeout
+ assert 'End of script' not in lines, 'Script has not been terminated before end'
+
+
+def test_if_native_simulator_adapter_raises_exception_file_not_found(device: NativeSimulatorAdapter) -> None:
+ device.command = ['dummy']
+ with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
+ device.flash_and_run(timeout=0.1)
+ device.stop()
+ assert device._exc is not None
+ assert isinstance(device._exc, TwisterHarnessException)
+
+
+def test_if_simulator_adapter_raises_exception_empty_command(device: NativeSimulatorAdapter) -> None:
+ device.command = []
+ exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
+ with pytest.raises(TwisterHarnessException, match=exception_msg):
+ device.flash_and_run(timeout=0.1)
+
+
+def test_handler_and_device_log_correct_initialized_on_simulators(device: NativeSimulatorAdapter) -> None:
+ device.initialize_log_files()
+ assert isinstance(device.handler_log_file, HandlerLogFile)
+ assert isinstance(device.device_log_file, NullLogFile)
+ assert device.handler_log_file.filename.endswith('handler.log') # type: ignore[union-attr]
+
+
+@mock.patch('asyncio.run', side_effect=subprocess.SubprocessError(1, 'Exception message'))
+def test_if_simulator_adapter_raises_exception_when_subprocess_raised_subprocess_error(
+ patched_run, device: NativeSimulatorAdapter
+):
+ device.command = ['echo', 'TEST']
+ with pytest.raises(TwisterHarnessException, match='Exception message'):
+ device.flash_and_run(timeout=0.1)
+ device.stop()
+
+
+@mock.patch('asyncio.run', side_effect=Exception(1, 'Raised other exception'))
+def test_if_simulator_adapter_raises_exception_when_subprocess_raised_an_error(
+ patched_run, device: NativeSimulatorAdapter
+):
+ device.command = ['echo', 'TEST']
+ with pytest.raises(TwisterHarnessException, match='Raised other exception'):
+ device.flash_and_run(timeout=0.1)
+ device.stop()
+
+
+@mock.patch('shutil.which', return_value='west')
+def test_if_custom_simulator_adapter_get_command_returns_proper_string(patched_which) -> None:
+ device = CustomSimulatorAdapter(DeviceConfig(build_dir='build_dir'))
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == ['west', 'build', '-d', 'build_dir', '-t', 'run']
+
+
+@mock.patch('shutil.which', return_value=None)
+def test_if_custom_simulator_adapter_get_command_returns_empty_string(patched_which) -> None:
+ device = CustomSimulatorAdapter(DeviceConfig(build_dir='build_dir'))
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == []
+
+
+def test_if_unit_simulator_adapter_get_command_returns_proper_string(resources: Path) -> None:
+ device = UnitSimulatorAdapter(DeviceConfig(build_dir=resources))
+ device.generate_command()
+ assert isinstance(device.command, list)
+ assert device.command == [str(resources.joinpath('testbinary'))]
diff --git a/scripts/pylib/pytest-twister-harness/tests/log_file/log_file_test.py b/scripts/pylib/pytest-twister-harness/tests/log_file/log_file_test.py
new file mode 100755
index 0000000..e2c82f1
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/log_file/log_file_test.py
@@ -0,0 +1,54 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import os
+
+import pytest
+
+from twister_harness.log_files.log_file import LogFile
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture
+def sample_log_file(tmpdir):
+ log_file = LogFile.create(build_dir=tmpdir)
+ yield log_file
+
+
+def test_if_filename_is_correct(sample_log_file: LogFile):
+ assert sample_log_file.filename.endswith('uninitialized.log') # type: ignore[union-attr]
+
+
+def test_handle_data_is_str(sample_log_file: LogFile):
+ msg = 'str message'
+ sample_log_file.handle(data=msg)
+ assert os.path.exists(path=sample_log_file.filename)
+ with open(file=sample_log_file.filename, mode='r') as file:
+ assert file.readline() == 'str message'
+
+
+def test_handle_data_is_byte(sample_log_file: LogFile):
+ msg = b'bytes message'
+ sample_log_file.handle(data=msg)
+ assert os.path.exists(path=sample_log_file.filename)
+ with open(file=sample_log_file.filename, mode='r') as file:
+ assert file.readline() == 'bytes message'
+
+
+def test_handle_data_is_empty(sample_log_file: LogFile):
+ msg = ''
+ sample_log_file.handle(data=msg)
+ assert not os.path.exists(path=sample_log_file.filename)
+
+
+def test_get_log_filename_null_filename():
+ log_file = LogFile.create()
+ assert log_file.filename == os.devnull
+
+
+def test_get_log_filename_sample_filename(tmpdir):
+ log_file = LogFile.create(build_dir=tmpdir)
+ assert log_file.filename == os.path.join(tmpdir, 'uninitialized.log')