blob: 558e206adf731177cde9e91d249db02a24a1ec12 [file] [log] [blame]
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import abc
import asyncio
import asyncio.subprocess
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from asyncio.base_subprocess import BaseSubprocessTransport
from datetime import datetime
from functools import wraps
from pathlib import Path
from queue import Queue
from typing import Generator
import psutil
from twister_harness.constants import END_OF_DATA
from twister_harness.device.device_abstract import DeviceAbstract
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
# Workaround for RuntimeError: Event loop is closed
# https://pythonalgos.com/runtimeerror-event-loop-is-closed-asyncio-fix/
def silence_event_loop_closed(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except RuntimeError as e:
if str(e) != 'Event loop is closed':
raise
return wrapper
BaseSubprocessTransport.__del__ = silence_event_loop_closed(BaseSubprocessTransport.__del__) # type: ignore
logger = logging.getLogger(__name__)
class SimulatorAdapterBase(DeviceAbstract, abc.ABC):
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
"""
:param twister_config: twister configuration
"""
super().__init__(device_config, **kwargs)
self._process: asyncio.subprocess.Process | None = None
self._process_ended_with_timeout: bool = False
self.queue: Queue = Queue()
self._stop_job: bool = False
self._exc: Exception | None = None #: store any exception which appeared running this thread
self._thread: threading.Thread | None = None
self.command: list[str] = []
self.process_kwargs: dict = {
'stdout': asyncio.subprocess.PIPE,
'stderr': asyncio.subprocess.STDOUT,
'stdin': asyncio.subprocess.PIPE,
'env': self.env,
}
self._data_to_send: bytes | None = None
def connect(self, timeout: float = 1) -> None:
pass # pragma: no cover
def flash_and_run(self, timeout: float = 60.0) -> None:
if not self.command:
msg = 'Run simulation command is empty, please verify if it was generated properly.'
logger.error(msg)
raise TwisterHarnessException(msg)
self._thread = threading.Thread(target=self._run_simulation, args=(timeout,), daemon=True)
self._thread.start()
# Give a time to start subprocess before test is executed
time.sleep(0.1)
# Check if subprocess (simulation) has started without errors
if self._exc is not None:
logger.error('Simulation failed due to an exception: %s', self._exc)
raise self._exc
def _run_simulation(self, timeout: float) -> None:
log_command(logger, 'Running command', self.command, level=logging.INFO)
try:
return_code: int = asyncio.run(self._run_command(timeout=timeout))
except subprocess.SubprocessError as e:
logger.error('Running simulation failed due to subprocess error %s', e)
self._exc = TwisterHarnessException(e.args)
except FileNotFoundError as e:
logger.error(f'Running simulation failed due to file not found: {e.filename}')
self._exc = TwisterHarnessException(f'File not found: {e.filename}')
except Exception as e:
logger.error('Running simulation failed: %s', e)
self._exc = TwisterHarnessException(e.args)
else:
if return_code == 0:
logger.info('Running simulation finished with return code %s', return_code)
elif return_code == -15:
logger.info('Running simulation stopped interrupted by user')
else:
logger.warning('Running simulation finished with return code %s', return_code)
finally:
self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
async def _run_command(self, timeout: float = 60.):
assert isinstance(self.command, (list, tuple, set))
# to avoid stupid and difficult to debug mistakes
# we are using asyncio to run subprocess to be able to read from stdout
# without blocking while loop (readline with timeout)
self._process = await asyncio.create_subprocess_exec(
*self.command,
**self.process_kwargs
)
logger.debug('Started subprocess with PID %s', self._process.pid)
end_time = time.time() + timeout
while not self._stop_job and not self._process.stdout.at_eof(): # type: ignore[union-attr]
if line := await self._read_line(timeout=0.1):
self.queue.put(line.decode('utf-8').strip())
if time.time() > end_time:
self._process_ended_with_timeout = True
logger.info(f'Finished process with PID {self._process.pid} after {timeout} seconds timeout')
break
if self._data_to_send:
self._process.stdin.write(self._data_to_send) # type: ignore[union-attr]
await self._process.stdin.drain() # type: ignore[union-attr]
self._data_to_send = None
self.queue.put(END_OF_DATA) # indicate to the other threads that there will be no more data in queue
return await self._process.wait()
async def _read_line(self, timeout=0.1) -> bytes | None:
try:
return await asyncio.wait_for(self._process.stdout.readline(), timeout=timeout) # type: ignore[union-attr]
except asyncio.TimeoutError:
return None
def disconnect(self):
pass # pragma: no cover
def stop(self) -> None:
"""Stop device."""
self._stop_job = True
time.sleep(0.1) # give a time to end while loop in running simulation
if self._process is not None and self._process.returncode is None:
logger.debug('Stopping all running processes for PID %s', self._process.pid)
# kill subprocess if it is still running
for child in psutil.Process(self._process.pid).children(recursive=True):
try:
os.kill(child.pid, signal.SIGTERM)
except ProcessLookupError:
pass
# kill subprocess if it is still running
os.kill(self._process.pid, signal.SIGTERM)
if self._thread is not None:
self._thread.join(timeout=1) # Should end immediately, but just in case we set timeout for 1 sec
if self._exc:
raise self._exc
@property
def iter_stdout(self) -> Generator[str, None, None]:
"""Return output from serial."""
while True:
stream = self.queue.get()
if stream == END_OF_DATA:
logger.debug('No more data from running process')
break
self.handler_log_file.handle(data=stream + '\n')
yield stream
self.queue.task_done()
def write(self, data: bytes) -> None:
"""Write data to serial"""
while self._data_to_send:
# wait data will be write to self._process.stdin.write
time.sleep(0.1)
self._data_to_send = data
def initialize_log_files(self):
self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
self.handler_log_file.handle(start_msg)
class NativeSimulatorAdapter(SimulatorAdapterBase):
"""Simulator adapter to run `zephyr.exe` simulation"""
def generate_command(self) -> None:
"""Return command to run."""
self.command = [
str((Path(self.device_config.build_dir) / 'zephyr' / 'zephyr.exe').resolve())
]
class UnitSimulatorAdapter(SimulatorAdapterBase):
"""Simulator adapter to run unit tests"""
def generate_command(self) -> None:
"""Return command to run."""
self.command = [str((Path(self.device_config.build_dir) / 'testbinary').resolve())]
class CustomSimulatorAdapter(SimulatorAdapterBase):
def generate_command(self) -> None:
"""Return command to run."""
if (west := shutil.which('west')) is None:
logger.error('west not found')
self.command = []
else:
self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']