|  | #!/usr/bin/env python3 | 
|  | # Copyright (c) 2023 Intel Corporation | 
|  | # Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved. | 
|  | # | 
|  | # SPDX-License-Identifier: Apache-2.0 | 
|  | """ | 
|  | Tests for handlers.py classes' methods | 
|  | """ | 
|  |  | 
|  | import itertools | 
|  | from unittest import mock | 
|  | import os | 
|  | import pytest | 
|  | import signal | 
|  | import subprocess | 
|  | import sys | 
|  |  | 
|  | from contextlib import nullcontext | 
|  | from importlib import reload | 
|  | from serial import SerialException | 
|  | from subprocess import CalledProcessError, TimeoutExpired | 
|  | from types import SimpleNamespace | 
|  |  | 
|  | import twisterlib.harness | 
|  |  | 
|  | ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") | 
|  |  | 
|  | from twisterlib.error import TwisterException | 
|  | from twisterlib.statuses import TwisterStatus | 
|  | from twisterlib.handlers import ( | 
|  | Handler, | 
|  | BinaryHandler, | 
|  | DeviceHandler, | 
|  | QEMUHandler, | 
|  | SimulationHandler | 
|  | ) | 
|  | from twisterlib.hardwaremap import ( | 
|  | DUT | 
|  | ) | 
|  |  | 
|  | @pytest.fixture | 
|  | def mocked_instance(tmp_path): | 
|  | instance = mock.Mock() | 
|  |  | 
|  | testsuite = mock.Mock() | 
|  | type(testsuite).source_dir = mock.PropertyMock(return_value='') | 
|  | instance.testsuite = testsuite | 
|  |  | 
|  | build_dir = tmp_path / 'build_dir' | 
|  | os.makedirs(build_dir) | 
|  | type(instance).build_dir = mock.PropertyMock(return_value=str(build_dir)) | 
|  |  | 
|  | platform = mock.Mock() | 
|  | type(platform).binaries = mock.PropertyMock(return_value=[]) | 
|  | instance.platform = platform | 
|  |  | 
|  | type(instance.testsuite).timeout = mock.PropertyMock(return_value=60) | 
|  | type(instance.platform).timeout_multiplier = mock.PropertyMock( | 
|  | return_value=2 | 
|  | ) | 
|  |  | 
|  | instance.status = TwisterStatus.NONE | 
|  | instance.reason = None | 
|  |  | 
|  | return instance | 
|  |  | 
|  |  | 
|  | @pytest.fixture | 
|  | def faux_timer(): | 
|  | class Counter: | 
|  | def __init__(self): | 
|  | self.t = 0 | 
|  |  | 
|  | def time(self): | 
|  | self.t += 1 | 
|  | return self.t | 
|  |  | 
|  | return Counter() | 
|  |  | 
|  |  | 
|  | TESTDATA_1 = [ | 
|  | (True, False, 'posix', ['Install pyserial python module with pip to use' \ | 
|  | ' --device-testing option.'], None), | 
|  | (False, True, 'nt', [], None), | 
|  | (True, True, 'posix', ['Install pyserial python module with pip to use' \ | 
|  | ' --device-testing option.'], ImportError), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'fail_serial, fail_pty, os_name, expected_outs, expected_error', | 
|  | TESTDATA_1, | 
|  | ids=['import serial', 'import pty nt', 'import serial+pty posix'] | 
|  | ) | 
|  | def test_imports( | 
|  | capfd, | 
|  | fail_serial, | 
|  | fail_pty, | 
|  | os_name, | 
|  | expected_outs, | 
|  | expected_error | 
|  | ): | 
|  | class ImportRaiser: | 
|  | def find_spec(self, fullname, path, target=None): | 
|  | if fullname == 'serial' and fail_serial: | 
|  | raise ImportError() | 
|  | if fullname == 'pty' and fail_pty: | 
|  | raise ImportError() | 
|  |  | 
|  | modules_mock = sys.modules.copy() | 
|  | modules_mock['serial'] = None if fail_serial else modules_mock['serial'] | 
|  | modules_mock['pty'] = None if fail_pty else modules_mock['pty'] | 
|  |  | 
|  | meta_path_mock = sys.meta_path[:] | 
|  | meta_path_mock.insert(0, ImportRaiser()) | 
|  |  | 
|  | with mock.patch('os.name', os_name), \ | 
|  | mock.patch.dict('sys.modules', modules_mock, clear=True), \ | 
|  | mock.patch('sys.meta_path', meta_path_mock), \ | 
|  | pytest.raises(expected_error) if expected_error else nullcontext(): | 
|  | reload(twisterlib.handlers) | 
|  |  | 
|  | out, _ = capfd.readouterr() | 
|  | assert all([expected_out in out for expected_out in expected_outs]) | 
|  |  | 
|  |  | 
|  | def test_handler_final_handle_actions(mocked_instance): | 
|  | instance = mocked_instance | 
|  | instance.testcases = [mock.Mock()] | 
|  |  | 
|  | handler = Handler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.suite_name_check = True | 
|  |  | 
|  | harness = twisterlib.harness.Test() | 
|  | harness.status = TwisterStatus.NONE | 
|  | harness.detected_suite_names = mock.Mock() | 
|  | harness.matched_run_id = False | 
|  | harness.run_id_exists = True | 
|  | harness.recording = mock.Mock() | 
|  |  | 
|  | handler._final_handle_actions(harness) | 
|  |  | 
|  | assert handler.instance.status == TwisterStatus.FAIL | 
|  | assert handler.instance.reason == 'RunID mismatch' | 
|  | assert all(testcase.status == TwisterStatus.FAIL for \ | 
|  | testcase in handler.instance.testcases) | 
|  |  | 
|  | handler.instance.reason = 'This reason shan\'t be changed.' | 
|  | handler._final_handle_actions(harness) | 
|  |  | 
|  | instance.assert_has_calls([mock.call.record(harness.recording)]) | 
|  |  | 
|  | assert handler.instance.reason == 'This reason shan\'t be changed.' | 
|  |  | 
|  |  | 
|  | TESTDATA_2 = [ | 
|  | (['dummy_testsuite_name'], False), | 
|  | ([], True), | 
|  | (['another_dummy_name', 'yet_another_dummy_name'], True), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'detected_suite_names, should_be_called', | 
|  | TESTDATA_2, | 
|  | ids=['detected one expected', 'detected none', 'detected two unexpected'] | 
|  | ) | 
|  | def test_handler_verify_ztest_suite_name( | 
|  | mocked_instance, | 
|  | detected_suite_names, | 
|  | should_be_called | 
|  | ): | 
|  | instance = mocked_instance | 
|  | type(instance.testsuite).ztest_suite_names = ['dummy_testsuite_name'] | 
|  |  | 
|  | harness_status = TwisterStatus.PASS | 
|  |  | 
|  | with mock.patch.object(Handler, '_missing_suite_name') as _missing_mocked: | 
|  | handler = Handler(instance, 'build', mock.Mock()) | 
|  | handler._verify_ztest_suite_name( | 
|  | harness_status, | 
|  | detected_suite_names, | 
|  | ) | 
|  |  | 
|  | if should_be_called: | 
|  | _missing_mocked.assert_called_once() | 
|  | else: | 
|  | _missing_mocked.assert_not_called() | 
|  |  | 
|  |  | 
|  | def test_handler_missing_suite_name(mocked_instance): | 
|  | instance = mocked_instance | 
|  | instance.testcases = [mock.Mock()] | 
|  |  | 
|  | handler = Handler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.suite_name_check = True | 
|  |  | 
|  | expected_suite_names = ['dummy_testsuite_name'] | 
|  |  | 
|  | handler._missing_suite_name(expected_suite_names) | 
|  |  | 
|  | assert handler.instance.status == TwisterStatus.FAIL | 
|  | assert handler.instance.reason == 'Testsuite mismatch' | 
|  | assert all( | 
|  | testcase.status == TwisterStatus.FAIL for testcase in handler.instance.testcases | 
|  | ) | 
|  |  | 
|  |  | 
|  | def test_handler_terminate(mocked_instance): | 
|  | def mock_kill_function(pid, sig): | 
|  | if pid < 0: | 
|  | raise ProcessLookupError | 
|  |  | 
|  | instance = mocked_instance | 
|  |  | 
|  | handler = Handler(instance, 'build', mock.Mock()) | 
|  |  | 
|  | mock_process = mock.Mock() | 
|  | mock_child1 = mock.Mock(pid=1) | 
|  | mock_child2 = mock.Mock(pid=2) | 
|  | mock_process.children = mock.Mock(return_value=[mock_child1, mock_child2]) | 
|  |  | 
|  | mock_proc = mock.Mock(pid=0) | 
|  | mock_proc.terminate = mock.Mock(return_value=None) | 
|  | mock_proc.kill = mock.Mock(return_value=None) | 
|  |  | 
|  | with mock.patch('psutil.Process', return_value=mock_process), \ | 
|  | mock.patch( | 
|  | 'os.kill', | 
|  | mock.Mock(side_effect=mock_kill_function) | 
|  | ) as mock_kill: | 
|  | handler.terminate(mock_proc) | 
|  |  | 
|  | assert handler.terminated | 
|  | mock_proc.terminate.assert_called_once() | 
|  | mock_proc.kill.assert_called_once() | 
|  | mock_kill.assert_has_calls( | 
|  | [mock.call(1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] | 
|  | ) | 
|  |  | 
|  | mock_child_neg1 = mock.Mock(pid=-1) | 
|  | mock_process.children = mock.Mock( | 
|  | return_value=[mock_child_neg1, mock_child2] | 
|  | ) | 
|  | handler.terminated = False | 
|  | mock_kill.reset_mock() | 
|  |  | 
|  | handler.terminate(mock_proc) | 
|  |  | 
|  | mock_kill.assert_has_calls( | 
|  | [mock.call(-1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] | 
|  | ) | 
|  |  | 
|  |  | 
|  | def test_binaryhandler_try_kill_process_by_pid(mocked_instance): | 
|  | def mock_kill_function(pid, sig): | 
|  | if pid < 0: | 
|  | raise ProcessLookupError | 
|  |  | 
|  | instance = mocked_instance | 
|  |  | 
|  | handler = BinaryHandler(instance, 'build', mock.Mock()) | 
|  | handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') | 
|  |  | 
|  | with mock.patch( | 
|  | 'os.kill', | 
|  | mock.Mock(side_effect=mock_kill_function) | 
|  | ) as mock_kill, \ | 
|  | mock.patch('os.unlink', mock.Mock()) as mock_unlink: | 
|  | with mock.patch('builtins.open', mock.mock_open(read_data='1')): | 
|  | handler.try_kill_process_by_pid() | 
|  |  | 
|  | mock_unlink.assert_called_once_with( | 
|  | os.path.join('dummy', 'path', 'to', 'pid.pid') | 
|  | ) | 
|  | mock_kill.assert_called_once_with(1, signal.SIGKILL) | 
|  |  | 
|  | mock_unlink.reset_mock() | 
|  | mock_kill.reset_mock() | 
|  | handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') | 
|  |  | 
|  | with mock.patch('builtins.open', mock.mock_open(read_data='-1')): | 
|  | handler.try_kill_process_by_pid() | 
|  |  | 
|  | mock_unlink.assert_called_once_with( | 
|  | os.path.join('dummy', 'path', 'to', 'pid.pid') | 
|  | ) | 
|  | mock_kill.assert_called_once_with(-1, signal.SIGKILL) | 
|  |  | 
|  |  | 
|  | TESTDATA_3 = [ | 
|  | ( | 
|  | [b'This\\r\\n\n', b'is\r', b'some \x1B[31mANSI\x1B[39m in\n', b'a short\n', b'file.'], | 
|  | mock.Mock(status=TwisterStatus.NONE, capture_coverage=False), | 
|  | [ | 
|  | mock.call('This\\r\\n\n'), | 
|  | mock.call('is\r'), | 
|  | mock.call('some ANSI in\n'), | 
|  | mock.call('a short\n'), | 
|  | mock.call('file.') | 
|  | ], | 
|  | [ | 
|  | mock.call('This'), | 
|  | mock.call('is'), | 
|  | mock.call('some \x1B[31mANSI\x1B[39m in'), | 
|  | mock.call('a short'), | 
|  | mock.call('file.') | 
|  | ], | 
|  | None, | 
|  | False | 
|  | ), | 
|  | ( | 
|  | [b'Too much.'] * 120,  # Should be more than the timeout | 
|  | mock.Mock(status=TwisterStatus.PASS, capture_coverage=False), | 
|  | None, | 
|  | None, | 
|  | True, | 
|  | False | 
|  | ), | 
|  | ( | 
|  | [b'Too much.'] * 120,  # Should be more than the timeout | 
|  | mock.Mock(status=TwisterStatus.PASS, capture_coverage=False), | 
|  | None, | 
|  | None, | 
|  | True, | 
|  | False | 
|  | ), | 
|  | ( | 
|  | [b'Too much.'] * 120,  # Should be more than the timeout | 
|  | mock.Mock(status=TwisterStatus.PASS, capture_coverage=True), | 
|  | None, | 
|  | None, | 
|  | False, | 
|  | True | 
|  | ), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'proc_stdout, harness, expected_handler_calls,' | 
|  | ' expected_harness_calls, should_be_less, timeout_wait', | 
|  | TESTDATA_3, | 
|  | ids=[ | 
|  | 'no timeout', | 
|  | 'timeout', | 
|  | 'timeout with harness status', | 
|  | 'timeout with capture_coverage, wait timeout' | 
|  | ] | 
|  | ) | 
|  | def test_binaryhandler_output_handler( | 
|  | mocked_instance, | 
|  | faux_timer, | 
|  | proc_stdout, | 
|  | harness, | 
|  | expected_handler_calls, | 
|  | expected_harness_calls, | 
|  | should_be_less, | 
|  | timeout_wait | 
|  | ): | 
|  | class MockStdout(mock.Mock): | 
|  | def __init__(self, text): | 
|  | super().__init__(text) | 
|  | self.text = text | 
|  | self.line_index = 0 | 
|  |  | 
|  | def readline(self): | 
|  | if self.line_index == len(self.text): | 
|  | self.line_index = 0 | 
|  | return b'' | 
|  | else: | 
|  | line = self.text[self.line_index] | 
|  | self.line_index += 1 | 
|  | return line | 
|  |  | 
|  | class MockProc(mock.Mock): | 
|  | def __init__(self, pid, stdout): | 
|  | super().__init__(pid, stdout) | 
|  | self.pid = mock.PropertyMock(return_value=pid) | 
|  | self.stdout = MockStdout(stdout) | 
|  |  | 
|  | def wait(self, *args, **kwargs): | 
|  | if timeout_wait: | 
|  | raise TimeoutExpired('dummy cmd', 'dummyamount') | 
|  |  | 
|  | handler = BinaryHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) | 
|  | handler.terminate = mock.Mock() | 
|  |  | 
|  | proc = MockProc(1, proc_stdout) | 
|  |  | 
|  | with mock.patch( | 
|  | 'builtins.open', | 
|  | mock.mock_open(read_data='') | 
|  | ) as mock_file, \ | 
|  | mock.patch('time.time', side_effect=faux_timer.time): | 
|  | handler._output_handler(proc, harness) | 
|  |  | 
|  | mock_file.assert_called_with(handler.log, 'w') | 
|  |  | 
|  | if expected_handler_calls: | 
|  | mock_file.return_value.write.assert_has_calls(expected_handler_calls) | 
|  | if expected_harness_calls: | 
|  | harness.handle.assert_has_calls(expected_harness_calls) | 
|  | if should_be_less is not None: | 
|  | if should_be_less: | 
|  | assert mock_file.return_value.write.call_count < len(proc_stdout) | 
|  | else: | 
|  | assert mock_file.return_value.write.call_count == len(proc_stdout) | 
|  | if timeout_wait: | 
|  | handler.terminate.assert_called_once_with(proc) | 
|  |  | 
|  |  | 
|  | TESTDATA_4 = [ | 
|  | (True, False, True, None, None, | 
|  | ['valgrind', '--error-exitcode=2', '--leak-check=full', | 
|  | f'--suppressions={ZEPHYR_BASE}/scripts/valgrind.supp', | 
|  | '--log-file=build_dir/valgrind.log', '--track-origins=yes', | 
|  | 'generator']), | 
|  | (False, True, False, 123, None, ['generator', '-C', 'build_dir', 'run', '--seed=123']), | 
|  | (False, False, False, None, ['ex1', 'ex2'], ['build_dir/zephyr/zephyr.exe', 'ex1', 'ex2']), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'robot_test, call_make_run, enable_valgrind, seed,' \ | 
|  | ' extra_args, expected', | 
|  | TESTDATA_4, | 
|  | ids=['robot, valgrind', 'make run, seed', 'binary, extra'] | 
|  | ) | 
|  | def test_binaryhandler_create_command( | 
|  | mocked_instance, | 
|  | robot_test, | 
|  | call_make_run, | 
|  | enable_valgrind, | 
|  | seed, | 
|  | extra_args, | 
|  | expected | 
|  | ): | 
|  | options = SimpleNamespace() | 
|  | options.enable_valgrind = enable_valgrind | 
|  | options.coverage_basedir = "coverage_basedir" | 
|  | options.sim_name = None | 
|  | handler = BinaryHandler(mocked_instance, 'build', options, 'generator', False) | 
|  | handler.binary = 'bin' | 
|  | handler.call_make_run = call_make_run | 
|  | handler.seed = seed | 
|  | handler.extra_test_args = extra_args | 
|  | handler.build_dir = 'build_dir' | 
|  | handler.instance.sysbuild = False | 
|  | handler.platform = SimpleNamespace() | 
|  | handler.platform.resc = "file.resc" | 
|  | handler.platform.uart = "uart" | 
|  |  | 
|  | command = handler._create_command(robot_test) | 
|  |  | 
|  | assert command == expected | 
|  |  | 
|  |  | 
|  | TESTDATA_5 = [ | 
|  | (False, False, False), | 
|  | (True, False, False), | 
|  | (True, True, False), | 
|  | (False, False, True), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'enable_asan, enable_lsan, enable_ubsan', | 
|  | TESTDATA_5, | 
|  | ids=['none', 'asan', 'asan, lsan', 'ubsan'] | 
|  | ) | 
|  | def test_binaryhandler_create_env( | 
|  | mocked_instance, | 
|  | enable_asan, | 
|  | enable_lsan, | 
|  | enable_ubsan | 
|  | ): | 
|  | handler = BinaryHandler(mocked_instance, 'build', mock.Mock( | 
|  | enable_asan=enable_asan, | 
|  | enable_lsan=enable_lsan, | 
|  | enable_ubsan=enable_ubsan | 
|  | )) | 
|  |  | 
|  | env = { | 
|  | 'example_env_var': True, | 
|  | 'ASAN_OPTIONS': 'dummy=dummy:', | 
|  | 'UBSAN_OPTIONS': 'dummy=dummy:' | 
|  | } | 
|  |  | 
|  | with mock.patch('os.environ', env): | 
|  | res = handler._create_env() | 
|  |  | 
|  | assert env['example_env_var'] == res['example_env_var'] | 
|  |  | 
|  | if enable_ubsan: | 
|  | assert env['UBSAN_OPTIONS'] in res['UBSAN_OPTIONS'] | 
|  | assert 'log_path=stdout:' in res['UBSAN_OPTIONS'] | 
|  | assert 'halt_on_error=1:' in res['UBSAN_OPTIONS'] | 
|  |  | 
|  | if enable_asan: | 
|  | assert env['ASAN_OPTIONS'] in res['ASAN_OPTIONS'] | 
|  | assert 'log_path=stdout:' in res['ASAN_OPTIONS'] | 
|  |  | 
|  | if not enable_lsan: | 
|  | assert 'detect_leaks=0' in res['ASAN_OPTIONS'] | 
|  |  | 
|  |  | 
|  | TESTDATA_6 = [ | 
|  | (TwisterStatus.NONE, False, 2, True, TwisterStatus.FAIL, 'Valgrind error', False), | 
|  | (TwisterStatus.NONE, False, 1, False, TwisterStatus.FAIL, 'Exited with 1', False), | 
|  | (TwisterStatus.FAIL, False, 0, False, TwisterStatus.FAIL, "foobar", False), | 
|  | ('success', False, 0, False, 'success', None, False), | 
|  | (TwisterStatus.NONE, True, 1, True, TwisterStatus.FAIL, 'Exited with 1', True), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'harness_status, terminated, returncode, enable_valgrind,' \ | 
|  | ' expected_status, expected_reason, do_add_missing', | 
|  | TESTDATA_6, | 
|  | ids=['valgrind error', 'failed', 'harness failed', 'custom success', 'no status'] | 
|  | ) | 
|  | def test_binaryhandler_update_instance_info( | 
|  | mocked_instance, | 
|  | harness_status, | 
|  | terminated, | 
|  | returncode, | 
|  | enable_valgrind, | 
|  | expected_status, | 
|  | expected_reason, | 
|  | do_add_missing | 
|  | ): | 
|  | handler = BinaryHandler(mocked_instance, 'build', mock.Mock( | 
|  | enable_valgrind=enable_valgrind | 
|  | )) | 
|  | handler.terminated = terminated | 
|  | handler.returncode = returncode | 
|  | missing_mock = mock.Mock() | 
|  | handler.instance.add_missing_case_status = missing_mock | 
|  | mocked_harness = mock.Mock(status=harness_status, reason="foobar") | 
|  |  | 
|  | handler._update_instance_info(mocked_harness) | 
|  |  | 
|  | assert handler.instance.status == expected_status | 
|  | assert handler.instance.reason == expected_reason | 
|  |  | 
|  | if do_add_missing: | 
|  | missing_mock.assert_called_once_with(TwisterStatus.BLOCK, expected_reason) | 
|  |  | 
|  |  | 
|  | TESTDATA_7 = [ | 
|  | (True, False, False), | 
|  | (False, True, False), | 
|  | (False, False, True), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'is_robot_test, coverage, isatty', | 
|  | TESTDATA_7, | 
|  | ids=['robot test', 'coverage', 'isatty'] | 
|  | ) | 
|  | def test_binaryhandler_handle( | 
|  | mocked_instance, | 
|  | caplog, | 
|  | is_robot_test, | 
|  | coverage, | 
|  | isatty | 
|  | ): | 
|  | thread_mock_obj = mock.Mock() | 
|  |  | 
|  | def mock_popen(command, *args, **kwargs,): | 
|  | return mock.Mock( | 
|  | __enter__=mock.Mock(return_value=mock.Mock(pid=0, returncode=0)), | 
|  | __exit__=mock.Mock(return_value=None) | 
|  | ) | 
|  |  | 
|  | def mock_thread(target, *args, **kwargs): | 
|  | return thread_mock_obj | 
|  |  | 
|  | handler = BinaryHandler(mocked_instance, 'build', mock.Mock(coverage=coverage)) | 
|  | handler.sourcedir = 'source_dir' | 
|  | handler.build_dir = 'build_dir' | 
|  | handler.name= 'Dummy Name' | 
|  | handler._create_command = mock.Mock(return_value=['dummy' , 'command']) | 
|  | handler._create_env = mock.Mock(return_value=[]) | 
|  | handler._update_instance_info = mock.Mock() | 
|  | handler._final_handle_actions = mock.Mock() | 
|  | handler.terminate = mock.Mock() | 
|  | handler.try_kill_process_by_pid = mock.Mock() | 
|  |  | 
|  | robot_mock = mock.Mock() | 
|  | harness = mock.Mock(is_robot_test=is_robot_test, run_robot_test=robot_mock) | 
|  |  | 
|  | popen_mock = mock.Mock(side_effect=mock_popen) | 
|  | thread_mock = mock.Mock(side_effect=mock_thread) | 
|  | call_mock = mock.Mock() | 
|  |  | 
|  | with mock.patch('subprocess.call', call_mock), \ | 
|  | mock.patch('subprocess.Popen', popen_mock), \ | 
|  | mock.patch('threading.Thread', thread_mock), \ | 
|  | mock.patch('sys.stdout.isatty', return_value=isatty): | 
|  | handler.handle(harness) | 
|  |  | 
|  | if is_robot_test: | 
|  | robot_mock.assert_called_once_with(['dummy', 'command'], mock.ANY) | 
|  | return | 
|  |  | 
|  | assert 'Spawning BinaryHandler Thread for Dummy Name' in caplog.text | 
|  |  | 
|  | thread_mock_obj.join.assert_called() | 
|  | handler._update_instance_info.assert_called_once() | 
|  | handler._final_handle_actions.assert_called_once() | 
|  |  | 
|  | if isatty: | 
|  | call_mock.assert_any_call(['stty', 'sane'], stdin=mock.ANY) | 
|  |  | 
|  |  | 
|  | TESTDATA_8 = [ | 
|  | ('renode', True, True, False, False), | 
|  | ('native', False, False, False, True), | 
|  | ('build', False, True, False, False), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready', | 
|  | TESTDATA_8, | 
|  | ids=[t[0] for t in TESTDATA_8] | 
|  | ) | 
|  | def test_simulationhandler_init( | 
|  | mocked_instance, | 
|  | type_str, | 
|  | is_pid_fn, | 
|  | expected_call_make_run, | 
|  | is_binary, | 
|  | expected_ready | 
|  | ): | 
|  | handler = SimulationHandler(mocked_instance, type_str, mock.Mock()) | 
|  |  | 
|  | assert handler.call_make_run == expected_call_make_run | 
|  | assert handler.ready == expected_ready | 
|  |  | 
|  | if is_pid_fn: | 
|  | assert handler.pid_fn == os.path.join(mocked_instance.build_dir, | 
|  | 'renode.pid') | 
|  | if is_binary: | 
|  | assert handler.pid_fn == os.path.join(mocked_instance.build_dir, | 
|  | 'zephyr', 'zephyr.exe') | 
|  |  | 
|  |  | 
|  | TESTDATA_9 = [ | 
|  | (3, 2, 0, 0, 3, -1, True, False, False, 1), | 
|  | (4, 1, 0, 0, -1, -1, False, True, False, 0), | 
|  | (5, 0, 1, 2, -1, 4, False, False, True, 3) | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'success_count, in_waiting_count, oserror_count, readline_error_count,' | 
|  | ' haltless_count, statusless_count, end_by_halt, end_by_close,' | 
|  | ' end_by_status, expected_line_count', | 
|  | TESTDATA_9, | 
|  | ids=[ | 
|  | 'halt event', | 
|  | 'serial closes', | 
|  | 'harness status with errors' | 
|  | ] | 
|  | ) | 
|  | def test_devicehandler_monitor_serial( | 
|  | mocked_instance, | 
|  | success_count, | 
|  | in_waiting_count, | 
|  | oserror_count, | 
|  | readline_error_count, | 
|  | haltless_count, | 
|  | statusless_count, | 
|  | end_by_halt, | 
|  | end_by_close, | 
|  | end_by_status, | 
|  | expected_line_count | 
|  | ): | 
|  | is_open_iter = iter(lambda: True, False) | 
|  | line_iter = [ | 
|  | TypeError('dummy TypeError') if x % 2 else \ | 
|  | SerialException('dummy SerialException') for x in range( | 
|  | readline_error_count | 
|  | ) | 
|  | ] + [ | 
|  | f'line no {idx}'.encode('utf-8') for idx in range(success_count) | 
|  | ] | 
|  | in_waiting_iter = [False] * in_waiting_count + [ | 
|  | TypeError('dummy TypeError') | 
|  | ] if end_by_close else ( | 
|  | [OSError('dummy OSError')] * oserror_count + [False] * in_waiting_count | 
|  | ) + [True] * (success_count + readline_error_count) | 
|  |  | 
|  | is_set_iter = [False] * haltless_count + [True] \ | 
|  | if end_by_halt else iter(lambda: False, True) | 
|  |  | 
|  | status_iter = [TwisterStatus.NONE] * statusless_count + [TwisterStatus.PASS] \ | 
|  | if end_by_status else iter(lambda: TwisterStatus.NONE, TwisterStatus.PASS) | 
|  |  | 
|  | halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter)) | 
|  | ser = mock.Mock( | 
|  | isOpen=mock.Mock(side_effect=is_open_iter), | 
|  | readline=mock.Mock(side_effect=line_iter) | 
|  | ) | 
|  | type(ser).in_waiting = mock.PropertyMock( | 
|  | side_effect=in_waiting_iter, | 
|  | return_value=False | 
|  | ) | 
|  | harness = mock.Mock(capture_coverage=False) | 
|  | type(harness).status=mock.PropertyMock(side_effect=status_iter) | 
|  |  | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock(enable_coverage=not end_by_status)) | 
|  |  | 
|  | with mock.patch('builtins.open', mock.mock_open(read_data='')): | 
|  | handler.monitor_serial(ser, halt_event, harness) | 
|  |  | 
|  | if not end_by_close: | 
|  | ser.close.assert_called_once() | 
|  |  | 
|  | print(harness.call_args_list) | 
|  |  | 
|  | harness.handle.assert_has_calls( | 
|  | [mock.call(f'line no {idx}') for idx in range(expected_line_count)] | 
|  | ) | 
|  |  | 
|  |  | 
|  | def test_devicehandler_monitor_serial_splitlines(mocked_instance): | 
|  | halt_event = mock.Mock(is_set=mock.Mock(return_value=False)) | 
|  | ser = mock.Mock( | 
|  | isOpen=mock.Mock(side_effect=[True, True, False]), | 
|  | in_waiting=mock.Mock(return_value=False), | 
|  | readline=mock.Mock(return_value='\nline1\nline2\n'.encode('utf-8')) | 
|  | ) | 
|  | harness = mock.Mock(status=TwisterStatus.PASS) | 
|  |  | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock(enable_coverage=False)) | 
|  |  | 
|  | with mock.patch('builtins.open', mock.mock_open(read_data='')): | 
|  | handler.monitor_serial(ser, halt_event, harness) | 
|  |  | 
|  | assert harness.handle.call_count == 2 | 
|  |  | 
|  |  | 
|  | TESTDATA_10 = [ | 
|  | ( | 
|  | 'dummy_platform', | 
|  | 'dummy fixture', | 
|  | [ | 
|  | mock.Mock( | 
|  | fixtures=[], | 
|  | platform='dummy_platform', | 
|  | available=1, | 
|  | failures=0, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='another_platform', | 
|  | available=1, | 
|  | failures=0, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='dummy_platform', | 
|  | serial_pty=None, | 
|  | serial=None, | 
|  | available=1, | 
|  | failures=0, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='dummy_platform', | 
|  | serial_pty=mock.Mock(), | 
|  | available=1, | 
|  | failures=0, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='dummy_platform', | 
|  | serial_pty=mock.Mock(), | 
|  | available=1, | 
|  | failures=0, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ) | 
|  | ], | 
|  | 3 | 
|  | ), | 
|  | ( | 
|  | 'dummy_platform', | 
|  | 'dummy fixture', | 
|  | [ | 
|  | mock.Mock( | 
|  | fixtures=[], | 
|  | platform='dummy_platform', | 
|  | available=1, | 
|  | failures=0, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='another_platform', | 
|  | available=1, | 
|  | failures=0, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='dummy_platform', | 
|  | serial_pty=None, | 
|  | serial=None, | 
|  | available=1, | 
|  | failures=0, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='dummy_platform', | 
|  | serial_pty=mock.Mock(), | 
|  | available=1, | 
|  | failures=1, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='dummy_platform', | 
|  | serial_pty=mock.Mock(), | 
|  | available=1, | 
|  | failures=0, | 
|  | counter_increment=mock.Mock(), | 
|  | counter=0 | 
|  | ) | 
|  | ], | 
|  | 4 | 
|  | ), | 
|  | ( | 
|  | 'dummy_platform', | 
|  | 'dummy fixture', | 
|  | [], | 
|  | TwisterException | 
|  | ), | 
|  | ( | 
|  | 'dummy_platform', | 
|  | 'dummy fixture', | 
|  | [ | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='dummy_platform', | 
|  | serial_pty=mock.Mock(), | 
|  | counter_increment=mock.Mock(), | 
|  | failures=0, | 
|  | available=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['another fixture'], | 
|  | platform='dummy_platform', | 
|  | serial_pty=mock.Mock(), | 
|  | counter_increment=mock.Mock(), | 
|  | failures=0, | 
|  | available=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['dummy fixture'], | 
|  | platform='dummy_platform', | 
|  | serial=mock.Mock(), | 
|  | counter_increment=mock.Mock(), | 
|  | failures=0, | 
|  | available=0 | 
|  | ), | 
|  | mock.Mock( | 
|  | fixtures=['another fixture'], | 
|  | platform='dummy_platform', | 
|  | serial=mock.Mock(), | 
|  | counter_increment=mock.Mock(), | 
|  | failures=0, | 
|  | available=0 | 
|  | ) | 
|  | ], | 
|  | None | 
|  | ) | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'platform_name, fixture, duts, expected', | 
|  | TESTDATA_10, | 
|  | ids=['two good duts, select the first one', | 
|  | 'two duts, the first was failed once, select the second not failed', | 
|  | 'exception - no duts', 'no available duts'] | 
|  | ) | 
|  | def test_devicehandler_device_is_available( | 
|  | mocked_instance, | 
|  | platform_name, | 
|  | fixture, | 
|  | duts, | 
|  | expected | 
|  | ): | 
|  | mocked_instance.platform.name = platform_name | 
|  | mocked_instance.testsuite.harness_config = {'fixture': fixture} | 
|  |  | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.duts = duts | 
|  |  | 
|  | if isinstance(expected, int): | 
|  | device = handler.device_is_available(mocked_instance) | 
|  |  | 
|  | assert device == duts[expected] | 
|  | assert device.available == 0 | 
|  | device.counter_increment.assert_called_once() | 
|  | elif expected is None: | 
|  | device = handler.device_is_available(mocked_instance) | 
|  |  | 
|  | assert device is None | 
|  | elif isinstance(expected, type): | 
|  | with pytest.raises(expected): | 
|  | device = handler.device_is_available(mocked_instance) | 
|  | else: | 
|  | assert False | 
|  |  | 
|  |  | 
|  | def test_devicehandler_make_dut_available(mocked_instance): | 
|  | serial = mock.Mock(name='dummy_serial') | 
|  | duts = [ | 
|  | mock.Mock(available=0, serial=serial, serial_pty=None), | 
|  | mock.Mock(available=0, serial=None, serial_pty=serial), | 
|  | mock.Mock( | 
|  | available=0, | 
|  | serial=mock.Mock('another_serial'), | 
|  | serial_pty=None | 
|  | ) | 
|  | ] | 
|  |  | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.duts = duts | 
|  |  | 
|  | handler.make_dut_available(duts[1]) | 
|  |  | 
|  | assert len([None for d in handler.duts if d.available == 1]) == 1 | 
|  | assert handler.duts[0].available == 0 | 
|  | assert handler.duts[2].available == 0 | 
|  |  | 
|  | handler.make_dut_available(duts[0]) | 
|  |  | 
|  | assert len([None for d in handler.duts if d.available == 1]) == 2 | 
|  | assert handler.duts[2].available == 0 | 
|  |  | 
|  |  | 
|  | TESTDATA_11 = [ | 
|  | (mock.Mock(pid=0, returncode=0), False), | 
|  | (mock.Mock(pid=0, returncode=1), False), | 
|  | (mock.Mock(pid=0, returncode=1), True) | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'mock_process, raise_timeout', | 
|  | TESTDATA_11, | 
|  | ids=['proper script', 'error', 'timeout'] | 
|  | ) | 
|  | def test_devicehandler_run_custom_script(caplog, mock_process, raise_timeout): | 
|  | def raise_timeout_fn(timeout=-1): | 
|  | if raise_timeout and timeout != -1: | 
|  | raise subprocess.TimeoutExpired(None, timeout) | 
|  | else: | 
|  | return mock.Mock(), mock.Mock() | 
|  |  | 
|  | def assert_popen(command, *args, **kwargs): | 
|  | return mock.Mock( | 
|  | __enter__=mock.Mock(return_value=mock_process), | 
|  | __exit__=mock.Mock(return_value=None) | 
|  | ) | 
|  |  | 
|  | mock_process.communicate = mock.Mock(side_effect=raise_timeout_fn) | 
|  |  | 
|  | script = [os.path.join('test','script', 'path'), 'arg'] | 
|  | timeout = 60 | 
|  |  | 
|  | with mock.patch('subprocess.Popen', side_effect=assert_popen): | 
|  | DeviceHandler.run_custom_script(script, timeout) | 
|  |  | 
|  | if raise_timeout: | 
|  | assert all( | 
|  | t in caplog.text.lower() for t in [str(script), 'timed out'] | 
|  | ) | 
|  | mock_process.assert_has_calls( | 
|  | [ | 
|  | mock.call.communicate(timeout=timeout), | 
|  | mock.call.kill(), | 
|  | mock.call.communicate() | 
|  | ] | 
|  | ) | 
|  | elif mock_process.returncode == 0: | 
|  | assert not any([r.levelname == 'ERROR' for r in caplog.records]) | 
|  | else: | 
|  | assert 'timed out' not in caplog.text.lower() | 
|  | assert 'custom script failure' in caplog.text.lower() | 
|  |  | 
|  |  | 
|  | TESTDATA_12 = [ | 
|  | (0, False), | 
|  | (4, False), | 
|  | (0, True) | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'num_of_failures, raise_exception', | 
|  | TESTDATA_12, | 
|  | ids=['no failures', 'with failures', 'exception'] | 
|  | ) | 
|  | def test_devicehandler_get_hardware( | 
|  | mocked_instance, | 
|  | caplog, | 
|  | num_of_failures, | 
|  | raise_exception | 
|  | ): | 
|  | expected_hardware = mock.Mock() | 
|  |  | 
|  | def mock_availability(handler, instance, no=num_of_failures): | 
|  | if raise_exception: | 
|  | raise TwisterException(f'dummy message') | 
|  | if handler.no: | 
|  | handler.no -= 1 | 
|  | return None | 
|  | return expected_hardware | 
|  |  | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.no = num_of_failures | 
|  |  | 
|  | with mock.patch.object( | 
|  | DeviceHandler, | 
|  | 'device_is_available', | 
|  | mock_availability | 
|  | ): | 
|  | hardware = handler.get_hardware() | 
|  |  | 
|  | if raise_exception: | 
|  | assert 'dummy message' in caplog.text.lower() | 
|  | assert mocked_instance.status == TwisterStatus.FAIL | 
|  | assert mocked_instance.reason == 'dummy message' | 
|  | else: | 
|  | assert hardware == expected_hardware | 
|  |  | 
|  |  | 
|  | TESTDATA_13 = [ | 
|  | ( | 
|  | None, | 
|  | None, | 
|  | None, | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir'] | 
|  | ), | 
|  | ( | 
|  | [], | 
|  | None, | 
|  | None, | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir'] | 
|  | ), | 
|  | ( | 
|  | '--dummy', | 
|  | None, | 
|  | None, | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--', '--dummy'] | 
|  | ), | 
|  | ( | 
|  | '--dummy1,--dummy2,"--dummy, 3"', | 
|  | None, | 
|  | None, | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--', '--dummy1', '--dummy2', '--dummy, 3'] | 
|  | ), | 
|  |  | 
|  | ( | 
|  | None, | 
|  | 'runner', | 
|  | 'product', | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--runner', 'runner', 'param1', 'param2'] | 
|  | ), | 
|  |  | 
|  | ( | 
|  | None, | 
|  | 'pyocd', | 
|  | 'product', | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--runner', 'pyocd', 'param1', 'param2', '--', '--dev-id', 12345] | 
|  | ), | 
|  | ( | 
|  | None, | 
|  | 'nrfjprog', | 
|  | 'product', | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--runner', 'nrfjprog', 'param1', 'param2', '--', '--dev-id', 12345] | 
|  | ), | 
|  | ( | 
|  | None, | 
|  | 'openocd', | 
|  | 'STM32 STLink', | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--runner', 'openocd', 'param1', 'param2', | 
|  | '--', '--cmd-pre-init', 'hla_serial 12345'] | 
|  | ), | 
|  | ( | 
|  | None, | 
|  | 'openocd', | 
|  | 'STLINK-V3', | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--runner', 'openocd', 'param1', 'param2', | 
|  | '--', '--cmd-pre-init', 'hla_serial 12345'] | 
|  | ), | 
|  | ( | 
|  | None, | 
|  | 'openocd', | 
|  | 'EDBG CMSIS-DAP', | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--runner', 'openocd', 'param1', 'param2', | 
|  | '--', '--cmd-pre-init', 'cmsis_dap_serial 12345'] | 
|  | ), | 
|  | ( | 
|  | None, | 
|  | 'jlink', | 
|  | 'product', | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--runner', 'jlink', '--dev-id', 12345, | 
|  | 'param1', 'param2'] | 
|  | ), | 
|  | ( | 
|  | None, | 
|  | 'stm32cubeprogrammer', | 
|  | 'product', | 
|  | None, | 
|  | ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', | 
|  | '--runner', 'stm32cubeprogrammer', '--tool-opt=sn=12345', | 
|  | 'param1', 'param2'] | 
|  | ), | 
|  | ( | 
|  | None, | 
|  | None, | 
|  | None, | 
|  | 'flash_command', | 
|  | ['flash_command', '--build-dir', '$build_dir', '--board-id', 12345] | 
|  | ), | 
|  | ( | 
|  | None, | 
|  | None, | 
|  | None, | 
|  | 'path to/flash_command,with,args,"1, 2, 3",4', | 
|  | ['path to/flash_command', '--build-dir', '$build_dir', '--board-id', | 
|  | 12345, 'with', 'args', '1, 2, 3', '4'] | 
|  | ), | 
|  | ] | 
|  |  | 
|  | TESTDATA_13_2 = [(True), (False)] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'self_west_flash, runner,' \ | 
|  | ' hardware_product_name, self_flash_command, expected', | 
|  | TESTDATA_13, | 
|  | ids=['default', '--west-flash', 'one west flash value', | 
|  | 'multiple west flash values', 'generic runner', 'pyocd', | 
|  | 'nrfjprog', 'openocd, STM32 STLink', 'openocd, STLINK-v3', | 
|  | 'openocd, EDBG CMSIS-DAP', 'jlink', 'stm32cubeprogrammer', | 
|  | 'flash_command', 'flash_command with args'] | 
|  | ) | 
|  | @pytest.mark.parametrize('hardware_probe', TESTDATA_13_2, ids=['probe', 'id']) | 
|  | def test_devicehandler_create_command( | 
|  | mocked_instance, | 
|  | self_west_flash, | 
|  | runner, | 
|  | hardware_probe, | 
|  | hardware_product_name, | 
|  | self_flash_command, | 
|  | expected | 
|  | ): | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.options = mock.Mock(west_flash=self_west_flash, | 
|  | flash_command=self_flash_command, verbose=0) | 
|  | handler.generator_cmd = 'generator_cmd' | 
|  |  | 
|  | expected = [handler.build_dir if val == '$build_dir' else \ | 
|  | val for val in expected] | 
|  |  | 
|  | hardware = mock.Mock( | 
|  | product=hardware_product_name, | 
|  | probe_id=12345 if hardware_probe else None, | 
|  | id=12345 if not hardware_probe else None, | 
|  | runner_params=['param1', 'param2'] | 
|  | ) | 
|  |  | 
|  | command = handler._create_command(runner, hardware) | 
|  |  | 
|  | assert command == expected | 
|  |  | 
|  |  | 
|  | TESTDATA_14 = [ | 
|  | ('success', Handler.FailureType.NONE, 'success', None, False), | 
|  | (TwisterStatus.FAIL, Handler.FailureType.NONE, TwisterStatus.FAIL, | 
|  | "foobar", True), | 
|  | (TwisterStatus.ERROR, Handler.FailureType.NONE, TwisterStatus.ERROR, 'foobar', True), | 
|  | (TwisterStatus.NONE, Handler.FailureType.NONE, TwisterStatus.FAIL, 'Unknown Error', True), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'harness_status, failure_type,' \ | 
|  | ' expected_status, expected_reason, do_add_missing', | 
|  | TESTDATA_14, | 
|  | ids=['custom success', 'failed', 'error', 'no status'] | 
|  | ) | 
|  | def test_devicehandler_update_instance_info( | 
|  | mocked_instance, | 
|  | harness_status, | 
|  | failure_type, | 
|  | expected_status, | 
|  | expected_reason, | 
|  | do_add_missing | 
|  | ): | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) | 
|  | missing_mock = mock.Mock() | 
|  | handler.instance.add_missing_case_status = missing_mock | 
|  | mocked_harness = mock.Mock(status=harness_status, reason="foobar") | 
|  |  | 
|  | handler._update_instance_info(mocked_harness, failure_type=failure_type) | 
|  |  | 
|  |  | 
|  | assert handler.instance.status == expected_status | 
|  | assert handler.instance.reason == expected_reason | 
|  |  | 
|  | if do_add_missing: | 
|  | missing_mock.assert_called_with('blocked', expected_reason) | 
|  |  | 
|  |  | 
|  | TESTDATA_15 = [ | 
|  | ('dummy device', 'dummy pty', None, None, True, False, False), | 
|  | ( | 
|  | 'dummy device', | 
|  | 'dummy pty', | 
|  | mock.Mock(communicate=mock.Mock(return_value=('', ''))), | 
|  | SerialException, | 
|  | False, | 
|  | True, | 
|  | 'dummy pty' | 
|  | ), | 
|  | ( | 
|  | 'dummy device', | 
|  | None, | 
|  | None, | 
|  | SerialException, | 
|  | False, | 
|  | False, | 
|  | 'dummy device' | 
|  | ) | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'serial_device, serial_pty, ser_pty_process, expected_exception,' \ | 
|  | ' expected_result, terminate_ser_pty_process, make_available', | 
|  | TESTDATA_15, | 
|  | ids=['valid', 'serial pty process', 'no serial pty'] | 
|  | ) | 
|  | def test_devicehandler_create_serial_connection( | 
|  | mocked_instance, | 
|  | serial_device, | 
|  | serial_pty, | 
|  | ser_pty_process, | 
|  | expected_exception, | 
|  | expected_result, | 
|  | terminate_ser_pty_process, | 
|  | make_available | 
|  | ): | 
|  | def mock_serial(*args, **kwargs): | 
|  | if expected_exception: | 
|  | raise expected_exception('') | 
|  | return expected_result | 
|  |  | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) | 
|  | missing_mock = mock.Mock() | 
|  | handler.instance.add_missing_case_status = missing_mock | 
|  | twisterlib.handlers.terminate_process = mock.Mock() | 
|  |  | 
|  | dut = DUT() | 
|  | dut.available = 0 | 
|  | dut.failures = 0 | 
|  | handler.duts = [dut] | 
|  |  | 
|  | hardware_baud = 14400 | 
|  | flash_timeout = 60 | 
|  | serial_mock = mock.Mock(side_effect=mock_serial) | 
|  |  | 
|  | with mock.patch('serial.Serial', serial_mock), \ | 
|  | pytest.raises(expected_exception) if expected_exception else \ | 
|  | nullcontext(): | 
|  | result = handler._create_serial_connection(dut, serial_device, hardware_baud, | 
|  | flash_timeout, serial_pty, | 
|  | ser_pty_process) | 
|  |  | 
|  | if expected_result: | 
|  | assert result is not None | 
|  | assert dut.failures == 0 | 
|  |  | 
|  | if expected_exception: | 
|  | assert handler.instance.status == TwisterStatus.FAIL | 
|  | assert handler.instance.reason == 'Serial Device Error' | 
|  | assert dut.available == 1 | 
|  | assert dut.failures == 1 | 
|  | missing_mock.assert_called_once_with('blocked', 'Serial Device Error') | 
|  |  | 
|  | if terminate_ser_pty_process: | 
|  | twisterlib.handlers.terminate_process.assert_called_once() | 
|  | ser_pty_process.communicate.assert_called_once() | 
|  |  | 
|  |  | 
|  | TESTDATA_16 = [ | 
|  | ('dummy1 dummy2', None), | 
|  | ('dummy1,dummy2', CalledProcessError), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'serial_pty, popen_exception', | 
|  | TESTDATA_16, | 
|  | ids=['pty', 'pty process error'] | 
|  | ) | 
|  | def test_devicehandler_start_serial_pty( | 
|  | mocked_instance, | 
|  | serial_pty, | 
|  | popen_exception | 
|  | ): | 
|  | def mock_popen(command, *args, **kwargs): | 
|  | assert command == ['dummy1', 'dummy2'] | 
|  | if popen_exception: | 
|  | raise popen_exception(command, 'Dummy error') | 
|  | return mock.Mock() | 
|  |  | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) | 
|  |  | 
|  | popen_mock = mock.Mock(side_effect=mock_popen) | 
|  |  | 
|  | with mock.patch('subprocess.Popen', popen_mock): | 
|  | result = handler._start_serial_pty(serial_pty, 'master') | 
|  |  | 
|  | if popen_exception: | 
|  | assert result is None | 
|  | else: | 
|  | assert result is not None | 
|  |  | 
|  | TESTDATA_17 = [ | 
|  | (False, False, False, False, None, False, False, | 
|  | TwisterStatus.NONE, None, []), | 
|  | (True, True, False, False, None, False, False, | 
|  | TwisterStatus.NONE, None, []), | 
|  | (True, False, True, False, None, False, False, | 
|  | TwisterStatus.ERROR, 'Device issue (Flash error)', []), | 
|  | (True, False, False, True, None, False, False, | 
|  | TwisterStatus.ERROR, 'Device issue (Timeout)', ['Flash operation timed out.']), | 
|  | (True, False, False, False, 1, False, False, | 
|  | TwisterStatus.ERROR, 'Device issue (Flash error?)', []), | 
|  | (True, False, False, False, 0, True, False, | 
|  | TwisterStatus.NONE, None, ['Timed out while monitoring serial output on IPName']), | 
|  | (True, False, False, False, 0, False, True, | 
|  | TwisterStatus.NONE, None, ["Terminating serial-pty:'Serial PTY'", | 
|  | "Terminated serial-pty:'Serial PTY', stdout:'', stderr:''"]), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'has_hardware, raise_create_serial, raise_popen, raise_timeout,' \ | 
|  | ' returncode, do_timeout_thread, use_pty,' \ | 
|  | ' expected_status, expected_reason, expected_logs', | 
|  | TESTDATA_17, | 
|  | ids=['no hardware', 'create serial failure', 'popen called process error', | 
|  | 'communicate timeout', 'nonzero returncode', 'valid pty', 'valid dev'] | 
|  | ) | 
|  | def test_devicehandler_handle( | 
|  | mocked_instance, | 
|  | caplog, | 
|  | has_hardware, | 
|  | raise_create_serial, | 
|  | raise_popen, | 
|  | raise_timeout, | 
|  | returncode, | 
|  | do_timeout_thread, | 
|  | use_pty, | 
|  | expected_status, | 
|  | expected_reason, | 
|  | expected_logs | 
|  | ): | 
|  | def mock_start_serial_pty(serial_pty, serial_pty_master): | 
|  | return mock.Mock( | 
|  | name='dummy serial PTY process', | 
|  | communicate=mock.Mock( | 
|  | return_value=('', '') | 
|  | ) | 
|  | ) | 
|  |  | 
|  | def mock_create_serial(*args, **kwargs): | 
|  | if raise_create_serial: | 
|  | raise SerialException('dummy cmd', 'dummy msg') | 
|  | return mock.Mock(name='dummy serial') | 
|  |  | 
|  | def mock_thread(*args, **kwargs): | 
|  | is_alive_mock = mock.Mock(return_value=bool(do_timeout_thread)) | 
|  |  | 
|  | return mock.Mock(is_alive=is_alive_mock) | 
|  |  | 
|  | def mock_terminate(proc, *args, **kwargs): | 
|  | proc.communicate = mock.Mock(return_value=(mock.Mock(), mock.Mock())) | 
|  |  | 
|  | def mock_communicate(*args, **kwargs): | 
|  | if raise_timeout: | 
|  | raise TimeoutExpired('dummy cmd', 'dummyamount') | 
|  | return mock.Mock(), mock.Mock() | 
|  |  | 
|  | def mock_popen(command, *args, **kwargs): | 
|  | if raise_popen: | 
|  | raise CalledProcessError('dummy proc', 'dummy msg') | 
|  |  | 
|  | mock_process = mock.Mock( | 
|  | pid=1, | 
|  | returncode=returncode, | 
|  | communicate=mock.Mock(side_effect=mock_communicate) | 
|  | ) | 
|  |  | 
|  | return mock.Mock( | 
|  | __enter__=mock.Mock(return_value=mock_process), | 
|  | __exit__=mock.Mock(return_value=None) | 
|  | ) | 
|  |  | 
|  | hardware = None if not has_hardware else mock.Mock( | 
|  | baud=14400, | 
|  | runner='dummy runner', | 
|  | serial_pty='Serial PTY' if use_pty else None, | 
|  | serial='dummy serial', | 
|  | pre_script='dummy pre script', | 
|  | post_script='dummy post script', | 
|  | post_flash_script='dummy post flash script', | 
|  | flash_timeout=60, | 
|  | flash_with_test=True | 
|  | ) | 
|  |  | 
|  | handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.get_hardware = mock.Mock(return_value=hardware) | 
|  | handler.options = mock.Mock( | 
|  | timeout_multiplier=1, | 
|  | west_flash=None, | 
|  | west_runner=None, | 
|  | verbose=0 | 
|  | ) | 
|  | handler._start_serial_pty = mock.Mock(side_effect=mock_start_serial_pty) | 
|  | handler._create_command = mock.Mock(return_value=['dummy', 'command']) | 
|  | handler.run_custom_script = mock.Mock() | 
|  | handler._create_serial_connection = mock.Mock( | 
|  | side_effect=mock_create_serial | 
|  | ) | 
|  | handler.monitor_serial = mock.Mock() | 
|  | handler.terminate = mock.Mock(side_effect=mock_terminate) | 
|  | handler._update_instance_info = mock.Mock() | 
|  | handler._final_handle_actions = mock.Mock() | 
|  | handler.make_dut_available = mock.Mock() | 
|  | twisterlib.handlers.terminate_process = mock.Mock() | 
|  | handler.instance.platform.name = 'IPName' | 
|  |  | 
|  | harness = mock.Mock() | 
|  |  | 
|  | openpty_mock = mock.Mock(return_value=('master', 'slave')) | 
|  | ttyname_mock = mock.Mock(side_effect=lambda x: x + ' name') | 
|  |  | 
|  | with mock.patch('builtins.open', mock.mock_open(read_data='')), \ | 
|  | mock.patch('subprocess.Popen', side_effect=mock_popen), \ | 
|  | mock.patch('threading.Event', mock.Mock()), \ | 
|  | mock.patch('threading.Thread', side_effect=mock_thread), \ | 
|  | mock.patch('pty.openpty', openpty_mock), \ | 
|  | mock.patch('os.ttyname', ttyname_mock): | 
|  | handler.handle(harness) | 
|  |  | 
|  | handler.get_hardware.assert_called_once() | 
|  |  | 
|  | messages = [record.msg for record in caplog.records] | 
|  | assert all([msg in messages for msg in expected_logs]) | 
|  |  | 
|  | if not has_hardware: | 
|  | return | 
|  |  | 
|  | handler.run_custom_script.assert_has_calls([ | 
|  | mock.call('dummy pre script', mock.ANY) | 
|  | ]) | 
|  |  | 
|  | if raise_create_serial: | 
|  | return | 
|  |  | 
|  | handler.run_custom_script.assert_has_calls([ | 
|  | mock.call('dummy pre script', mock.ANY), | 
|  | mock.call('dummy post flash script', mock.ANY), | 
|  | mock.call('dummy post script', mock.ANY) | 
|  | ]) | 
|  |  | 
|  | if expected_reason: | 
|  | assert handler.instance.reason == expected_reason | 
|  | if expected_status: | 
|  | assert handler.instance.status == expected_status | 
|  |  | 
|  | handler.make_dut_available.assert_called_once_with(hardware) | 
|  |  | 
|  |  | 
|  | TESTDATA_18 = [ | 
|  | (True, True, True), | 
|  | (False, False, False), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof', | 
|  | TESTDATA_18, | 
|  | ids=['ignore crash', 'qemu crash'] | 
|  | ) | 
|  | def test_qemuhandler_init( | 
|  | mocked_instance, | 
|  | ignore_qemu_crash, | 
|  | expected_ignore_crash, | 
|  | expected_ignore_unexpected_eof | 
|  | ): | 
|  | mocked_instance.testsuite.ignore_qemu_crash = ignore_qemu_crash | 
|  |  | 
|  | handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) | 
|  |  | 
|  | assert handler.ignore_crash == expected_ignore_crash | 
|  | assert handler.ignore_unexpected_eof == expected_ignore_unexpected_eof | 
|  |  | 
|  |  | 
|  | def test_qemuhandler_get_cpu_time(): | 
|  | def mock_process(pid): | 
|  | return mock.Mock( | 
|  | cpu_times=mock.Mock( | 
|  | return_value=mock.Mock( | 
|  | user=20.0, | 
|  | system=64.0 | 
|  | ) | 
|  | ) | 
|  | ) | 
|  |  | 
|  | with mock.patch('psutil.Process', mock_process): | 
|  | res = QEMUHandler._get_cpu_time(0) | 
|  |  | 
|  | assert res == pytest.approx(84.0) | 
|  |  | 
|  |  | 
|  | TESTDATA_19 = [ | 
|  | ( | 
|  | True, | 
|  | os.path.join('self', 'dummy_dir', '1'), | 
|  | mock.PropertyMock(return_value=os.path.join('dummy_dir', '1')), | 
|  | os.path.join('dummy_dir', '1') | 
|  | ), | 
|  | ( | 
|  | False, | 
|  | os.path.join('self', 'dummy_dir', '2'), | 
|  | mock.PropertyMock(return_value=os.path.join('dummy_dir', '2')), | 
|  | os.path.join('self', 'dummy_dir', '2') | 
|  | ), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'self_sysbuild, self_build_dir, build_dir, expected', | 
|  | TESTDATA_19, | 
|  | ids=['domains build dir', 'self build dir'] | 
|  | ) | 
|  | def test_qemuhandler_get_default_domain_build_dir( | 
|  | mocked_instance, | 
|  | self_sysbuild, | 
|  | self_build_dir, | 
|  | build_dir, | 
|  | expected | 
|  | ): | 
|  | get_default_domain_mock = mock.Mock() | 
|  | type(get_default_domain_mock()).build_dir = build_dir | 
|  | domains_mock = mock.Mock(get_default_domain=get_default_domain_mock) | 
|  | from_file_mock = mock.Mock(return_value=domains_mock) | 
|  |  | 
|  | handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.instance.sysbuild = self_sysbuild | 
|  | handler.build_dir = self_build_dir | 
|  |  | 
|  | with mock.patch('domains.Domains.from_file', from_file_mock): | 
|  | result = handler.get_default_domain_build_dir() | 
|  |  | 
|  | assert result == expected | 
|  |  | 
|  |  | 
|  | TESTDATA_20 = [ | 
|  | ( | 
|  | os.path.join('self', 'dummy_dir', 'log1'), | 
|  | os.path.join('self', 'dummy_dir', 'pid1'), | 
|  | os.path.join('sysbuild', 'dummy_dir', 'bd1'), | 
|  | True | 
|  | ), | 
|  | ( | 
|  | os.path.join('self', 'dummy_dir', 'log2'), | 
|  | os.path.join('self', 'dummy_dir', 'pid2'), | 
|  | os.path.join('sysbuild', 'dummy_dir', 'bd2'), | 
|  | False | 
|  | ), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn', | 
|  | TESTDATA_20, | 
|  | ids=['pid exists', 'pid missing'] | 
|  | ) | 
|  | def test_qemuhandler_set_qemu_filenames( | 
|  | mocked_instance, | 
|  | self_log, | 
|  | self_pid_fn, | 
|  | sysbuild_build_dir, | 
|  | exists_pid_fn | 
|  | ): | 
|  | unlink_mock = mock.Mock() | 
|  | exists_mock = mock.Mock(return_value=exists_pid_fn) | 
|  |  | 
|  | handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.log = self_log | 
|  | handler.pid_fn = self_pid_fn | 
|  |  | 
|  | with mock.patch('os.unlink', unlink_mock), \ | 
|  | mock.patch('os.path.exists', exists_mock): | 
|  | handler._set_qemu_filenames(sysbuild_build_dir) | 
|  |  | 
|  | assert handler.fifo_fn == mocked_instance.build_dir + \ | 
|  | os.path.sep + 'qemu-fifo' | 
|  |  | 
|  | assert handler.pid_fn ==  sysbuild_build_dir + os.path.sep + 'qemu.pid' | 
|  |  | 
|  | assert handler.log_fn == self_log | 
|  |  | 
|  | if exists_pid_fn: | 
|  | unlink_mock.assert_called_once_with(sysbuild_build_dir + \ | 
|  | os.path.sep + 'qemu.pid') | 
|  |  | 
|  |  | 
|  | def test_qemuhandler_create_command(mocked_instance): | 
|  | sysbuild_build_dir = os.path.join('sysbuild', 'dummy_dir') | 
|  |  | 
|  | handler = QEMUHandler(mocked_instance, 'build', mock.Mock(), 'dummy_cmd', False) | 
|  |  | 
|  | result = handler._create_command(sysbuild_build_dir) | 
|  |  | 
|  | assert result == ['dummy_cmd', '-C', 'sysbuild' + os.path.sep + 'dummy_dir', | 
|  | 'run'] | 
|  |  | 
|  |  | 
|  | TESTDATA_21 = [ | 
|  | ( | 
|  | 0, | 
|  | False, | 
|  | None, | 
|  | TwisterStatus.FAIL, | 
|  | Handler.FailureType.NONE, | 
|  | TwisterStatus.FAIL, | 
|  | "foobar", | 
|  | False | 
|  | ), | 
|  | ( | 
|  | 1, | 
|  | True, | 
|  | None, | 
|  | TwisterStatus.FAIL, | 
|  | Handler.FailureType.NONE, | 
|  | TwisterStatus.FAIL, | 
|  | "foobar", | 
|  | False | 
|  | ), | 
|  | ( | 
|  | 0, | 
|  | False, | 
|  | None, | 
|  | TwisterStatus.NONE, | 
|  | Handler.FailureType.TIMEOUT, | 
|  | TwisterStatus.FAIL, | 
|  | 'Timeout', | 
|  | True | 
|  | ), | 
|  | ( | 
|  | 1, | 
|  | False, | 
|  | None, | 
|  | TwisterStatus.NONE, | 
|  | Handler.FailureType.NONE, | 
|  | TwisterStatus.FAIL, | 
|  | 'Exited with 1', | 
|  | True | 
|  | ), | 
|  | ( | 
|  | 1, | 
|  | False, | 
|  | 'preexisting reason', | 
|  | 'good dummy status', | 
|  | Handler.FailureType.NONE, | 
|  | TwisterStatus.FAIL, | 
|  | 'preexisting reason', | 
|  | True | 
|  | ), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'self_returncode, self_ignore_qemu_crash,' \ | 
|  | ' self_instance_reason, harness_status, failure_type,' \ | 
|  | ' expected_status, expected_reason, expected_called_missing_case', | 
|  | TESTDATA_21, | 
|  | ids=['not failed', 'qemu ignore', 'timeout', 'bad returncode', 'other fail'] | 
|  | ) | 
|  | def test_qemuhandler_update_instance_info( | 
|  | mocked_instance, | 
|  | self_returncode, | 
|  | self_ignore_qemu_crash, | 
|  | self_instance_reason, | 
|  | harness_status, | 
|  | failure_type, | 
|  | expected_status, | 
|  | expected_reason, | 
|  | expected_called_missing_case | 
|  | ): | 
|  | mocked_instance.add_missing_case_status = mock.Mock() | 
|  | mocked_instance.reason = self_instance_reason | 
|  | mocked_harness = mock.Mock(status=harness_status, reason="foobar") | 
|  |  | 
|  | handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) | 
|  | handler.returncode = self_returncode | 
|  | handler.ignore_crash = self_ignore_qemu_crash | 
|  |  | 
|  | handler._update_instance_info(mocked_harness, failure_type) | 
|  |  | 
|  | assert handler.instance.status == expected_status | 
|  | assert handler.instance.reason == expected_reason | 
|  |  | 
|  | if expected_called_missing_case: | 
|  | mocked_instance.add_missing_case_status.assert_called_once_with( | 
|  | TwisterStatus.BLOCK, expected_reason | 
|  | ) | 
|  |  | 
|  |  | 
|  | def test_qemuhandler_thread_get_fifo_names(): | 
|  | fifo_fn = 'dummy' | 
|  |  | 
|  | fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn) | 
|  |  | 
|  | assert fifo_in ==  'dummy.in' | 
|  | assert fifo_out ==  'dummy.out' | 
|  |  | 
|  |  | 
|  | TESTDATA_24 = [ | 
|  | (TwisterStatus.FAIL, 'timeout', TwisterStatus.FAIL, 'timeout'), | 
|  | (TwisterStatus.FAIL, 'Execution error', TwisterStatus.FAIL, 'Execution error'), | 
|  | (TwisterStatus.FAIL, 'unexpected eof', TwisterStatus.FAIL, 'unexpected eof'), | 
|  | (TwisterStatus.FAIL, 'unexpected byte', TwisterStatus.FAIL, 'unexpected byte'), | 
|  | (TwisterStatus.NONE, None, TwisterStatus.NONE, 'Unknown Error'), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | '_status, _reason, expected_status, expected_reason', | 
|  | TESTDATA_24, | 
|  | ids=['timeout', 'failed', 'unexpected eof', 'unexpected byte', 'unknown'] | 
|  | ) | 
|  | def test_qemuhandler_thread_update_instance_info( | 
|  | mocked_instance, | 
|  | _status, | 
|  | _reason, | 
|  | expected_status, | 
|  | expected_reason | 
|  | ): | 
|  | handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) | 
|  |  | 
|  | QEMUHandler._thread_update_instance_info(handler, _status, _reason) | 
|  |  | 
|  | assert handler.instance.status == expected_status | 
|  | assert handler.instance.reason == expected_reason | 
|  |  | 
|  |  | 
|  | TESTDATA_25 = [ | 
|  | ( | 
|  | ('1\n' * 60).encode('utf-8'), | 
|  | 60, | 
|  | 1, | 
|  | [TwisterStatus.NONE] * 60 + [TwisterStatus.PASS] * 6, | 
|  | 1000, | 
|  | False, | 
|  | TwisterStatus.FAIL, | 
|  | 'timeout', | 
|  | [mock.call('1\n'), mock.call('1\n')] | 
|  | ), | 
|  | ( | 
|  | ('1\n' * 60).encode('utf-8'), | 
|  | 60, | 
|  | -1, | 
|  | [TwisterStatus.NONE] * 60 + [TwisterStatus.PASS] * 30, | 
|  | 100, | 
|  | False, | 
|  | TwisterStatus.FAIL, | 
|  | None, | 
|  | [mock.call('1\n'), mock.call('1\n')] | 
|  | ), | 
|  | ( | 
|  | b'', | 
|  | 60, | 
|  | 1, | 
|  | [TwisterStatus.PASS] * 3, | 
|  | 100, | 
|  | False, | 
|  | TwisterStatus.FAIL, | 
|  | 'unexpected eof', | 
|  | [] | 
|  | ), | 
|  | ( | 
|  | b'\x81', | 
|  | 60, | 
|  | 1, | 
|  | [TwisterStatus.PASS] * 3, | 
|  | 100, | 
|  | False, | 
|  | TwisterStatus.FAIL, | 
|  | 'unexpected byte', | 
|  | [] | 
|  | ), | 
|  | ( | 
|  | '1\n2\n3\n4\n5\n'.encode('utf-8'), | 
|  | 600, | 
|  | 1, | 
|  | [TwisterStatus.NONE] * 3 + [TwisterStatus.PASS] * 7, | 
|  | 100, | 
|  | False, | 
|  | TwisterStatus.PASS, | 
|  | None, | 
|  | [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] | 
|  | ), | 
|  | ( | 
|  | '1\n2\n3\n4\n5\n'.encode('utf-8'), | 
|  | 600, | 
|  | 0, | 
|  | [TwisterStatus.NONE] * 3 + [TwisterStatus.PASS] * 7, | 
|  | 100, | 
|  | False, | 
|  | TwisterStatus.FAIL, | 
|  | 'timeout', | 
|  | [mock.call('1\n'), mock.call('2\n')] | 
|  | ), | 
|  | ( | 
|  | '1\n2\n3\n4\n5\n'.encode('utf-8'), | 
|  | 60, | 
|  | 1, | 
|  | [TwisterStatus.NONE] * 3 + [TwisterStatus.PASS] * 7, | 
|  | (n for n in [100, 100, 10000]), | 
|  | True, | 
|  | TwisterStatus.PASS, | 
|  | None, | 
|  | [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] | 
|  | ), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'content, timeout, pid, harness_statuses, cputime, capture_coverage,' \ | 
|  | ' expected_status, expected_reason, expected_log_calls', | 
|  | TESTDATA_25, | 
|  | ids=[ | 
|  | 'timeout', | 
|  | 'harness failed', | 
|  | 'unexpected eof', | 
|  | 'unexpected byte', | 
|  | 'harness success', | 
|  | 'timeout by pid=0', | 
|  | 'capture_coverage' | 
|  | ] | 
|  | ) | 
|  | def test_qemuhandler_thread( | 
|  | mocked_instance, | 
|  | faux_timer, | 
|  | content, | 
|  | timeout, | 
|  | pid, | 
|  | harness_statuses, | 
|  | cputime, | 
|  | capture_coverage, | 
|  | expected_status, | 
|  | expected_reason, | 
|  | expected_log_calls | 
|  | ): | 
|  | def mock_cputime(pid): | 
|  | if pid > 0: | 
|  | return cputime if isinstance(cputime, int) else next(cputime) | 
|  | else: | 
|  | raise ProcessLookupError() | 
|  |  | 
|  | type(mocked_instance.testsuite).timeout = mock.PropertyMock(return_value=timeout) | 
|  | handler = QEMUHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) | 
|  | handler.ignore_unexpected_eof = False | 
|  | handler.pid_fn = 'pid_fn' | 
|  | handler.fifo_fn = 'fifo_fn' | 
|  |  | 
|  | file_objs = {} | 
|  |  | 
|  | def mocked_open(filename, *args, **kwargs): | 
|  | if filename == handler.pid_fn: | 
|  | contents = str(pid).encode('utf-8') | 
|  | elif filename == handler.fifo_fn + '.out': | 
|  | contents = content | 
|  | else: | 
|  | contents = b'' | 
|  |  | 
|  | file_object = mock.mock_open(read_data=contents).return_value | 
|  | file_object.__iter__.return_value = contents.splitlines(True) | 
|  | if file_object not in file_objs: | 
|  | file_objs[filename] = file_object | 
|  |  | 
|  | return file_object | 
|  |  | 
|  | harness = mock.Mock(capture_coverage=capture_coverage, handle=print) | 
|  | type(harness).status = mock.PropertyMock(side_effect=harness_statuses) | 
|  |  | 
|  | p = mock.Mock() | 
|  | p.poll = mock.Mock( | 
|  | side_effect=itertools.cycle([True, True, True, True, False]) | 
|  | ) | 
|  |  | 
|  | mock_thread_get_fifo_names = mock.Mock( | 
|  | return_value=('fifo_fn.in', 'fifo_fn.out') | 
|  | ) | 
|  | mock_thread_update_instance_info = mock.Mock() | 
|  |  | 
|  | with mock.patch('time.time', side_effect=faux_timer.time), \ | 
|  | mock.patch('builtins.open', new=mocked_open), \ | 
|  | mock.patch('select.poll', return_value=p), \ | 
|  | mock.patch('os.path.exists', return_value=True), \ | 
|  | mock.patch('os.unlink', mock.Mock()), \ | 
|  | mock.patch('os.mkfifo', mock.Mock()), \ | 
|  | mock.patch('os.kill', mock.Mock()), \ | 
|  | mock.patch('twisterlib.handlers.QEMUHandler._get_cpu_time', | 
|  | mock_cputime), \ | 
|  | mock.patch('twisterlib.handlers.QEMUHandler._thread_get_fifo_names', | 
|  | mock_thread_get_fifo_names), \ | 
|  | mock.patch('twisterlib.handlers.QEMUHandler.' \ | 
|  | '_thread_update_instance_info', | 
|  | mock_thread_update_instance_info): | 
|  | QEMUHandler._thread( | 
|  | handler, | 
|  | handler.get_test_timeout(), | 
|  | handler.build_dir, | 
|  | handler.log, | 
|  | handler.fifo_fn, | 
|  | handler.pid_fn, | 
|  | harness, | 
|  | handler.ignore_unexpected_eof | 
|  | ) | 
|  |  | 
|  | mock_thread_update_instance_info.assert_called_once_with( | 
|  | handler, | 
|  | expected_status, | 
|  | mock.ANY | 
|  | ) | 
|  |  | 
|  | file_objs[handler.log].write.assert_has_calls(expected_log_calls) | 
|  |  | 
|  |  | 
|  | TESTDATA_26 = [ | 
|  | (True, False, TwisterStatus.NONE, True, | 
|  | ['No timeout, return code from QEMU (1): 1', | 
|  | 'return code from QEMU (1): 1']), | 
|  | (False, True, TwisterStatus.PASS, True, ['return code from QEMU (1): 0']), | 
|  | (False, True, TwisterStatus.FAIL, False, ['return code from QEMU (None): 1']), | 
|  | ] | 
|  |  | 
|  | @pytest.mark.parametrize( | 
|  | 'isatty, do_timeout, harness_status, exists_pid_fn, expected_logs', | 
|  | TESTDATA_26, | 
|  | ids=['no timeout, isatty', 'timeout passed', 'timeout, no pid_fn'] | 
|  | ) | 
|  | def test_qemuhandler_handle( | 
|  | mocked_instance, | 
|  | caplog, | 
|  | tmp_path, | 
|  | isatty, | 
|  | do_timeout, | 
|  | harness_status, | 
|  | exists_pid_fn, | 
|  | expected_logs | 
|  | ): | 
|  | def mock_wait(*args, **kwargs): | 
|  | if do_timeout: | 
|  | raise TimeoutExpired('dummy cmd', 'dummyamount') | 
|  |  | 
|  | mock_process = mock.Mock(pid=0, returncode=1) | 
|  | mock_process.communicate = mock.Mock( | 
|  | return_value=(mock.Mock(), mock.Mock()) | 
|  | ) | 
|  | mock_process.wait = mock.Mock(side_effect=mock_wait) | 
|  |  | 
|  | handler = QEMUHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) | 
|  |  | 
|  | def mock_path_exists(name, *args, **kwargs): | 
|  | return exists_pid_fn | 
|  |  | 
|  | def mock_popen(command, stdout=None, stdin=None, stderr=None, cwd=None): | 
|  | return mock.Mock( | 
|  | __enter__=mock.Mock(return_value=mock_process), | 
|  | __exit__=mock.Mock(return_value=None), | 
|  | communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock())) | 
|  | ) | 
|  |  | 
|  | def mock_thread(name=None, target=None, daemon=None, args=None): | 
|  | return mock.Mock() | 
|  |  | 
|  | def mock_filenames(sysbuild_build_dir): | 
|  | handler.fifo_fn = os.path.join('dummy', 'qemu-fifo') | 
|  | handler.pid_fn = os.path.join(sysbuild_build_dir, 'qemu.pid') | 
|  | handler.log_fn = os.path.join('dummy', 'log') | 
|  |  | 
|  | harness = mock.Mock(status=harness_status) | 
|  | handler_options_west_flash = [] | 
|  |  | 
|  | domain_build_dir = os.path.join('sysbuild', 'dummydir') | 
|  | command = ['generator_cmd', '-C', os.path.join('cmd', 'path'), 'run'] | 
|  |  | 
|  | handler.options = mock.Mock( | 
|  | timeout_multiplier=1, | 
|  | west_flash=handler_options_west_flash, | 
|  | west_runner=None, | 
|  | verbose=0, | 
|  | ) | 
|  | handler.run_custom_script = mock.Mock(return_value=None) | 
|  | handler.make_device_available = mock.Mock(return_value=None) | 
|  | handler._final_handle_actions = mock.Mock(return_value=None) | 
|  | handler._create_command = mock.Mock(return_value=command) | 
|  | handler._set_qemu_filenames = mock.Mock(side_effect=mock_filenames) | 
|  | handler.get_default_domain_build_dir = mock.Mock(return_value=domain_build_dir) | 
|  | handler.terminate = mock.Mock() | 
|  |  | 
|  | unlink_mock = mock.Mock() | 
|  |  | 
|  | with mock.patch('subprocess.Popen', side_effect=mock_popen), \ | 
|  | mock.patch('builtins.open', mock.mock_open(read_data='1')), \ | 
|  | mock.patch('threading.Thread', side_effect=mock_thread), \ | 
|  | mock.patch('os.path.exists', side_effect=mock_path_exists), \ | 
|  | mock.patch('os.unlink', unlink_mock), \ | 
|  | mock.patch('sys.stdout.isatty', return_value=isatty): | 
|  | handler.handle(harness) | 
|  |  | 
|  | assert all([expected_log in caplog.text for expected_log in expected_logs]) | 
|  |  | 
|  |  | 
|  | def test_qemuhandler_get_fifo(mocked_instance): | 
|  | handler = QEMUHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) | 
|  | handler.fifo_fn = 'fifo_fn' | 
|  |  | 
|  | result = handler.get_fifo() | 
|  |  | 
|  | assert result == 'fifo_fn' |