blob: 228981be0f170b94b72d5661c720fcf14c164487 [file] [log] [blame]
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import abc
import logging
import subprocess
from twister_harness.device.device_adapter import DeviceAdapter
from twister_harness.device.utils import log_command, terminate_process
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.twister_harness_config import DeviceConfig
# Module-level logger, named after this module, used by the adapters below.
logger = logging.getLogger(__name__)
class BinaryAdapterBase(DeviceAdapter, abc.ABC):
    """Base adapter that runs the "device" as a local subprocess.

    Subclasses only have to implement :meth:`generate_command`, which must set
    ``self.command``. The command is started with :class:`subprocess.Popen`,
    with stdout and stderr merged into one pipe and stdin exposed as a pipe,
    so the process can be read from and written to like a device.
    """

    def __init__(self, device_config: DeviceConfig) -> None:
        """
        :param device_config: device configuration
        """
        super().__init__(device_config)
        # Handle of the running subprocess; None when nothing is running.
        self._process: subprocess.Popen | None = None
        # Keyword arguments forwarded verbatim to subprocess.Popen.
        # NOTE(review): self.env is presumably set by DeviceAdapter.__init__ - confirm.
        self.process_kwargs: dict = {
            'stdout': subprocess.PIPE,
            'stderr': subprocess.STDOUT,  # merge stderr into the stdout pipe
            'stdin': subprocess.PIPE,
            'env': self.env,
        }

    @abc.abstractmethod
    def generate_command(self) -> None:
        """Generate and set command which will be used during running device."""

    def _flash_and_run(self) -> None:
        """Start the binary; this method only launches the subprocess."""
        self._run_subprocess()

    def _run_subprocess(self) -> None:
        """Start ``self.command`` as a subprocess and keep its handle.

        :raises TwisterHarnessException: if the command is empty or the
            subprocess could not be started for any reason
        """
        if not self.command:
            msg = 'Run command is empty, please verify if it was generated properly.'
            logger.error(msg)
            raise TwisterHarnessException(msg)
        log_command(logger, 'Running command', self.command, level=logging.DEBUG)
        try:
            self._process = subprocess.Popen(self.command, **self.process_kwargs)
        except subprocess.SubprocessError as exc:
            msg = f'Running subprocess failed due to SubprocessError {exc}'
            logger.error(msg)
            raise TwisterHarnessException(msg) from exc
        except FileNotFoundError as exc:
            msg = f'Running subprocess failed due to file not found: {exc.filename}'
            logger.error(msg)
            raise TwisterHarnessException(msg) from exc
        except Exception as exc:
            # Boundary handler: any other start-up failure is reported
            # through the harness exception type as well.
            msg = f'Running subprocess failed {exc}'
            logger.error(msg)
            raise TwisterHarnessException(msg) from exc

    def _connect_device(self) -> None:
        """
        This method was implemented only to imitate standard connect behavior
        like in Serial class.
        """

    def _disconnect_device(self) -> None:
        """
        This method was implemented only to imitate standard disconnect behavior
        like in serial connection.
        """

    def _close_device(self) -> None:
        """Terminate subprocess"""
        self._stop_subprocess()

    def _stop_subprocess(self) -> None:
        """Stop the subprocess (if any), reap it and drop the handle.

        If the process is still alive it is terminated and waited for up to
        ``self.base_timeout`` seconds. Should it still not exit in that time,
        it is killed and waited for once more so that no orphaned or zombie
        process is left behind.

        :raises subprocess.TimeoutExpired: if the process does not exit even
            after being killed
        """
        # Work on a local reference so a concurrent reset of self._process
        # cannot turn the calls below into attribute access on None.
        process = self._process
        if process is None:
            # subprocess already stopped
            return
        return_code: int | None = process.poll()
        if return_code is None:
            terminate_process(process)
            try:
                return_code = process.wait(self.base_timeout)
            except subprocess.TimeoutExpired:
                logger.warning(
                    'Subprocess did not exit within %s seconds after termination, killing it',
                    self.base_timeout,
                )
                process.kill()
                return_code = process.wait(self.base_timeout)
        self._process = None
        logger.debug('Running subprocess finished with return code %s', return_code)

    def _read_device_output(self) -> bytes:
        """Return one line read from the subprocess' stdout pipe."""
        return self._process.stdout.readline()

    def _write_to_device(self, data: bytes) -> None:
        """Write ``data`` to the subprocess' stdin and flush it immediately."""
        self._process.stdin.write(data)
        self._process.stdin.flush()

    def _flush_device_output(self) -> None:
        """Flush the stdout pipe object, but only while the device is running."""
        if self.is_device_running():
            self._process.stdout.flush()

    def is_device_running(self) -> bool:
        """Return true if the run flag is set and the subprocess is alive."""
        return self._device_run.is_set() and self._is_binary_running()

    def _is_binary_running(self) -> bool:
        """Return true if a subprocess exists and has not exited yet."""
        return self._process is not None and self._process.poll() is None

    def is_device_connected(self) -> bool:
        """Return true if device is connected."""
        return self.is_device_running() and self._device_connected.is_set()

    def _clear_internal_resources(self) -> None:
        """Clear the base class resources and forget the subprocess handle."""
        super()._clear_internal_resources()
        self._process = None
class NativeSimulatorAdapter(BinaryAdapterBase):
    """Adapter that launches the `zephyr.exe` simulation binary."""

    def generate_command(self) -> None:
        """Point the run command at ``<build_dir>/zephyr/zephyr.exe``."""
        executable = self.device_config.build_dir / 'zephyr' / 'zephyr.exe'
        self.command = [str(executable)]
class UnitSimulatorAdapter(BinaryAdapterBase):
    """Adapter that launches a unit-test binary."""

    def generate_command(self) -> None:
        """Point the run command at ``<build_dir>/testbinary``."""
        test_binary = self.device_config.build_dir / 'testbinary'
        self.command = [str(test_binary)]
class CustomSimulatorAdapter(BinaryAdapterBase):
    """Adapter that starts the simulation through the build's ``run`` target."""

    def generate_command(self) -> None:
        """Set the command to ``<west> build -d <build_dir> -t run``."""
        # NOTE(review): self.west is presumably provided by DeviceAdapter - confirm.
        build_dir = str(self.device_config.build_dir)
        self.command = [self.west, 'build', '-d', build_dir, '-t', 'run']