# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from typing import Generator

import psutil

from twister_harness.constants import QEMU_FIFO_FILE_NAME
from twister_harness.device.device_abstract import DeviceAbstract
from twister_harness.device.fifo_handler import FifoHandler
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig

logger = logging.getLogger(__name__)


class QemuAdapter(DeviceAbstract):
    """
    Adapter for Qemu simulator.

    The simulation is started with ``west build -t run`` in a background
    thread, while the output of the simulated device is read through a FIFO
    connection located in the build directory.
    """

    def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
        super().__init__(device_config, **kwargs)
        self._process: subprocess.Popen | None = None
        self._process_ended_with_timeout: bool = False
        self._exc: Exception | None = None  #: store any exception which appeared running this thread
        self._thread: threading.Thread | None = None
        self._emulation_was_finished: bool = False
        self.connection = FifoHandler(Path(self.device_config.build_dir).joinpath(QEMU_FIFO_FILE_NAME))
        self.command: list[str] = []
        self.timeout: float = 60  # running timeout in seconds
        self.booting_timeout_in_ms: int = 10_000  #: wait time for booting Qemu in milliseconds

    def generate_command(self) -> None:
        """
        Build the command used to run the simulation and store it in ``self.command``.

        When ``west`` cannot be found on PATH, an error is logged and
        ``self.command`` is left empty (``flash_and_run`` refuses to start
        with an empty command).
        """
        west = shutil.which('west')
        if west is None:
            logger.error('west not found')
            self.command = []
        else:
            self.command = [west, 'build', '-d', str(self.device_config.build_dir), '-t', 'run']

    def connect(self, timeout: float = 1) -> None:
        """
        Open the FIFO connection.

        :param timeout: not used by this adapter
        """
        logger.debug('Opening connection')
        self.connection.connect()

    def flash_and_run(self, timeout: float = 60.0) -> None:
        """
        Start the simulation in a background thread.

        :param timeout: maximum running time of the simulation in seconds
        :raises TwisterHarnessException: when the command is empty or when
            starting the subprocess failed within the short start-up window
        """
        self.timeout = timeout
        if not self.command:
            msg = 'Run simulation command is empty, please verify if it was generated properly.'
            logger.error(msg)
            raise TwisterHarnessException(msg)

        self._thread = threading.Thread(target=self._run_command, args=(self.timeout,), daemon=True)
        self._thread.start()
        # Give a time to start subprocess before test is executed
        time.sleep(0.1)
        # Check if subprocess (simulation) has started without errors
        if self._exc is not None:
            logger.error('Simulation failed due to an exception: %s', self._exc)
            raise self._exc

    def _run_command(self, timeout: float) -> None:
        """
        Run the simulation command and wait until it ends or the timeout expires.

        Executed in a background thread, so errors are not raised here but
        stored in ``self._exc`` to be re-raised by ``flash_and_run``/``stop``.

        :param timeout: maximum time in seconds to wait for the subprocess
        """
        log_command(logger, 'Running command', self.command, level=logging.INFO)
        try:
            self._process = subprocess.Popen(
                self.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                env=self.env
            )
            stdout, _ = self._process.communicate(timeout=timeout)
            return_code: int = self._process.returncode
        except subprocess.TimeoutExpired:
            logger.error('Running simulation finished after timeout: %s seconds', timeout)
            self._process_ended_with_timeout = True
            # we don't want to raise Timeout exception, but allowed a test to parse the output
            # and set proper status; the still running process is terminated in stop()
        except subprocess.SubprocessError as e:
            logger.error('Running simulation failed due to subprocess error %s', e)
            self._exc = TwisterHarnessException(e.args)
        except FileNotFoundError as e:
            logger.error('Running simulation failed due to file not found: %s', e.filename)
            self._exc = TwisterHarnessException(f'File not found: {e.filename}')
        except Exception as e:
            logger.error('Running simulation failed: %s', e)
            self._exc = TwisterHarnessException(e.args)
        else:
            if return_code == 0:
                logger.info('Running simulation finished with return code %s', return_code)
            elif return_code == -signal.SIGTERM:
                # negative return code means the process was ended by that signal
                logger.info('Running simulation terminated')
            else:
                logger.warning('Running simulation finished with return code %s', return_code)
                # Output may contain bytes which are not valid UTF-8; do not let
                # a decoding error escape from this thread.
                for line in stdout.decode('utf-8', errors='replace').splitlines():
                    logger.info(line)
        finally:
            self._emulation_was_finished = True

    def disconnect(self):
        """Close the FIFO connection."""
        logger.debug('Closing connection')
        self.connection.disconnect()

    def stop(self) -> None:
        """
        Stop device.

        Sends SIGTERM to the simulation subprocess and all of its children,
        waits shortly for the runner thread and re-raises an exception stored
        by that thread, if any.
        """
        time.sleep(0.1)  # give a time to end while loop in running simulation
        if self._process is not None and self._process.returncode is None:
            pid = self._process.pid
            logger.debug('Stopping all running processes for PID %s', pid)
            # ``returncode`` is refreshed only by the runner thread, so the process
            # may be already gone at this point - treat it as nothing to kill.
            try:
                children = psutil.Process(pid).children(recursive=True)
            except psutil.NoSuchProcess:
                children = []
            # kill all child subprocesses
            for child in children:
                try:
                    os.kill(child.pid, signal.SIGTERM)
                except ProcessLookupError:
                    pass
            # kill subprocess if it is still running
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                pass
        if self._thread is not None:
            self._thread.join(timeout=1)  # Should end immediately, but just in case we set timeout for 1 sec
        if self._exc:
            raise self._exc

    def _wait_for_fifo(self):
        """
        Wait until the FIFO connection is open.

        The connection is polled every 100 ms for at most
        ``booting_timeout_in_ms`` milliseconds (always at least once).

        :raises TwisterHarnessException: when the simulation finished before
            the connection was opened or when the booting timeout expired
        """
        poll_interval_ms = 100
        # number of polls must match the sleep below, otherwise the effective
        # timeout differs from ``booting_timeout_in_ms``
        attempts = max(1, int(self.booting_timeout_in_ms // poll_interval_ms))
        for _ in range(attempts):
            if self.connection.is_open:
                break
            elif self._emulation_was_finished:
                msg = 'Problem with starting QEMU'
                logger.error(msg)
                raise TwisterHarnessException(msg)
            time.sleep(poll_interval_ms / 1000)
        else:
            msg = 'Problem with starting QEMU - fifo file was not created yet'
            logger.error(msg)
            raise TwisterHarnessException(msg)

    @property
    def iter_stdout(self) -> Generator[str, None, None]:
        """
        Yield non-empty, stripped lines read from the FIFO connection.

        Every yielded line is also passed to the handler log file. Iteration
        ends when ``self.timeout`` seconds elapsed since it was started.

        :raises TwisterHarnessException: when the FIFO connection could not be
            opened (see ``_wait_for_fifo``)
        """
        if not self.connection:
            return
        # fifo file can be not create yet, so we need to wait for a while
        self._wait_for_fifo()

        # create unblocking reading from fifo file
        q: Queue = Queue()

        def read_lines():
            while self.connection and self.connection.is_open:
                try:
                    line = self.connection.readline().decode('UTF-8').strip()
                except (OSError, ValueError):
                    # file could be closed already so we should stop reading
                    break
                if len(line) != 0:
                    q.put(line)

        t = threading.Thread(target=read_lines, daemon=True)
        t.start()

        end_time = time.time() + self.timeout
        try:
            while True:
                try:
                    stream = q.get(timeout=0.1)
                    self.handler_log_file.handle(data=stream + '\n')
                    yield stream
                except Empty:  # timeout appeared
                    pass
                if time.time() > end_time:
                    break
        except KeyboardInterrupt:
            # let thread to finish smoothly
            pass
        finally:
            t.join(1)

    def write(self, data: bytes) -> None:
        """Write data to the FIFO connection."""
        if self.connection:
            self.connection.write(data)

    def initialize_log_files(self):
        """Create the handler log file in the build directory and write a start marker to it."""
        self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
        start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
        self.handler_log_file.handle(start_msg)
