scripts: add pytest plugin

Adding pytest plugin dedicated to running pytest tests in Zephyr
project. This plugin provides a dut fixture which allows to handle
bidirectional communication with the device under test. This version
of plugin can be used for tests dedicated to real hardware, QEMU and
native_posix simulator.

Co-authored-by: Lukasz Fundakowski <lukasz.fundakowski@nordicsemi.no>
Co-authored-by: Grzegorz Chwierut <grzegorz.chwierut@nordicsemi.no>
Co-authored-by: Katarzyna Giadla <katarzyna.giadla@nordicsemi.no>
Signed-off-by: Piotr Golyzniak <piotr.golyzniak@nordicsemi.no>
diff --git a/scripts/pylib/pytest-twister-harness/.gitignore b/scripts/pylib/pytest-twister-harness/.gitignore
new file mode 100644
index 0000000..3372e28
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/.gitignore
@@ -0,0 +1,62 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# Pycharm
+.idea/
+
+# VSCode
+.vscode/
diff --git a/scripts/pylib/pytest-twister-harness/README.rst b/scripts/pylib/pytest-twister-harness/README.rst
new file mode 100644
index 0000000..814282f
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/README.rst
@@ -0,0 +1,47 @@
+==============
+Pytest Twister harness
+==============
+
+Installation
+------------
+
+If you plan to use this plugin with Twister, then you don't need to install it
+separately by pip. When Twister uses this plugin for pytest tests, it updates
+`PYTHONPATH` variable, and then extends pytest command by
+`-p twister_harness.plugin` argument.
+
+
+Usage
+-----
+
+Run exemplary test shell application by Twister:
+
+.. code-block:: sh
+
+  cd ${ZEPHYR_BASE}
+
+  # native_posix & QEMU
+  ./scripts/twister -p native_posix -p qemu_x86 -T samples/subsys/testsuite/pytest/shell
+
+  # hardware
+  ./scripts/twister -p nrf52840dk_nrf52840 --device-testing --device-serial /dev/ttyACM0 -T samples/subsys/testsuite/pytest/shell
+
+or build shell application by west and call pytest directly:
+
+.. code-block:: sh
+
+  export PYTHONPATH=${ZEPHYR_BASE}/scripts/pylib/pytest-twister-harness/src:${PYTHONPATH}
+
+  cd ${ZEPHYR_BASE}/samples/subsys/testsuite/pytest/shell
+
+  # native_posix
+  west build -p -b native_posix -- -DCONFIG_NATIVE_UART_0_ON_STDINOUT=y
+  pytest --twister-harness --device-type=native --build-dir=build -p twister_harness.plugin
+
+  # QEMU
+  west build -p -b qemu_x86 -- -DQEMU_PIPE=qemu-fifo
+  pytest --twister-harness --device-type=qemu --build-dir=build -p twister_harness.plugin
+
+  # hardware
+  west build -p -b nrf52840dk_nrf52840
+  pytest --twister-harness --device-type=hardware --device-serial=/dev/ttyACM0 --build-dir=build -p twister_harness.plugin
diff --git a/scripts/pylib/pytest-twister-harness/pyproject.toml b/scripts/pylib/pytest-twister-harness/pyproject.toml
new file mode 100644
index 0000000..70119a1
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/pyproject.toml
@@ -0,0 +1,6 @@
+[build-system]
+build-backend = "setuptools.build_meta"
+requires = [
+    "setuptools >= 48.0.0",
+    "wheel",
+]
diff --git a/scripts/pylib/pytest-twister-harness/setup.cfg b/scripts/pylib/pytest-twister-harness/setup.cfg
new file mode 100644
index 0000000..35a8d93
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/setup.cfg
@@ -0,0 +1,33 @@
+[metadata]
+name = pytest-twister-harness
+version = attr: twister_harness.__version__
+description = Plugin for pytest to run tests which require interaction with real and simulated devices
+long_description = file: README.rst
+python_requires = ~=3.8
+classifiers =
+    Development Status :: 3 - Alpha
+    Intended Audience :: Developers
+    Topic :: Software Development :: Embedded Systems
+    Topic :: Software Development :: Quality Assurance
+    Operating System :: Posix :: Linux
+    Operating System :: Microsoft :: Windows
+    Programming Language :: Python :: 3.8
+    Programming Language :: Python :: 3.9
+    Programming Language :: Python :: 3.10
+    Programming Language :: Python :: 3.11
+
+[options]
+packages = find:
+package_dir =
+    =src
+install_requires =
+    psutil
+    pyserial
+    pytest>=7.0.0
+
+[options.packages.find]
+where = src
+
+[options.entry_points]
+pytest11 =
+    twister_harness = twister_harness.plugin
diff --git a/scripts/pylib/pytest-twister-harness/setup.py b/scripts/pylib/pytest-twister-harness/setup.py
new file mode 100644
index 0000000..fa79863
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/setup.py
@@ -0,0 +1,7 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import setuptools
+
+setuptools.setup()
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/__init__.py
new file mode 100644
index 0000000..356fb95
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/__init__.py
@@ -0,0 +1,5 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+__version__ = '0.0.1'
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/constants.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/constants.py
new file mode 100644
index 0000000..e1e44b3
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/constants.py
@@ -0,0 +1,8 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+QEMU_FIFO_FILE_NAME: str = 'qemu-fifo'
+END_OF_DATA = object()  #: used for indicating that there will be no more data in queue
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/__init__.py
new file mode 100644
index 0000000..235c666
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/__init__.py
@@ -0,0 +1,3 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_abstract.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_abstract.py
new file mode 100644
index 0000000..13db01d
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_abstract.py
@@ -0,0 +1,94 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import abc
+import logging
+import os
+from typing import Generator
+
+from twister_harness.log_files.log_file import LogFile, NullLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+logger = logging.getLogger(__name__)
+
+
+class DeviceAbstract(abc.ABC):
+    """Class defines an interface for all devices."""
+
+    def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
+        """
+        :param device_config: device configuration
+        """
+        self.device_config: DeviceConfig = device_config
+        self.handler_log_file: LogFile = NullLogFile.create()
+        self.device_log_file: LogFile = NullLogFile.create()
+
+    def __repr__(self) -> str:
+        return f'{self.__class__.__name__}()'
+
+    @property
+    def env(self) -> dict[str, str]:
+        env = os.environ.copy()
+        return env
+
+    @abc.abstractmethod
+    def connect(self, timeout: float = 1) -> None:
+        """Connect with the device (e.g. via UART)"""
+
+    @abc.abstractmethod
+    def disconnect(self) -> None:
+        """Close a connection with the device"""
+
+    @abc.abstractmethod
+    def generate_command(self) -> None:
+        """
+        Generate command which will be used during flashing or running device.
+        """
+
+    def flash_and_run(self, timeout: float = 60.0) -> None:
+        """
+        Flash and run application on a device.
+
+        :param timeout: time out in seconds
+        """
+
+    @property
+    @abc.abstractmethod
+    def iter_stdout(self) -> Generator[str, None, None]:
+        """Iterate stdout from a device."""
+
+    @abc.abstractmethod
+    def write(self, data: bytes) -> None:
+        """Write data bytes to device"""
+
+    @abc.abstractmethod
+    def initialize_log_files(self):
+        """
+        Initialize file to store logs.
+        """
+
+    def stop(self) -> None:
+        """Stop device."""
+
+    # @abc.abstractmethod
+    # def read(self, size=1) -> None:
+    #     """Read size bytes from device"""
+
+    # def read_until(self, expected, size=None):
+    #     """Read until an expected bytes sequence is found"""
+    #     lenterm = len(expected)
+    #     line = bytearray()
+    #     while True:
+    #         c = self.read(1)
+    #         if c:
+    #             line += c
+    #             if line[-lenterm:] == expected:
+    #                 break
+    #             if size is not None and len(line) >= size:
+    #                 break
+    #         else:
+    #             break
+    #     return bytes(line)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/factory.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/factory.py
new file mode 100644
index 0000000..542b161
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/factory.py
@@ -0,0 +1,49 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+from typing import Type
+
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.device.hardware_adapter import HardwareAdapter
+from twister_harness.device.qemu_adapter import QemuAdapter
+from twister_harness.device.simulator_adapter import (
+    CustomSimulatorAdapter,
+    NativeSimulatorAdapter,
+    UnitSimulatorAdapter,
+)
+from twister_harness.exceptions import TwisterHarnessException
+
+logger = logging.getLogger(__name__)
+
+
+class DeviceFactory:
+    _devices: dict[str, Type[DeviceAbstract]] = {}
+
+    @classmethod
+    def discover(cls):
+        """Return available devices."""
+
+    @classmethod
+    def register_device_class(cls, name: str, klass: Type[DeviceAbstract]):
+        if name not in cls._devices:
+            cls._devices[name] = klass
+
+    @classmethod
+    def get_device(cls, name: str) -> Type[DeviceAbstract]:
+        logger.debug('Get device type "%s"', name)
+        try:
+            return cls._devices[name]
+        except KeyError as e:
+            logger.error('There is no device with name "%s"', name)
+            raise TwisterHarnessException(f'There is no device with name "{name}"') from e
+
+
+DeviceFactory.register_device_class('custom', CustomSimulatorAdapter)
+DeviceFactory.register_device_class('native', NativeSimulatorAdapter)
+DeviceFactory.register_device_class('unit', UnitSimulatorAdapter)
+DeviceFactory.register_device_class('hardware', HardwareAdapter)
+DeviceFactory.register_device_class('qemu', QemuAdapter)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/fifo_handler.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/fifo_handler.py
new file mode 100755
index 0000000..c6bda2f
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/fifo_handler.py
@@ -0,0 +1,91 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import io
+import logging
+import os
+import threading
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+
+class FifoHandler:
+    """Creates FIFO file for reading and writing."""
+
+    def __init__(self, fifo: str | Path):
+        """
+        :param fifo: path to fifo file
+        """
+        self._fifo_in = str(fifo) + '.in'
+        self._fifo_out = str(fifo) + '.out'
+        self.file_in: io.BytesIO | None = None
+        self.file_out: io.BytesIO | None = None
+        self._threads: list[threading.Thread] = []
+
+    @staticmethod
+    def _make_fifo_file(filename: str) -> None:
+        if os.path.exists(filename):
+            os.unlink(filename)
+        os.mkfifo(filename)
+        logger.debug('Created new fifo file: %s', filename)
+
+    @property
+    def is_open(self) -> bool:
+        try:
+            return bool(
+                self.file_in is not None and self.file_out is not None
+                and self.file_in.fileno() and self.file_out.fileno()
+            )
+        except ValueError:
+            return False
+
+    def connect(self):
+        self._make_fifo_file(self._fifo_in)
+        self._make_fifo_file(self._fifo_out)
+        self._threads = [
+            threading.Thread(target=self._open_fifo_in, daemon=True),
+            threading.Thread(target=self._open_fifo_out, daemon=True)
+        ]
+        for t in self._threads:
+            t.start()
+
+    def _open_fifo_in(self):
+        self.file_in = open(self._fifo_in, 'wb', buffering=0)
+
+    def _open_fifo_out(self):
+        self.file_out = open(self._fifo_out, 'rb', buffering=0)
+
+    def disconnect(self):
+        if self.file_in is not None:
+            self.file_in.close()
+        if self.file_out is not None:
+            self.file_out.close()
+        for t in self._threads:
+            t.join(timeout=1)
+        logger.debug(f'Unlink {self._fifo_in}')
+        os.unlink(self._fifo_in)
+        logger.debug(f'Unlink {self._fifo_out}')
+        os.unlink(self._fifo_out)
+
+    def read(self, __size: int | None = None) -> bytes:
+        return self.file_out.read(__size)  # type: ignore[union-attr]
+
+    def readline(self, __size: int | None = None) -> bytes:
+        line = self.file_out.readline(__size)  # type: ignore[union-attr]
+        return line
+
+    def write(self, __buffer: bytes) -> int:
+        return self.file_in.write(__buffer)  # type: ignore[union-attr]
+
+    def flush(self):
+        if self.file_in:
+            self.file_in.flush()
+        if self.file_out:
+            self.file_out.flush()
+
+    def fileno(self) -> int:
+        return self.file_out.fileno()  # type: ignore[union-attr]
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py
new file mode 100644
index 0000000..e0d34f9
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py
@@ -0,0 +1,226 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os
+import pty
+import re
+import shutil
+import subprocess
+from datetime import datetime
+from typing import Generator
+
+import serial
+
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.helper import log_command
+from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+logger = logging.getLogger(__name__)
+
+
+class HardwareAdapter(DeviceAbstract):
+    """Adapter class for real device."""
+
+    def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
+        super().__init__(device_config, **kwargs)
+        self.connection: serial.Serial | None = None
+        self.command: list[str] = []
+        self.process_kwargs: dict = {
+            'stdout': subprocess.PIPE,
+            'stderr': subprocess.STDOUT,
+            'env': self.env,
+        }
+        self.serial_pty_proc: subprocess.Popen | None = None
+
+    def connect(self, timeout: float = 1) -> None:
+        """
+        Open serial connection.
+
+        :param timeout: Read timeout value in seconds
+        """
+        if self.connection:
+            # already opened
+            return
+
+        if self.device_config.pre_script:
+            self.run_custom_script(self.device_config.pre_script, 30)
+
+        serial_name = self._open_serial_pty() or self.device_config.serial
+        logger.info('Opening serial connection for %s', serial_name)
+        try:
+            self.connection = serial.Serial(
+                serial_name,
+                baudrate=self.device_config.baud,
+                parity=serial.PARITY_NONE,
+                stopbits=serial.STOPBITS_ONE,
+                bytesize=serial.EIGHTBITS,
+                timeout=timeout
+            )
+        except serial.SerialException as e:
+            logger.exception('Cannot open connection: %s', e)
+            self._close_serial_pty()
+            raise
+
+        self.connection.flush()
+
+    def disconnect(self) -> None:
+        """Close serial connection."""
+        if self.connection:
+            serial_name = self.connection.port
+            self.connection.close()
+            self.connection = None
+            logger.info('Closed serial connection for %s', serial_name)
+        self._close_serial_pty()
+
+    def stop(self) -> None:
+        if self.device_config.post_script:
+            self.run_custom_script(self.device_config.post_script, 30)
+
+    def _open_serial_pty(self) -> str | None:
+        """Open a pty pair, run process and return tty name"""
+        if not self.device_config.serial_pty:
+            return None
+        master, slave = pty.openpty()
+        try:
+            self.serial_pty_proc = subprocess.Popen(
+                re.split(',| ', self.device_config.serial_pty),
+                stdout=master,
+                stdin=master,
+                stderr=master
+            )
+        except subprocess.CalledProcessError as e:
+            logger.exception('Failed to run subprocess %s, error %s',
+                             self.device_config.serial_pty, str(e))
+            raise
+        return os.ttyname(slave)
+
+    def _close_serial_pty(self) -> None:
+        """Terminate the process opened for serial pty script"""
+        if self.serial_pty_proc:
+            self.serial_pty_proc.terminate()
+            self.serial_pty_proc.communicate()
+            logger.info('Process %s terminated', self.device_config.serial_pty)
+            self.serial_pty_proc = None
+
+    def generate_command(self) -> None:
+        """Return command to flash."""
+        west = shutil.which('west')
+        if west is None:
+            raise TwisterHarnessException('west not found')
+
+        command = [
+            west,
+            'flash',
+            '--skip-rebuild',
+            '--build-dir', str(self.device_config.build_dir),
+        ]
+
+        command_extra_args = []
+        if self.device_config.west_flash_extra_args:
+            command_extra_args.extend(self.device_config.west_flash_extra_args)
+
+        if runner := self.device_config.runner:
+            command.extend(['--runner', runner])
+
+            if board_id := self.device_config.id:
+                if runner == 'pyocd':
+                    command_extra_args.append('--board-id')
+                    command_extra_args.append(board_id)
+                elif runner == 'nrfjprog':
+                    command_extra_args.append('--dev-id')
+                    command_extra_args.append(board_id)
+                elif runner == 'openocd' and self.device_config.product in ['STM32 STLink', 'STLINK-V3']:
+                    command_extra_args.append('--cmd-pre-init')
+                    command_extra_args.append(f'hla_serial {board_id}')
+                elif runner == 'openocd' and self.device_config.product == 'EDBG CMSIS-DAP':
+                    command_extra_args.append('--cmd-pre-init')
+                    command_extra_args.append(f'cmsis_dap_serial {board_id}')
+                elif runner == 'jlink':
+                    command.append(f'--tool-opt=-SelectEmuBySN {board_id}')
+                elif runner == 'stm32cubeprogrammer':
+                    command.append(f'--tool-opt=sn={board_id}')
+
+        if command_extra_args:
+            command.append('--')
+            command.extend(command_extra_args)
+        self.command = command
+
+    @staticmethod
+    def run_custom_script(script, timeout):
+        with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
+            try:
+                stdout, stderr = proc.communicate(timeout=timeout)
+                logger.debug(stdout.decode())
+                if proc.returncode != 0:
+                    logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")
+
+            except subprocess.TimeoutExpired:
+                proc.kill()
+                proc.communicate()
+                logger.error("{} timed out".format(script))
+
+    def flash_and_run(self, timeout: float = 60.0) -> None:
+        if not self.command:
+            msg = 'Flash command is empty, please verify if it was generated properly.'
+            logger.error(msg)
+            raise TwisterHarnessException(msg)
+        if self.device_config.id:
+            logger.info('Flashing device %s', self.device_config.id)
+        log_command(logger, 'Flashing command', self.command, level=logging.INFO)
+        try:
+            process = subprocess.Popen(
+                self.command,
+                **self.process_kwargs
+            )
+        except subprocess.CalledProcessError:
+            logger.error('Error while flashing device')
+            raise TwisterHarnessException('Could not flash device')
+        else:
+            stdout = stderr = None
+            try:
+                stdout, stderr = process.communicate(timeout=self.device_config.flashing_timeout)
+            except subprocess.TimeoutExpired:
+                process.kill()
+            finally:
+                if stdout:
+                    self.device_log_file.handle(data=stdout)
+                    logger.debug(stdout.decode(errors='ignore'))
+                if stderr:
+                    self.device_log_file.handle(data=stderr)
+            if process.returncode == 0:
+                logger.info('Flashing finished')
+            else:
+                raise TwisterHarnessException(f'Could not flash device {self.device_config.id}')
+        finally:
+            if self.device_config.post_flash_script:
+                self.run_custom_script(self.device_config.post_flash_script, 30)
+
+    @property
+    def iter_stdout(self) -> Generator[str, None, None]:
+        """Return output from serial."""
+        if not self.connection:
+            return
+        self.connection.flush()
+        self.connection.reset_input_buffer()
+        while self.connection and self.connection.is_open:
+            stream = self.connection.readline()
+            self.handler_log_file.handle(data=stream)
+            yield stream.decode(errors='ignore').strip()
+
+    def write(self, data: bytes) -> None:
+        """Write data to serial"""
+        if self.connection:
+            self.connection.write(data)
+
+    def initialize_log_files(self) -> None:
+        self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
+        self.device_log_file = DeviceLogFile.create(build_dir=self.device_config.build_dir)
+        start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
+        self.handler_log_file.handle(start_msg)
+        self.device_log_file.handle(start_msg)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/qemu_adapter.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/qemu_adapter.py
new file mode 100755
index 0000000..4c649b9
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/qemu_adapter.py
@@ -0,0 +1,196 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os
+import shutil
+import signal
+import subprocess
+import threading
+import time
+from datetime import datetime
+from pathlib import Path
+from queue import Empty, Queue
+from typing import Generator
+
+import psutil
+
+from twister_harness.constants import QEMU_FIFO_FILE_NAME
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.device.fifo_handler import FifoHandler
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.helper import log_command
+from twister_harness.log_files.log_file import HandlerLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+logger = logging.getLogger(__name__)
+
+
+class QemuAdapter(DeviceAbstract):
+    """Adapter for Qemu simulator"""
+
+    def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
+        super().__init__(device_config, **kwargs)
+        self._process: subprocess.Popen | None = None
+        self._process_ended_with_timeout: bool = False
+        self._exc: Exception | None = None  #: store any exception which appeared running this thread
+        self._thread: threading.Thread | None = None
+        self._emulation_was_finished: bool = False
+        self.connection = FifoHandler(Path(self.device_config.build_dir).joinpath(QEMU_FIFO_FILE_NAME))
+        self.command: list[str] = []
+        self.timeout: float = 60  # running timeout in seconds
+        self.booting_timeout_in_ms: int = 10_000  #: wait time for booting Qemu in milliseconds
+
+    def generate_command(self) -> None:
+        """Return command to flash."""
+        if (west := shutil.which('west')) is None:
+            logger.error('west not found')
+            self.command = []
+        else:
+            self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
+
+    def connect(self, timeout: float = 1) -> None:
+        logger.debug('Opening connection')
+        self.connection.connect()
+
+    def flash_and_run(self, timeout: float = 60.0) -> None:
+        self.timeout = timeout
+        if not self.command:
+            msg = 'Run simulation command is empty, please verify if it was generated properly.'
+            logger.error(msg)
+            raise TwisterHarnessException(msg)
+
+        self._thread = threading.Thread(target=self._run_command, args=(self.timeout,), daemon=True)
+        self._thread.start()
+        # Give a time to start subprocess before test is executed
+        time.sleep(0.1)
+        # Check if subprocess (simulation) has started without errors
+        if self._exc is not None:
+            logger.error('Simulation failed due to an exception: %s', self._exc)
+            raise self._exc
+
+    def _run_command(self, timeout: float) -> None:
+        log_command(logger, 'Running command', self.command, level=logging.INFO)
+        try:
+            self._process = subprocess.Popen(
+                self.command,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                env=self.env
+            )
+            stdout, _ = self._process.communicate(timeout=timeout)
+            return_code: int = self._process.returncode
+        except subprocess.TimeoutExpired:
+            logger.error('Running simulation finished after timeout: %s seconds', timeout)
+            self._process_ended_with_timeout = True
+            # we don't want to raise Timeout exception, but allowed a test to parse the output
+            # and set proper status
+        except subprocess.SubprocessError as e:
+            logger.error('Running simulation failed due to subprocess error %s', e)
+            self._exc = TwisterHarnessException(e.args)
+        except FileNotFoundError as e:
+            logger.error(f'Running simulation failed due to file not found: {e.filename}')
+            self._exc = TwisterHarnessException(f'File not found: {e.filename}')
+        except Exception as e:
+            logger.error('Running simulation failed: %s', e)
+            self._exc = TwisterHarnessException(e.args)
+        else:
+            if return_code == 0:
+                logger.info('Running simulation finished with return code %s', return_code)
+            elif return_code == -15:
+                logger.info('Running simulation terminated')
+            else:
+                logger.warning('Running simulation finished with return code %s', return_code)
+                for line in stdout.decode('utf-8').split('\n'):
+                    logger.info(line)
+        finally:
+            self._emulation_was_finished = True
+
+    def disconnect(self):
+        logger.debug('Closing connection')
+        self.connection.disconnect()
+
+    def stop(self) -> None:
+        """Stop device."""
+        time.sleep(0.1)  # give a time to end while loop in running simulation
+        if self._process is not None and self._process.returncode is None:
+            logger.debug('Stopping all running processes for PID %s', self._process.pid)
+            # kill all child subprocesses
+            for child in psutil.Process(self._process.pid).children(recursive=True):
+                try:
+                    os.kill(child.pid, signal.SIGTERM)
+                except ProcessLookupError:
+                    pass
+            # kill subprocess if it is still running
+            os.kill(self._process.pid, signal.SIGTERM)
+        if self._thread is not None:
+            self._thread.join(timeout=1)  # Should end immediately, but just in case we set timeout for 1 sec
+        if self._exc:
+            raise self._exc
+
+    def _wait_for_fifo(self):
+        for _ in range(int(self.booting_timeout_in_ms / 10) or 1):
+            if self.connection.is_open:
+                break
+            elif self._emulation_was_finished:
+                msg = 'Problem with starting QEMU'
+                logger.error(msg)
+                raise TwisterHarnessException(msg)
+            time.sleep(0.1)
+        else:
+            msg = 'Problem with starting QEMU - fifo file was not created yet'
+            logger.error(msg)
+            raise TwisterHarnessException(msg)
+
+    @property
+    def iter_stdout(self) -> Generator[str, None, None]:
+        if not self.connection:
+            return
+        # fifo file can be not create yet, so we need to wait for a while
+        self._wait_for_fifo()
+
+        # create unblocking reading from fifo file
+        q: Queue = Queue()
+
+        def read_lines():
+            while self.connection and self.connection.is_open:
+                try:
+                    line = self.connection.readline().decode('UTF-8').strip()
+                except (OSError, ValueError):
+                    # file could be closed already so we should stop reading
+                    break
+                if len(line) != 0:
+                    q.put(line)
+
+        t = threading.Thread(target=read_lines, daemon=True)
+        t.start()
+
+        end_time = time.time() + self.timeout
+        try:
+            while True:
+                try:
+                    stream = q.get(timeout=0.1)
+                    self.handler_log_file.handle(data=stream + '\n')
+                    yield stream
+                except Empty:  # timeout appeared
+                    pass
+                if time.time() > end_time:
+                    break
+        except KeyboardInterrupt:
+            # let thread to finish smoothly
+            pass
+        finally:
+            t.join(1)
+
+    def write(self, data: bytes) -> None:
+        """Write data to serial"""
+        if self.connection:
+            self.connection.write(data)
+
+    def initialize_log_files(self):
+        self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
+        start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
+        self.handler_log_file.handle(start_msg)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/device/simulator_adapter.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/simulator_adapter.py
new file mode 100755
index 0000000..558e206
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/device/simulator_adapter.py
@@ -0,0 +1,220 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import abc
+import asyncio
+import asyncio.subprocess
+import logging
+import os
+import shutil
+import signal
+import subprocess
+import threading
+import time
+from asyncio.base_subprocess import BaseSubprocessTransport
+from datetime import datetime
+from functools import wraps
+from pathlib import Path
+from queue import Queue
+from typing import Generator
+
+import psutil
+
+from twister_harness.constants import END_OF_DATA
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.helper import log_command
+from twister_harness.log_files.log_file import HandlerLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+
+# Workaround for RuntimeError: Event loop is closed
+# https://pythonalgos.com/runtimeerror-event-loop-is-closed-asyncio-fix/
+def silence_event_loop_closed(func):
+    @wraps(func)
+    def wrapper(self, *args, **kwargs):
+        try:
+            return func(self, *args, **kwargs)
+        except RuntimeError as e:
+            if str(e) != 'Event loop is closed':
+                raise
+
+    return wrapper
+
+
+BaseSubprocessTransport.__del__ = silence_event_loop_closed(BaseSubprocessTransport.__del__)  # type: ignore
+
+logger = logging.getLogger(__name__)
+
+
+class SimulatorAdapterBase(DeviceAbstract, abc.ABC):
+
+    def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
+        """
+        :param twister_config: twister configuration
+        """
+        super().__init__(device_config, **kwargs)
+        self._process: asyncio.subprocess.Process | None = None
+        self._process_ended_with_timeout: bool = False
+        self.queue: Queue = Queue()
+        self._stop_job: bool = False
+        self._exc: Exception | None = None  #: store any exception which appeared running this thread
+        self._thread: threading.Thread | None = None
+        self.command: list[str] = []
+        self.process_kwargs: dict = {
+            'stdout': asyncio.subprocess.PIPE,
+            'stderr': asyncio.subprocess.STDOUT,
+            'stdin': asyncio.subprocess.PIPE,
+            'env': self.env,
+        }
+        self._data_to_send: bytes | None = None
+
+    def connect(self, timeout: float = 1) -> None:
+        pass  # pragma: no cover
+
+    def flash_and_run(self, timeout: float = 60.0) -> None:
+        if not self.command:
+            msg = 'Run simulation command is empty, please verify if it was generated properly.'
+            logger.error(msg)
+            raise TwisterHarnessException(msg)
+        self._thread = threading.Thread(target=self._run_simulation, args=(timeout,), daemon=True)
+        self._thread.start()
+        # Give a time to start subprocess before test is executed
+        time.sleep(0.1)
+        # Check if subprocess (simulation) has started without errors
+        if self._exc is not None:
+            logger.error('Simulation failed due to an exception: %s', self._exc)
+            raise self._exc
+
+    def _run_simulation(self, timeout: float) -> None:
+        log_command(logger, 'Running command', self.command, level=logging.INFO)
+        try:
+            return_code: int = asyncio.run(self._run_command(timeout=timeout))
+        except subprocess.SubprocessError as e:
+            logger.error('Running simulation failed due to subprocess error %s', e)
+            self._exc = TwisterHarnessException(e.args)
+        except FileNotFoundError as e:
+            logger.error(f'Running simulation failed due to file not found: {e.filename}')
+            self._exc = TwisterHarnessException(f'File not found: {e.filename}')
+        except Exception as e:
+            logger.error('Running simulation failed: %s', e)
+            self._exc = TwisterHarnessException(e.args)
+        else:
+            if return_code == 0:
+                logger.info('Running simulation finished with return code %s', return_code)
+            elif return_code == -15:
+                logger.info('Running simulation stopped interrupted by user')
+            else:
+                logger.warning('Running simulation finished with return code %s', return_code)
+        finally:
+            self.queue.put(END_OF_DATA)  # indicate to the other threads that there will be no more data in queue
+
+    async def _run_command(self, timeout: float = 60.):
+        assert isinstance(self.command, (list, tuple, set))
+        # to avoid stupid and difficult to debug mistakes
+        # we are using asyncio to run subprocess to be able to read from stdout
+        # without blocking while loop (readline with timeout)
+        self._process = await asyncio.create_subprocess_exec(
+            *self.command,
+            **self.process_kwargs
+        )
+        logger.debug('Started subprocess with PID %s', self._process.pid)
+        end_time = time.time() + timeout
+        while not self._stop_job and not self._process.stdout.at_eof():  # type: ignore[union-attr]
+            if line := await self._read_line(timeout=0.1):
+                self.queue.put(line.decode('utf-8').strip())
+            if time.time() > end_time:
+                self._process_ended_with_timeout = True
+                logger.info(f'Finished process with PID {self._process.pid} after {timeout} seconds timeout')
+                break
+            if self._data_to_send:
+                self._process.stdin.write(self._data_to_send)  # type: ignore[union-attr]
+                await self._process.stdin.drain()  # type: ignore[union-attr]
+                self._data_to_send = None
+
+        self.queue.put(END_OF_DATA)  # indicate to the other threads that there will be no more data in queue
+        return await self._process.wait()
+
+    async def _read_line(self, timeout=0.1) -> bytes | None:
+        try:
+            return await asyncio.wait_for(self._process.stdout.readline(), timeout=timeout)  # type: ignore[union-attr]
+        except asyncio.TimeoutError:
+            return None
+
+    def disconnect(self):
+        pass  # pragma: no cover
+
+    def stop(self) -> None:
+        """Stop device."""
+        self._stop_job = True
+        time.sleep(0.1)  # give a time to end while loop in running simulation
+        if self._process is not None and self._process.returncode is None:
+            logger.debug('Stopping all running processes for PID %s', self._process.pid)
+            # kill subprocess if it is still running
+            for child in psutil.Process(self._process.pid).children(recursive=True):
+                try:
+                    os.kill(child.pid, signal.SIGTERM)
+                except ProcessLookupError:
+                    pass
+            # kill subprocess if it is still running
+            os.kill(self._process.pid, signal.SIGTERM)
+        if self._thread is not None:
+            self._thread.join(timeout=1)  # Should end immediately, but just in case we set timeout for 1 sec
+        if self._exc:
+            raise self._exc
+
+    @property
+    def iter_stdout(self) -> Generator[str, None, None]:
+        """Return output from serial."""
+        while True:
+            stream = self.queue.get()
+            if stream == END_OF_DATA:
+                logger.debug('No more data from running process')
+                break
+            self.handler_log_file.handle(data=stream + '\n')
+            yield stream
+            self.queue.task_done()
+
+    def write(self, data: bytes) -> None:
+        """Write data to serial"""
+        while self._data_to_send:
+            # wait data will be write to self._process.stdin.write
+            time.sleep(0.1)
+        self._data_to_send = data
+
+    def initialize_log_files(self):
+        self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
+        start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
+        self.handler_log_file.handle(start_msg)
+
+
+class NativeSimulatorAdapter(SimulatorAdapterBase):
+    """Simulator adapter to run `zephyr.exe` simulation"""
+
+    def generate_command(self) -> None:
+        """Return command to run."""
+        self.command = [
+            str((Path(self.device_config.build_dir) / 'zephyr' / 'zephyr.exe').resolve())
+        ]
+
+
+class UnitSimulatorAdapter(SimulatorAdapterBase):
+    """Simulator adapter to run unit tests"""
+
+    def generate_command(self) -> None:
+        """Return command to run."""
+        self.command = [str((Path(self.device_config.build_dir) / 'testbinary').resolve())]
+
+
+class CustomSimulatorAdapter(SimulatorAdapterBase):
+
+    def generate_command(self) -> None:
+        """Return command to run."""
+        if (west := shutil.which('west')) is None:
+            logger.error('west not found')
+            self.command = []
+        else:
+            self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/exceptions.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/exceptions.py
new file mode 100644
index 0000000..4fbd3b5
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/exceptions.py
@@ -0,0 +1,6 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class TwisterHarnessException(Exception):
+    """General Twister harness exception."""
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/__init__.py
new file mode 100644
index 0000000..235c666
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/__init__.py
@@ -0,0 +1,3 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/dut.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/dut.py
new file mode 100644
index 0000000..90e82cf
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures/dut.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+from typing import Generator, Type
+
+import pytest
+
+from twister_harness.device.device_abstract import DeviceAbstract
+from twister_harness.device.factory import DeviceFactory
+from twister_harness.twister_harness_config import DeviceConfig, TwisterHarnessConfig
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture(scope='function')
+def dut(request: pytest.FixtureRequest) -> Generator[DeviceAbstract, None, None]:
+    """Return device instance."""
+    twister_harness_config: TwisterHarnessConfig = request.config.twister_harness_config  # type: ignore
+    device_config: DeviceConfig = twister_harness_config.devices[0]
+    device_type = device_config.type
+
+    device_class: Type[DeviceAbstract] = DeviceFactory.get_device(device_type)
+
+    device = device_class(device_config)
+
+    try:
+        device.connect()
+        device.generate_command()
+        device.initialize_log_files()
+        device.flash_and_run()
+        device.connect()
+        yield device
+    except KeyboardInterrupt:
+        pass
+    finally:  # to make sure we close all running processes after user broke execution
+        device.disconnect()
+        device.stop()
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/helper.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/helper.py
new file mode 100644
index 0000000..d60717d
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/helper.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os.path
+import platform
+import shlex
+
+_WINDOWS = platform.system() == 'Windows'
+
+logger = logging.getLogger(__name__)
+
+
+def log_command(logger: logging.Logger, msg: str, args: list, level: int = logging.DEBUG):
+    """
+    Platform-independent helper for logging subprocess invocations.
+
+    Will log a command string that can be copy/pasted into a POSIX
+    shell on POSIX platforms. This is not available on Windows, so
+    the entire args array is logged instead.
+
+    :param logger: logging.Logger to use
+    :param msg: message to associate with the command
+    :param args: argument list as passed to subprocess module
+    :param level: log level
+    """
+    msg = f'{msg}: %s'
+    if _WINDOWS:
+        logger.log(level, msg, str(args))
+    else:
+        logger.log(level, msg, shlex.join(args))
+
+
+def normalize_filename(filename: str) -> str:
+    filename = os.path.expanduser(os.path.expandvars(filename))
+    filename = os.path.normpath(os.path.abspath(filename))
+    return filename
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/log.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/log.py
new file mode 100644
index 0000000..84fa9ca
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/log.py
@@ -0,0 +1,71 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging.config
+import os
+
+import pytest
+
+
+def configure_logging(config: pytest.Config) -> None:
+    """Configure logging."""
+    output_dir = config.option.output_dir
+    os.makedirs(output_dir, exist_ok=True)
+    log_file = os.path.join(output_dir, 'twister_harness.log')
+
+    if hasattr(config, 'workerinput'):
+        worker_id = config.workerinput['workerid']
+        log_file = os.path.join(output_dir, f'twister_harness_{worker_id}.log')
+
+    log_format = '%(asctime)s:%(levelname)s:%(name)s: %(message)s'
+    log_level = config.getoption('--log-level') or config.getini('log_level') or logging.INFO
+    log_file = config.getoption('--log-file') or config.getini('log_file') or log_file
+    log_format = config.getini('log_cli_format') or log_format
+
+    default_config = {
+        'version': 1,
+        'disable_existing_loggers': False,
+        'formatters': {
+            'standard': {
+                'format': log_format,
+            },
+            'simply': {
+                'format': '%(asctime)s.%(msecs)d:%(levelname)s: %(message)s',
+                'datefmt': '%H:%M:%S'
+            }
+        },
+        'handlers': {
+            'file': {
+                'class': 'logging.FileHandler',
+                'level': 'DEBUG',
+                'formatter': 'standard',
+                'filters': [],
+                'filename': log_file,
+                'encoding': 'utf8',
+                'mode': 'w'
+            },
+            'console': {
+                'class': 'logging.StreamHandler',
+                'level': 'DEBUG',
+                'formatter': 'simply',
+                'filters': [],
+            }
+        },
+        'loggers': {
+            '': {
+                'handlers': ['console', 'file'],
+                'level': 'WARNING',
+                'propagate': False
+            },
+            'twister_harness': {
+                'handlers': ['console', 'file'],
+                'level': log_level,
+                'propagate': False,
+            }
+        }
+    }
+
+    logging.config.dictConfig(default_config)
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/__init__.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/__init__.py
new file mode 100755
index 0000000..235c666
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/__init__.py
@@ -0,0 +1,3 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/log_file.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/log_file.py
new file mode 100755
index 0000000..7c4b333
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/log_files/log_file.py
@@ -0,0 +1,71 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os
+import sys
+from pathlib import Path
+
+from twister_harness.helper import normalize_filename
+
+logger = logging.getLogger(__name__)
+
+
+class LogFile:
+    """Base class for logging files."""
+    name = 'uninitialized'
+
+    def __init__(self, filename: str | Path) -> None:
+        self.default_encoding = sys.getdefaultencoding()
+        self.filename = filename
+
+    @staticmethod
+    def get_log_filename(build_dir: Path | str, name: str) -> str:
+        """
+        :param build_dir: path to building directory.
+        :param name: name of the logging file.
+        :return: path to logging file
+        """
+        if not build_dir:
+            filename = os.devnull
+        else:
+            name = name + '.log'
+            filename = os.path.join(build_dir, name)
+            filename = normalize_filename(filename=filename)
+        return filename
+
+    def handle(self, data: str | bytes) -> None:
+        """Save information to logging file."""
+        if data:
+            data = data.decode(encoding=self.default_encoding) if isinstance(data, bytes) else data
+            with open(file=self.filename, mode='a+', encoding=self.default_encoding) as log_file:
+                log_file.write(data)  # type: ignore[arg-type]
+
+    @classmethod
+    def create(cls, build_dir: Path | str = '') -> LogFile:
+        filename = cls.get_log_filename(build_dir=build_dir, name=cls.name)
+        return cls(filename)
+
+
+class BuildLogFile(LogFile):
+    """Save logs from the building."""
+    name = 'build'
+
+
+class HandlerLogFile(LogFile):
+    """Save output from a device."""
+    name = 'handler'
+
+
+class DeviceLogFile(LogFile):
+    """Save errors during flashing onto device."""
+    name = 'device'
+
+
+class NullLogFile(LogFile):
+    """Placeholder for no initialized log file"""
+    def handle(self, data: str | bytes) -> None:
+        """This method does nothing."""
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py
new file mode 100644
index 0000000..a510648
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py
@@ -0,0 +1,148 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+import os
+from pathlib import Path
+
+import pytest
+
+from twister_harness.log import configure_logging
+from twister_harness.twister_harness_config import TwisterHarnessConfig
+
+logger = logging.getLogger(__name__)
+
+pytest_plugins = (
+    'twister_harness.fixtures.dut'
+)
+
+
+def pytest_addoption(parser: pytest.Parser):
+    twister_harness_group = parser.getgroup('Twister harness')
+    twister_harness_group.addoption(
+        '--twister-harness',
+        action='store_true',
+        default=False,
+        help='Activate Twister harness plugin'
+    )
+    parser.addini(
+        'twister_harness',
+        'Activate Twister harness plugin',
+        type='bool'
+    )
+    twister_harness_group.addoption(
+        '-O',
+        '--outdir',
+        metavar='PATH',
+        dest='output_dir',
+        help='Output directory for logs. If not provided then use '
+             '--build-dir path as default.'
+    )
+    twister_harness_group.addoption(
+        '--platform',
+        help='Choose specific platform'
+    )
+    twister_harness_group.addoption(
+        '--device-type',
+        choices=('native', 'qemu', 'hardware', 'unit', 'custom'),
+        help='Choose type of device (hardware, qemu, etc.)'
+    )
+    twister_harness_group.addoption(
+        '--device-serial',
+        help='Serial device for accessing the board '
+             '(e.g., /dev/ttyACM0)'
+    )
+    twister_harness_group.addoption(
+        '--device-serial-baud',
+        type=int,
+        default=115200,
+        help='Serial device baud rate (default 115200)'
+    )
+    twister_harness_group.addoption(
+        '--runner',
+        help='use the specified west runner (pyocd, nrfjprog, etc)'
+    )
+    twister_harness_group.addoption(
+        '--device-id',
+        help='ID of connected hardware device (for example 000682459367)'
+    )
+    twister_harness_group.addoption(
+        '--device-product',
+        help='Product name of connected hardware device (for example "STM32 STLink")'
+    )
+    twister_harness_group.addoption(
+        '--device-serial-pty',
+        metavar='PATH',
+        help='Script for controlling pseudoterminal. '
+             'E.g --device-testing --device-serial-pty=<script>'
+    )
+    twister_harness_group.addoption(
+        '--west-flash-extra-args',
+        help='Extend parameters for west flash. '
+             'E.g. --west-flash-extra-args="--board-id=foobar,--erase" '
+             'will translate to "west flash -- --board-id=foobar --erase"'
+    )
+    twister_harness_group.addoption(
+        '--flashing-timeout',
+        type=int,
+        default=60,
+        help='Set timeout for the device flash operation in seconds.'
+    )
+    twister_harness_group.addoption(
+        '--build-dir',
+        dest='build_dir',
+        metavar='PATH',
+        help='Directory with built application.'
+    )
+    twister_harness_group.addoption(
+        '--binary-file',
+        metavar='PATH',
+        help='Path to file which should be flashed.'
+    )
+    twister_harness_group.addoption(
+        '--pre-script',
+        metavar='PATH'
+    )
+    twister_harness_group.addoption(
+        '--post-script',
+        metavar='PATH'
+    )
+    twister_harness_group.addoption(
+        '--post-flash-script',
+        metavar='PATH'
+    )
+
+
+def pytest_configure(config: pytest.Config):
+    if config.getoption('help'):
+        return
+
+    if not (config.getoption('twister_harness') or config.getini('twister_harness')):
+        return
+
+    validate_options(config)
+
+    if config.option.output_dir is None:
+        config.option.output_dir = config.option.build_dir
+    config.option.output_dir = _normalize_path(config.option.output_dir)
+
+    # create output directory if not exists
+    os.makedirs(config.option.output_dir, exist_ok=True)
+
+    configure_logging(config)
+
+    config.twister_harness_config = TwisterHarnessConfig.create(config)  # type: ignore
+
+
+def validate_options(config: pytest.Config) -> None:
+    """Verify if user provided proper options"""
+    # TBD
+
+
+def _normalize_path(path: str | Path) -> str:
+    path = os.path.expanduser(os.path.expandvars(path))
+    path = os.path.normpath(os.path.abspath(path))
+    return path
diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/twister_harness_config.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/twister_harness_config.py
new file mode 100644
index 0000000..7f3251a
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/twister_harness_config.py
@@ -0,0 +1,75 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass, field
+from pathlib import Path
+
+import pytest
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class DeviceConfig:
+    platform: str = ''
+    type: str = ''
+    serial: str = ''
+    baud: int = 115200
+    runner: str = ''
+    id: str = ''
+    product: str = ''
+    serial_pty: str = ''
+    west_flash_extra_args: list[str] = field(default_factory=list, repr=False)
+    flashing_timeout: int = 60  # [s]
+    build_dir: Path | str = ''
+    binary_file: Path | str = ''
+    name: str = ''
+    pre_script: str = ''
+    post_script: str = ''
+    post_flash_script: str = ''
+
+
+@dataclass
+class TwisterHarnessConfig:
+    """Store Twister harness configuration to have easy access in test."""
+    output_dir: Path = Path('twister_harness_out')
+    devices: list[DeviceConfig] = field(default_factory=list, repr=False)
+
+    @classmethod
+    def create(cls, config: pytest.Config) -> TwisterHarnessConfig:
+        """Create new instance from pytest.Config."""
+        output_dir: Path = config.option.output_dir
+
+        devices = []
+
+        west_flash_extra_args: list[str] = []
+        if config.option.west_flash_extra_args:
+            west_flash_extra_args = [w.strip() for w in config.option.west_flash_extra_args.split(',')]
+        device_from_cli = DeviceConfig(
+            platform=config.option.platform,
+            type=config.option.device_type,
+            serial=config.option.device_serial,
+            baud=config.option.device_serial_baud,
+            runner=config.option.runner,
+            id=config.option.device_id,
+            product=config.option.device_product,
+            serial_pty=config.option.device_serial_pty,
+            west_flash_extra_args=west_flash_extra_args,
+            flashing_timeout=config.option.flashing_timeout,
+            build_dir=config.option.build_dir,
+            binary_file=config.option.binary_file,
+            pre_script=config.option.pre_script,
+            post_script=config.option.post_script,
+            post_flash_script=config.option.post_flash_script,
+        )
+
+        devices.append(device_from_cli)
+
+        return cls(
+            output_dir=output_dir,
+            devices=devices
+        )
diff --git a/scripts/pylib/pytest-twister-harness/tests/conftest.py b/scripts/pylib/pytest-twister-harness/tests/conftest.py
new file mode 100755
index 0000000..a3e7f7a
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/conftest.py
@@ -0,0 +1,23 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from pathlib import Path
+
+import pytest
+
+# pytest_plugins = ['pytester']
+
+
+@pytest.fixture
+def resources(request: pytest.FixtureRequest) -> Path:
+    """Return path to `data` folder"""
+    return Path(request.module.__file__).parent.joinpath('data')
+
+
+@pytest.fixture(scope='function')
+def copy_example(pytester) -> Path:
+    """Copy example tests to temporary directory and return path the temp directory."""
+    resources_dir = Path(__file__).parent / 'data'
+    pytester.copy_example(str(resources_dir))
+    return pytester.path
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/data/fifo_mock.py b/scripts/pylib/pytest-twister-harness/tests/device/data/fifo_mock.py
new file mode 100755
index 0000000..f269766
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/data/fifo_mock.py
@@ -0,0 +1,100 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import os
+import sys
+import threading
+import time
+from argparse import ArgumentParser
+
+content = """
+The Zen of Python, by Tim Peters
+
+Beautiful is better than ugly.
+Explicit is better than implicit.
+Simple is better than complex.
+Complex is better than complicated.
+Flat is better than nested.
+Sparse is better than dense.
+Readability counts.
+Special cases aren't special enough to break the rules.
+Although practicality beats purity.
+Errors should never pass silently.
+Unless explicitly silenced.
+In the face of ambiguity, refuse the temptation to guess.
+There should be one-- and preferably only one --obvious way to do it.
+Although that way may not be obvious at first unless you're Dutch.
+Now is better than never.
+Although never is often better than *right* now.
+If the implementation is hard to explain, it's a bad idea.
+If the implementation is easy to explain, it may be a good idea.
+Namespaces are one honking great idea -- let's do more of those!
+"""
+
+
+class FifoFile:
+    def __init__(self, filename, mode):
+        self.filename = filename
+        self.mode = mode
+        self.thread = None
+        self.file = None
+        self.logger = logging.getLogger(__name__)
+
+    def _open(self):
+        self.logger.info(f'Creating fifo file: {self.filename}')
+        end_time = time.time() + 2
+        while not os.path.exists(self.filename):
+            time.sleep(0.1)
+            if time.time() > end_time:
+                self.logger.error(f'Did not able create fifo file: {self.filename}')
+                return
+        self.file = open(self.filename, self.mode, buffering=0)
+        self.logger.info(f'File created: {self.filename}')
+
+    def open(self):
+        self.thread = threading.Thread(target=self._open(), daemon=True)
+        self.thread.start()
+
+    def write(self, data):
+        if self.file:
+            self.file.write(data)
+
+    def read(self):
+        if self.file:
+            return self.file.readline()
+
+    def close(self):
+        if self.file:
+            self.file.close()
+        self.thread.join(1)
+        self.logger.info(f'Closed file: {self.filename}')
+
+    def __enter__(self):
+        self.open()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+
+def main():
+    logging.basicConfig(level='DEBUG')
+    parser = ArgumentParser()
+    parser.add_argument('file')
+    args = parser.parse_args()
+    read_path = args.file + '.in'
+    write_path = args.file + '.out'
+    logger = logging.getLogger(__name__)
+    logger.info('Start')
+
+    with FifoFile(read_path, 'rb'), FifoFile(write_path, 'wb') as wf:
+        for line in content.split('\n'):
+            wf.write(f'{line}\n'.encode('utf-8'))
+            time.sleep(0.1)
+    return 0
+
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/data/mock_script.py b/scripts/pylib/pytest-twister-harness/tests/device/data/mock_script.py
new file mode 100755
index 0000000..631e93e
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/data/mock_script.py
@@ -0,0 +1,68 @@
+#!/usr/bin/python
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Simply mock for bash script to use with unit tests.
+"""
+import sys
+import time
+from argparse import ArgumentParser
+
+s = """
+The Zen of Python, by Tim Peters
+
+Beautiful is better than ugly.
+Explicit is better than implicit.
+Simple is better than complex.
+Complex is better than complicated.
+Flat is better than nested.
+Sparse is better than dense.
+Readability counts.
+Special cases aren't special enough to break the rules.
+Although practicality beats purity.
+Errors should never pass silently.
+Unless explicitly silenced.
+In the face of ambiguity, refuse the temptation to guess.
+There should be one-- and preferably only one --obvious way to do it.
+Although that way may not be obvious at first unless you're Dutch.
+Now is better than never.
+Although never is often better than *right* now.
+If the implementation is hard to explain, it's a bad idea.
+If the implementation is easy to explain, it may be a good idea.
+Namespaces are one honking great idea -- let's do more of those!
+"""
+
+
+def main() -> int:
+    parser = ArgumentParser()
+    parser.add_argument('--sleep', action='store', default=0, type=float)
+    parser.add_argument('--long-sleep', action='store_true')
+    parser.add_argument('--return-code', action='store', default=0, type=int)
+    parser.add_argument('--exception', action='store_true')
+
+    args = parser.parse_args()
+
+    if args.exception:
+        # simulate crashing application
+        raise Exception
+
+    if args.long_sleep:
+        # prints data and wait for certain time
+        for line in s.split('\n'):
+            print(line, flush=True)
+        time.sleep(args.sleep)
+    else:
+        # prints lines with delay
+        for line in s.split('\n'):
+            print(line, flush=True)
+            time.sleep(args.sleep)
+
+    print('End of script', flush=True)
+    print('Returns with code', args.return_code, flush=True)
+    return args.return_code
+
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py b/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py
new file mode 100644
index 0000000..b7e40e9
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py
@@ -0,0 +1,206 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+from pathlib import Path
+from unittest import mock
+
+import pytest
+
+from twister_harness.device.hardware_adapter import HardwareAdapter
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+
+@pytest.fixture(name='device')
+def fixture_adapter() -> HardwareAdapter:
+    device_config = DeviceConfig(
+        runner='runner',
+        build_dir=Path('build'),
+        platform='platform',
+        id='p_id',
+    )
+    return HardwareAdapter(device_config)
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_1(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = 'west'
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == ['west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'runner']
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_2(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = 'west'
+    device.device_config.runner = 'pyocd'
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [
+        'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'pyocd', '--', '--board-id', 'p_id'
+    ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_raise_exception_if_west_is_not_installed(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = None
+    with pytest.raises(TwisterHarnessException, match='west not found'):
+        device.generate_command()
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_3(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = 'west'
+    device.device_config.runner = 'nrfjprog'
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [
+        'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'nrfjprog', '--', '--dev-id', 'p_id'
+    ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_4(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = 'west'
+    device.device_config.runner = 'openocd'
+    device.device_config.product = 'STM32 STLink'
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [
+        'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'openocd',
+        '--', '--cmd-pre-init', 'hla_serial p_id'
+    ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_5(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = 'west'
+    device.device_config.runner = 'openocd'
+    device.device_config.product = 'EDBG CMSIS-DAP'
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [
+        'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'openocd',
+        '--', '--cmd-pre-init', 'cmsis_dap_serial p_id'
+    ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_6(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = 'west'
+    device.device_config.runner = 'jlink'
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [
+        'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'jlink',
+        '--tool-opt=-SelectEmuBySN p_id'
+    ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_7(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = 'west'
+    device.device_config.runner = 'stm32cubeprogrammer'
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [
+        'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'stm32cubeprogrammer',
+        '--tool-opt=sn=p_id'
+    ]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_8(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = 'west'
+    device.device_config.runner = 'openocd'
+    device.device_config.product = 'STLINK-V3'
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [
+        'west', 'flash', '--skip-rebuild', '--build-dir', 'build',
+        '--runner', 'openocd', '--', '--cmd-pre-init', 'hla_serial p_id'
+    ]
+
+
+def test_if_hardware_adapter_raises_exception_empty_command(device: HardwareAdapter) -> None:
+    device.command = []
+    exception_msg = 'Flash command is empty, please verify if it was generated properly.'
+    with pytest.raises(TwisterHarnessException, match=exception_msg):
+        device.flash_and_run()
+
+
+def test_handler_and_device_log_correct_initialized_on_hardware(device: HardwareAdapter, tmp_path: Path) -> None:
+    device.device_config.build_dir = tmp_path
+    device.initialize_log_files()
+    assert isinstance(device.handler_log_file, HandlerLogFile)
+    assert isinstance(device.device_log_file, DeviceLogFile)
+    assert device.handler_log_file.filename.endswith('handler.log')  # type: ignore[union-attr]
+    assert device.device_log_file.filename.endswith('device.log')  # type: ignore[union-attr]
+
+
+@mock.patch('twister_harness.device.hardware_adapter.subprocess.Popen')
+def test_device_log_correct_error_handle(patched_popen, device: HardwareAdapter, tmp_path: Path) -> None:
+    popen_mock = mock.Mock()
+    popen_mock.communicate.return_value = (b'', b'flashing error')
+    patched_popen.return_value = popen_mock
+    device.device_config.build_dir = tmp_path
+    device.initialize_log_files()
+    device.command = [
+        'west', 'flash', '--skip-rebuild', '--build-dir', str(tmp_path),
+        '--runner', 'nrfjprog', '--', '--dev-id', 'p_id'
+    ]
+    with pytest.raises(expected_exception=TwisterHarnessException, match='Could not flash device p_id'):
+        device.flash_and_run()
+    assert os.path.isfile(device.device_log_file.filename)
+    with open(device.device_log_file.filename, 'r') as file:
+        assert 'flashing error' in file.readlines()
+
+
+@mock.patch('twister_harness.device.hardware_adapter.subprocess.Popen')
+@mock.patch('twister_harness.device.hardware_adapter.serial.Serial')
+def test_if_hardware_adapter_uses_serial_pty(
+    patched_serial, patched_popen, device: HardwareAdapter, monkeypatch: pytest.MonkeyPatch
+):
+    device.device_config.serial_pty = 'script.py'
+
+    popen_mock = mock.Mock()
+    popen_mock.communicate.return_value = (b'output', b'error')
+    patched_popen.return_value = popen_mock
+
+    monkeypatch.setattr('twister_harness.device.hardware_adapter.pty.openpty', lambda: (123, 456))
+    monkeypatch.setattr('twister_harness.device.hardware_adapter.os.ttyname', lambda x: f'/pty/ttytest/{x}')
+
+    serial_mock = mock.Mock()
+    serial_mock.port = '/pty/ttytest/456'
+    patched_serial.return_value = serial_mock
+
+    device.connect()
+    assert device.connection.port == '/pty/ttytest/456'  # type: ignore[union-attr]
+    assert device.serial_pty_proc
+    patched_popen.assert_called_with(
+        ['script.py'],
+        stdout=123,
+        stdin=123,
+        stderr=123
+    )
+
+    device.disconnect()
+    assert not device.connection
+    assert not device.serial_pty_proc
+
+
+@mock.patch('twister_harness.device.hardware_adapter.shutil.which')
+def test_if_get_command_returns_proper_string_with_west_flash(patched_which, device: HardwareAdapter) -> None:
+    patched_which.return_value = 'west'
+    device.device_config.west_flash_extra_args = ['--board-id=foobar', '--erase']
+    device.device_config.runner = 'pyocd'
+    device.device_config.id = ''
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [
+        'west', 'flash', '--skip-rebuild', '--build-dir', 'build', '--runner', 'pyocd',
+        '--', '--board-id=foobar', '--erase'
+    ]
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py b/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py
new file mode 100755
index 0000000..9b47ce3
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py
@@ -0,0 +1,98 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+import subprocess
+from typing import Generator
+from unittest import mock
+from unittest.mock import patch
+
+import pytest
+
+from twister_harness.device.qemu_adapter import QemuAdapter
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.log_files.log_file import HandlerLogFile, NullLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+
+@pytest.fixture(name='device')
+def fixture_device_adapter(tmp_path) -> Generator[QemuAdapter, None, None]:
+    build_dir = tmp_path / 'build_dir'
+    adapter = QemuAdapter(DeviceConfig(build_dir=build_dir))
+    yield adapter
+    try:
+        adapter.stop()  # to make sure all running processes are closed
+    except TwisterHarnessException:
+        pass
+
+
+@patch('shutil.which', return_value='/usr/bin/west')
+def test_if_generate_command_creates_proper_command(patched_which):
+    adapter = QemuAdapter(DeviceConfig(build_dir='build_dir'))
+    adapter.generate_command()
+    assert adapter.command == ['/usr/bin/west', 'build', '-d', 'build_dir', '-t', 'run']
+
+
+@patch('shutil.which', return_value=None)
+def test_if_generate_command_creates_empty_listy_if_west_is_not_installed(patched_which):
+    adapter = QemuAdapter(DeviceConfig())
+    adapter.generate_command()
+    assert adapter.command == []
+
+
+def test_if_qemu_adapter_raises_exception_for_empty_command(device) -> None:
+    device.command = []
+    exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
+    with pytest.raises(TwisterHarnessException, match=exception_msg):
+        device.flash_and_run(timeout=0.1)
+
+
+def test_if_qemu_adapter_raises_exception_file_not_found(device) -> None:
+    device.command = ['dummy']
+    with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
+        device.flash_and_run(timeout=0.1)
+        device.stop()
+    assert device._exc is not None
+    assert isinstance(device._exc, TwisterHarnessException)
+
+
+@mock.patch('subprocess.Popen', side_effect=subprocess.SubprocessError(1, 'Exception message'))
+def test_if_qemu_adapter_raises_exception_when_subprocess_raised_an_error(patched_run, device):
+    device.command = ['echo', 'TEST']
+    with pytest.raises(TwisterHarnessException, match='Exception message'):
+        device.flash_and_run(timeout=0.1)
+        device.stop()
+
+
+def test_if_qemu_adapter_runs_without_errors(resources, tmp_path) -> None:
+    fifo_file_path = str(tmp_path / 'qemu-fifo')
+    script_path = resources.joinpath('fifo_mock.py')
+    device = QemuAdapter(DeviceConfig(build_dir=str(tmp_path)))
+    device.booting_timeout_in_ms = 1000
+    device.command = ['python', str(script_path), fifo_file_path]
+    device.connect()
+    device.initialize_log_files()
+    device.flash_and_run(timeout=1)
+    lines = list(device.iter_stdout)
+    assert 'Readability counts.' in lines
+    assert os.path.isfile(device.handler_log_file.filename)
+    with open(device.handler_log_file.filename, 'r') as file:
+        file_lines = [line.strip() for line in file.readlines()]
+    assert file_lines[-2:] == lines[-2:]
+    device.disconnect()
+
+
+def test_if_qemu_adapter_finishes_after_timeout(device) -> None:
+    device.command = ['sleep', '0.3']
+    device.flash_and_run(timeout=0.1)
+    device.stop()
+    assert device._process_ended_with_timeout is True
+
+
+def test_handler_and_device_log_correct_initialized_on_qemu(device, tmp_path) -> None:
+    device.device_config.build_dir = tmp_path
+    device.initialize_log_files()
+    assert isinstance(device.handler_log_file, HandlerLogFile)
+    assert isinstance(device.device_log_file, NullLogFile)
+    assert device.handler_log_file.filename.endswith('handler.log')  # type: ignore[union-attr]
diff --git a/scripts/pylib/pytest-twister-harness/tests/device/simulator_adapter_test.py b/scripts/pylib/pytest-twister-harness/tests/device/simulator_adapter_test.py
new file mode 100755
index 0000000..c7e4003
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/device/simulator_adapter_test.py
@@ -0,0 +1,137 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+import subprocess
+from pathlib import Path
+from unittest import mock
+
+import pytest
+
+from twister_harness.device.simulator_adapter import (
+    CustomSimulatorAdapter,
+    NativeSimulatorAdapter,
+    UnitSimulatorAdapter,
+)
+from twister_harness.exceptions import TwisterHarnessException
+from twister_harness.log_files.log_file import HandlerLogFile, NullLogFile
+from twister_harness.twister_harness_config import DeviceConfig
+
+
+@pytest.fixture(name='device')
+def fixture_adapter(tmp_path) -> NativeSimulatorAdapter:
+    return NativeSimulatorAdapter(DeviceConfig(build_dir=tmp_path))
+
+
+def test_if_native_simulator_adapter_get_command_returns_proper_string(
+        device: NativeSimulatorAdapter, resources: Path
+) -> None:
+    device.device_config.build_dir = resources
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [str(resources.joinpath('zephyr', 'zephyr.exe'))]
+
+
+def test_if_native_simulator_adapter_runs_without_errors(
+        resources: Path, device: NativeSimulatorAdapter
+) -> None:
+    """
+    Run script which prints text line by line and ends without errors.
+    Verify if subprocess was ended without errors, and without timeout.
+    """
+    script_path = resources.joinpath('mock_script.py')
+    # patching original command by mock_script.py to simulate same behaviour as zephyr.exe
+    device.command = ['python3', str(script_path)]
+    device.initialize_log_files()
+    device.flash_and_run(timeout=4)
+    lines = list(device.iter_stdout)  # give it time before close thread
+    device.stop()
+    assert device._process_ended_with_timeout is False
+    assert 'Readability counts.' in lines
+    assert os.path.isfile(device.handler_log_file.filename)
+    with open(device.handler_log_file.filename, 'r') as file:
+        file_lines = [line.strip() for line in file.readlines()]
+    assert file_lines[-2:] == lines[-2:]
+
+
+def test_if_native_simulator_adapter_finishes_after_timeout_while_there_is_no_data_from_subprocess(
+        resources: Path, device: NativeSimulatorAdapter
+) -> None:
+    """Test if thread finishes after timeout when there is no data on stdout, but subprocess is still running"""
+    script_path = resources.joinpath('mock_script.py')
+    device.command = ['python3', str(script_path), '--long-sleep', '--sleep=5']
+    device.initialize_log_files()
+    device.flash_and_run(timeout=0.5)
+    lines = list(device.iter_stdout)
+    device.stop()
+    assert device._process_ended_with_timeout is True
+    assert device._exc is None
+    # this message should not be printed because script has been terminated due to timeout
+    assert 'End of script' not in lines, 'Script has not been terminated before end'
+
+
+def test_if_native_simulator_adapter_raises_exception_file_not_found(device: NativeSimulatorAdapter) -> None:
+    device.command = ['dummy']
+    with pytest.raises(TwisterHarnessException, match='File not found: dummy'):
+        device.flash_and_run(timeout=0.1)
+        device.stop()
+    assert device._exc is not None
+    assert isinstance(device._exc, TwisterHarnessException)
+
+
+def test_if_simulator_adapter_raises_exception_empty_command(device: NativeSimulatorAdapter) -> None:
+    device.command = []
+    exception_msg = 'Run simulation command is empty, please verify if it was generated properly.'
+    with pytest.raises(TwisterHarnessException, match=exception_msg):
+        device.flash_and_run(timeout=0.1)
+
+
+def test_handler_and_device_log_correct_initialized_on_simulators(device: NativeSimulatorAdapter) -> None:
+    device.initialize_log_files()
+    assert isinstance(device.handler_log_file, HandlerLogFile)
+    assert isinstance(device.device_log_file, NullLogFile)
+    assert device.handler_log_file.filename.endswith('handler.log')  # type: ignore[union-attr]
+
+
+@mock.patch('asyncio.run', side_effect=subprocess.SubprocessError(1, 'Exception message'))
+def test_if_simulator_adapter_raises_exception_when_subprocess_raised_subprocess_error(
+    patched_run, device: NativeSimulatorAdapter
+):
+    device.command = ['echo', 'TEST']
+    with pytest.raises(TwisterHarnessException, match='Exception message'):
+        device.flash_and_run(timeout=0.1)
+        device.stop()
+
+
+@mock.patch('asyncio.run', side_effect=Exception(1, 'Raised other exception'))
+def test_if_simulator_adapter_raises_exception_when_subprocess_raised_an_error(
+    patched_run, device: NativeSimulatorAdapter
+):
+    device.command = ['echo', 'TEST']
+    with pytest.raises(TwisterHarnessException, match='Raised other exception'):
+        device.flash_and_run(timeout=0.1)
+        device.stop()
+
+
+@mock.patch('shutil.which', return_value='west')
+def test_if_custom_simulator_adapter_get_command_returns_proper_string(patched_which) -> None:
+    device = CustomSimulatorAdapter(DeviceConfig(build_dir='build_dir'))
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == ['west', 'build', '-d', 'build_dir', '-t', 'run']
+
+
+@mock.patch('shutil.which', return_value=None)
+def test_if_custom_simulator_adapter_get_command_returns_empty_string(patched_which) -> None:
+    device = CustomSimulatorAdapter(DeviceConfig(build_dir='build_dir'))
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == []
+
+
+def test_if_unit_simulator_adapter_get_command_returns_proper_string(resources: Path) -> None:
+    device = UnitSimulatorAdapter(DeviceConfig(build_dir=resources))
+    device.generate_command()
+    assert isinstance(device.command, list)
+    assert device.command == [str(resources.joinpath('testbinary'))]
diff --git a/scripts/pylib/pytest-twister-harness/tests/log_file/log_file_test.py b/scripts/pylib/pytest-twister-harness/tests/log_file/log_file_test.py
new file mode 100755
index 0000000..e2c82f1
--- /dev/null
+++ b/scripts/pylib/pytest-twister-harness/tests/log_file/log_file_test.py
@@ -0,0 +1,54 @@
+# Copyright (c) 2023 Nordic Semiconductor ASA
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import os
+
+import pytest
+
+from twister_harness.log_files.log_file import LogFile
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture
+def sample_log_file(tmpdir):
+    log_file = LogFile.create(build_dir=tmpdir)
+    yield log_file
+
+
+def test_if_filename_is_correct(sample_log_file: LogFile):
+    assert sample_log_file.filename.endswith('uninitialized.log')  # type: ignore[union-attr]
+
+
+def test_handle_data_is_str(sample_log_file: LogFile):
+    msg = 'str message'
+    sample_log_file.handle(data=msg)
+    assert os.path.exists(path=sample_log_file.filename)
+    with open(file=sample_log_file.filename, mode='r') as file:
+        assert file.readline() == 'str message'
+
+
+def test_handle_data_is_byte(sample_log_file: LogFile):
+    msg = b'bytes message'
+    sample_log_file.handle(data=msg)
+    assert os.path.exists(path=sample_log_file.filename)
+    with open(file=sample_log_file.filename, mode='r') as file:
+        assert file.readline() == 'bytes message'
+
+
+def test_handle_data_is_empty(sample_log_file: LogFile):
+    msg = ''
+    sample_log_file.handle(data=msg)
+    assert not os.path.exists(path=sample_log_file.filename)
+
+
+def test_get_log_filename_null_filename():
+    log_file = LogFile.create()
+    assert log_file.filename == os.devnull
+
+
+def test_get_log_filename_sample_filename(tmpdir):
+    log_file = LogFile.create(build_dir=tmpdir)
+    assert log_file.filename == os.path.join(tmpdir, 'uninitialized.log')