blob: 558e206adf731177cde9e91d249db02a24a1ec12 [file] [log] [blame]
# Copyright (c) 2023 Nordic Semiconductor ASA
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import abc
import asyncio
import asyncio.subprocess
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from asyncio.base_subprocess import BaseSubprocessTransport
from datetime import datetime
from functools import wraps
from pathlib import Path
from queue import Queue
from typing import Generator
import psutil
from twister_harness.constants import END_OF_DATA
from twister_harness.device.device_abstract import DeviceAbstract
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
# Workaround for RuntimeError: Event loop is closed
def silence_event_loop_closed(func):
def wrapper(self, *args, **kwargs):
return func(self, *args, **kwargs)
except RuntimeError as e:
if str(e) != 'Event loop is closed':
return wrapper
BaseSubprocessTransport.__del__ = silence_event_loop_closed(BaseSubprocessTransport.__del__) # type: ignore
logger = logging.getLogger(__name__)
class SimulatorAdapterBase(DeviceAbstract, abc.ABC):
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
:param twister_config: twister configuration
super().__init__(device_config, **kwargs)
self._process: asyncio.subprocess.Process | None = None
self._process_ended_with_timeout: bool = False
self.queue: Queue = Queue()
self._stop_job: bool = False
self._exc: Exception | None = None #: store any exception which appeared running this thread
self._thread: threading.Thread | None = None
self.command: list[str] = []
self.process_kwargs: dict = {
'stdout': asyncio.subprocess.PIPE,
'stderr': asyncio.subprocess.STDOUT,
'stdin': asyncio.subprocess.PIPE,
'env': self.env,
self._data_to_send: bytes | None = None
def connect(self, timeout: float = 1) -> None:
pass # pragma: no cover
def flash_and_run(self, timeout: float = 60.0) -> None:
if not self.command:
msg = 'Run simulation command is empty, please verify if it was generated properly.'
raise TwisterHarnessException(msg)
self._thread = threading.Thread(target=self._run_simulation, args=(timeout,), daemon=True)
# Give a time to start subprocess before test is executed
# Check if subprocess (simulation) has started without errors
if self._exc is not None:
logger.error('Simulation failed due to an exception: %s', self._exc)
raise self._exc
def _run_simulation(self, timeout: float) -> None:
log_command(logger, 'Running command', self.command, level=logging.INFO)
return_code: int =
except subprocess.SubprocessError as e:
logger.error('Running simulation failed due to subprocess error %s', e)
self._exc = TwisterHarnessException(e.args)
except FileNotFoundError as e:
logger.error(f'Running simulation failed due to file not found: {e.filename}')
self._exc = TwisterHarnessException(f'File not found: {e.filename}')
except Exception as e:
logger.error('Running simulation failed: %s', e)
self._exc = TwisterHarnessException(e.args)
if return_code == 0:'Running simulation finished with return code %s', return_code)
elif return_code == -15:'Running simulation stopped interrupted by user')
logger.warning('Running simulation finished with return code %s', return_code)
self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
async def _run_command(self, timeout: float = 60.):
assert isinstance(self.command, (list, tuple, set))
# to avoid stupid and difficult to debug mistakes
# we are using asyncio to run subprocess to be able to read from stdout
# without blocking while loop (readline with timeout)
self._process = await asyncio.create_subprocess_exec(
logger.debug('Started subprocess with PID %s',
end_time = time.time() + timeout
while not self._stop_job and not self._process.stdout.at_eof(): # type: ignore[union-attr]
if line := await self._read_line(timeout=0.1):
if time.time() > end_time:
self._process_ended_with_timeout = True'Finished process with PID {} after {timeout} seconds timeout')
if self._data_to_send:
self._process.stdin.write(self._data_to_send) # type: ignore[union-attr]
await self._process.stdin.drain() # type: ignore[union-attr]
self._data_to_send = None
self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
return await self._process.wait()
async def _read_line(self, timeout=0.1) -> bytes | None:
return await asyncio.wait_for(self._process.stdout.readline(), timeout=timeout) # type: ignore[union-attr]
except asyncio.TimeoutError:
return None
def disconnect(self):
pass # pragma: no cover
def stop(self) -> None:
"""Stop device."""
self._stop_job = True
time.sleep(0.1) # give a time to end while loop in running simulation
if self._process is not None and self._process.returncode is None:
logger.debug('Stopping all running processes for PID %s',
# kill subprocess if it is still running
for child in psutil.Process(
os.kill(, signal.SIGTERM)
except ProcessLookupError:
# kill subprocess if it is still running
os.kill(, signal.SIGTERM)
if self._thread is not None:
self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec
if self._exc:
raise self._exc
def iter_stdout(self) -> Generator[str, None, None]:
"""Return output from serial."""
while True:
stream = self.queue.get()
if stream == END_OF_DATA:
logger.debug('No more data from running process')
self.handler_log_file.handle(data=stream + '\n')
yield stream
def write(self, data: bytes) -> None:
"""Write data to serial"""
while self._data_to_send:
# wait data will be write to self._process.stdin.write
self._data_to_send = data
def initialize_log_files(self):
self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
start_msg = f'\n==== Logging started at {} ====\n'
class NativeSimulatorAdapter(SimulatorAdapterBase):
"""Simulator adapter to run `zephyr.exe` simulation"""
def generate_command(self) -> None:
"""Return command to run."""
self.command = [
str((Path(self.device_config.build_dir) / 'zephyr' / 'zephyr.exe').resolve())
class UnitSimulatorAdapter(SimulatorAdapterBase):
"""Simulator adapter to run unit tests"""
def generate_command(self) -> None:
"""Return command to run."""
self.command = [str((Path(self.device_config.build_dir) / 'testbinary').resolve())]
class CustomSimulatorAdapter(SimulatorAdapterBase):
def generate_command(self) -> None:
"""Return command to run."""
if (west := shutil.which('west')) is None:
logger.error('west not found')
self.command = []
self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']