blob: cae17bc46e9aeec1f7f4b03c2f57cce489ace796 [file] [log] [blame]
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import time
from twister_harness.device.fifo_handler import FifoHandler
from twister_harness.device.binary_adapter import BinaryAdapterBase
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.twister_harness_config import DeviceConfig
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class QemuAdapter(BinaryAdapterBase):
    """Binary adapter that starts the build's ``run`` target and talks to it over a FIFO.

    The process is launched with ``west build -d <build_dir> -t run``. Reads,
    writes and flushes are delegated to a :class:`FifoHandler` created for
    ``<build_dir>/qemu-fifo``.
    """

    def __init__(self, device_config: DeviceConfig) -> None:
        """Initialise the base adapter and create the FIFO handler.

        The connection itself is initiated later, in
        :meth:`_create_fifo_connection`.

        :param device_config: device configuration; its ``build_dir`` is used
            to locate the ``qemu-fifo`` path.
        """
        super().__init__(device_config)
        qemu_fifo_file_path = self.device_config.build_dir / 'qemu-fifo'
        self._fifo_connection: FifoHandler = FifoHandler(qemu_fifo_file_path, self.base_timeout)

    def generate_command(self) -> None:
        """Set command to run."""
        self.command = [self.west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']
        # Drop any 'stdin' entry from the process kwargs.
        # NOTE(review): presumably because input goes through the FIFO instead
        # of the process's stdin - confirm against BinaryAdapterBase.
        self.process_kwargs.pop('stdin', None)

    def _flash_and_run(self) -> None:
        """Run the base-class start-up, then establish the FIFO connection."""
        super()._flash_and_run()
        self._create_fifo_connection()

    def _create_fifo_connection(self) -> None:
        """Initiate the FIFO connection and wait until it is reported open.

        Polls every 0.1 s for up to ``base_timeout`` seconds and gives up early
        if the binary is no longer running.

        :raises TwisterHarnessException: if the FIFO is not open before the
            deadline, or the binary stops running first.
        """
        self._fifo_connection.initiate_connection()
        # Use the monotonic clock so that wall-clock adjustments cannot
        # shorten or extend the timeout.
        deadline: float = time.monotonic() + self.base_timeout
        while time.monotonic() < deadline and self._is_binary_running():
            if self._fifo_connection.is_open:
                return
            time.sleep(0.1)
        msg = 'Cannot establish communication with QEMU device.'
        logger.error(msg)
        raise TwisterHarnessException(msg)

    def _stop_subprocess(self) -> None:
        """Stop the subprocess via the base class, then disconnect the FIFO."""
        try:
            super()._stop_subprocess()
        finally:
            # Disconnect even when stopping the subprocess raised, so the FIFO
            # connection is not left behind.
            self._fifo_connection.disconnect()

    def _read_device_output(self) -> bytes:
        """Read one line from the FIFO.

        :return: the line read, or ``b''`` if reading raised ``OSError`` or
            ``ValueError``.
        """
        try:
            output = self._fifo_connection.readline()
        except (OSError, ValueError):
            # emulation was probably finished and thus fifo file was closed too
            output = b''
        return output

    def _write_to_device(self, data: bytes) -> None:
        """Write ``data`` to the FIFO and flush the write side.

        :param data: bytes to send.
        """
        self._fifo_connection.write(data)
        self._fifo_connection.flush_write()

    def _flush_device_output(self) -> None:
        """Flush the FIFO's read side, but only while the device is running."""
        if self.is_device_running():
            self._fifo_connection.flush_read()

    def is_device_connected(self) -> bool:
        """Return true if device is connected."""
        return bool(super().is_device_connected() and self._fifo_connection.is_open)