blob: e0d34f9f0120886df181601a5252e64095448de1 [file] [log] [blame]
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import os
import pty
import re
import shutil
import subprocess
from datetime import datetime
from typing import Generator
import serial
from twister_harness.device.device_abstract import DeviceAbstract
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.helper import log_command
from twister_harness.log_files.log_file import DeviceLogFile, HandlerLogFile
from twister_harness.twister_harness_config import DeviceConfig
logger = logging.getLogger(__name__)
class HardwareAdapter(DeviceAbstract):
"""Adapter class for real device."""
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
super().__init__(device_config, **kwargs)
self.connection: serial.Serial | None = None
self.command: list[str] = []
self.process_kwargs: dict = {
'stdout': subprocess.PIPE,
'stderr': subprocess.STDOUT,
'env': self.env,
}
self.serial_pty_proc: subprocess.Popen | None = None
def connect(self, timeout: float = 1) -> None:
"""
Open serial connection.
:param timeout: Read timeout value in seconds
"""
if self.connection:
# already opened
return
if self.device_config.pre_script:
self.run_custom_script(self.device_config.pre_script, 30)
serial_name = self._open_serial_pty() or self.device_config.serial
logger.info('Opening serial connection for %s', serial_name)
try:
self.connection = serial.Serial(
serial_name,
baudrate=self.device_config.baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=timeout
)
except serial.SerialException as e:
logger.exception('Cannot open connection: %s', e)
self._close_serial_pty()
raise
self.connection.flush()
def disconnect(self) -> None:
"""Close serial connection."""
if self.connection:
serial_name = self.connection.port
self.connection.close()
self.connection = None
logger.info('Closed serial connection for %s', serial_name)
self._close_serial_pty()
def stop(self) -> None:
if self.device_config.post_script:
self.run_custom_script(self.device_config.post_script, 30)
def _open_serial_pty(self) -> str | None:
"""Open a pty pair, run process and return tty name"""
if not self.device_config.serial_pty:
return None
master, slave = pty.openpty()
try:
self.serial_pty_proc = subprocess.Popen(
re.split(',| ', self.device_config.serial_pty),
stdout=master,
stdin=master,
stderr=master
)
except subprocess.CalledProcessError as e:
logger.exception('Failed to run subprocess %s, error %s',
self.device_config.serial_pty, str(e))
raise
return os.ttyname(slave)
def _close_serial_pty(self) -> None:
"""Terminate the process opened for serial pty script"""
if self.serial_pty_proc:
self.serial_pty_proc.terminate()
self.serial_pty_proc.communicate()
logger.info('Process %s terminated', self.device_config.serial_pty)
self.serial_pty_proc = None
def generate_command(self) -> None:
"""Return command to flash."""
west = shutil.which('west')
if west is None:
raise TwisterHarnessException('west not found')
command = [
west,
'flash',
'--skip-rebuild',
'--build-dir', str(self.device_config.build_dir),
]
command_extra_args = []
if self.device_config.west_flash_extra_args:
command_extra_args.extend(self.device_config.west_flash_extra_args)
if runner := self.device_config.runner:
command.extend(['--runner', runner])
if board_id := self.device_config.id:
if runner == 'pyocd':
command_extra_args.append('--board-id')
command_extra_args.append(board_id)
elif runner == 'nrfjprog':
command_extra_args.append('--dev-id')
command_extra_args.append(board_id)
elif runner == 'openocd' and self.device_config.product in ['STM32 STLink', 'STLINK-V3']:
command_extra_args.append('--cmd-pre-init')
command_extra_args.append(f'hla_serial {board_id}')
elif runner == 'openocd' and self.device_config.product == 'EDBG CMSIS-DAP':
command_extra_args.append('--cmd-pre-init')
command_extra_args.append(f'cmsis_dap_serial {board_id}')
elif runner == 'jlink':
command.append(f'--tool-opt=-SelectEmuBySN {board_id}')
elif runner == 'stm32cubeprogrammer':
command.append(f'--tool-opt=sn={board_id}')
if command_extra_args:
command.append('--')
command.extend(command_extra_args)
self.command = command
@staticmethod
def run_custom_script(script, timeout):
with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
try:
stdout, stderr = proc.communicate(timeout=timeout)
logger.debug(stdout.decode())
if proc.returncode != 0:
logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")
except subprocess.TimeoutExpired:
proc.kill()
proc.communicate()
logger.error("{} timed out".format(script))
def flash_and_run(self, timeout: float = 60.0) -> None:
if not self.command:
msg = 'Flash command is empty, please verify if it was generated properly.'
logger.error(msg)
raise TwisterHarnessException(msg)
if self.device_config.id:
logger.info('Flashing device %s', self.device_config.id)
log_command(logger, 'Flashing command', self.command, level=logging.INFO)
try:
process = subprocess.Popen(
self.command,
**self.process_kwargs
)
except subprocess.CalledProcessError:
logger.error('Error while flashing device')
raise TwisterHarnessException('Could not flash device')
else:
stdout = stderr = None
try:
stdout, stderr = process.communicate(timeout=self.device_config.flashing_timeout)
except subprocess.TimeoutExpired:
process.kill()
finally:
if stdout:
self.device_log_file.handle(data=stdout)
logger.debug(stdout.decode(errors='ignore'))
if stderr:
self.device_log_file.handle(data=stderr)
if process.returncode == 0:
logger.info('Flashing finished')
else:
raise TwisterHarnessException(f'Could not flash device {self.device_config.id}')
finally:
if self.device_config.post_flash_script:
self.run_custom_script(self.device_config.post_flash_script, 30)
@property
def iter_stdout(self) -> Generator[str, None, None]:
"""Return output from serial."""
if not self.connection:
return
self.connection.flush()
self.connection.reset_input_buffer()
while self.connection and self.connection.is_open:
stream = self.connection.readline()
self.handler_log_file.handle(data=stream)
yield stream.decode(errors='ignore').strip()
def write(self, data: bytes) -> None:
"""Write data to serial"""
if self.connection:
self.connection.write(data)
def initialize_log_files(self) -> None:
self.handler_log_file = HandlerLogFile.create(build_dir=self.device_config.build_dir)
self.device_log_file = DeviceLogFile.create(build_dir=self.device_config.build_dir)
start_msg = f'\n==== Logging started at {datetime.now()} ====\n'
self.handler_log_file.handle(start_msg)
self.device_log_file.handle(start_msg)