| #!/usr/bin/env python3 |
| # Copyright (c) 2023 Intel Corporation |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| """ |
| Tests for handlers.py classes' methods |
| """ |
| |
| import itertools |
| import mock |
| import os |
| import pytest |
| import signal |
| import subprocess |
| import sys |
| |
| from contextlib import nullcontext |
| from importlib import reload |
| from serial import SerialException |
| from subprocess import CalledProcessError, TimeoutExpired |
| |
| import twisterlib.harness |
| |
| from conftest import ZEPHYR_BASE |
| from twisterlib.error import TwisterException |
| from twisterlib.handlers import ( |
| Handler, |
| BinaryHandler, |
| DeviceHandler, |
| QEMUHandler, |
| SimulationHandler |
| ) |
| |
| |
@pytest.fixture
def mocked_instance(tmp_path):
    """Provide a mocked test instance backed by a real build directory.

    The build directory is created below pytest's ``tmp_path``. Read-only
    values are installed as ``PropertyMock`` objects on the mock types,
    while ``status`` and ``reason`` are ordinary attributes.
    """
    build_dir = tmp_path / 'build_dir'
    os.makedirs(build_dir)

    suite_mock = mock.Mock()
    type(suite_mock).source_dir = mock.PropertyMock(return_value='')
    type(suite_mock).timeout = mock.PropertyMock(return_value=60)

    platform_mock = mock.Mock()
    type(platform_mock).binaries = mock.PropertyMock(return_value=[])
    type(platform_mock).timeout_multiplier = mock.PropertyMock(
        return_value=2
    )

    instance = mock.Mock()
    type(instance).build_dir = mock.PropertyMock(return_value=str(build_dir))
    instance.testsuite = suite_mock
    instance.platform = platform_mock
    instance.status = None
    instance.reason = 'Unknown'

    return instance
| |
| |
@pytest.fixture
def faux_timer():
    """Provide a fake clock whose ``time()`` grows by one on every call."""
    class Counter:
        """Deterministic stand-in for a time source."""

        def __init__(self):
            # Last value handed out by time(); 0 before the first call.
            self.t = 0

        def time(self):
            """Advance the clock by one tick and return the new reading."""
            self.t = self.t + 1
            return self.t

    return Counter()
| |
| |
# Columns: fail_serial, fail_pty, os_name, expected_outs, expected_error
TESTDATA_1 = [
    (
        True,
        False,
        'posix',
        [
            'Install pyserial python module with pip to use'
            ' --device-testing option.'
        ],
        None
    ),
    (
        False,
        True,
        'nt',
        [],
        None
    ),
    (
        True,
        True,
        'posix',
        [
            'Install pyserial python module with pip to use'
            ' --device-testing option.'
        ],
        ImportError
    ),
]

@pytest.mark.parametrize(
    'fail_serial, fail_pty, os_name, expected_outs, expected_error',
    TESTDATA_1,
    ids=['import serial', 'import pty nt', 'import serial+pty posix']
)
def test_imports(
    capfd,
    fail_serial,
    fail_pty,
    os_name,
    expected_outs,
    expected_error
):
    """Reload twisterlib.handlers while 'serial' and/or 'pty' cannot be imported.

    The selected modules are blanked out in a patched ``sys.modules`` and
    refused by a finder placed first on a patched ``sys.meta_path``. The
    reload must raise ``expected_error`` (if any) and the captured stdout
    must contain every string from ``expected_outs``.
    """
    class ImportRaiser:
        """Meta path finder that rejects the modules chosen to fail."""

        def find_spec(self, fullname, path, target=None):
            blocked = (
                (fullname == 'serial' and fail_serial)
                or (fullname == 'pty' and fail_pty)
            )
            if blocked:
                raise ImportError()

    modules_mock = sys.modules.copy()
    for module_name, must_fail in (('serial', fail_serial), ('pty', fail_pty)):
        modules_mock[module_name] = \
            None if must_fail else modules_mock[module_name]

    # The raising finder goes first so it is consulted before the real ones.
    meta_path_mock = [ImportRaiser(), *sys.meta_path]

    if expected_error:
        raises_ctx = pytest.raises(expected_error)
    else:
        raises_ctx = nullcontext()

    with mock.patch('os.name', os_name), \
         mock.patch.dict('sys.modules', modules_mock, clear=True), \
         mock.patch('sys.meta_path', meta_path_mock), \
         raises_ctx:
        reload(twisterlib.handlers)

    out, _ = capfd.readouterr()
    for expected_out in expected_outs:
        assert expected_out in out
| |
| |
def test_handler_final_handle_actions(mocked_instance):
    """Check the RunID-mismatch outcome of Handler._final_handle_actions().

    With a harness reporting ``run_id_exists`` but not ``matched_run_id``,
    the instance and all its testcases must end up 'failed' with reason
    'RunID mismatch'. A reason assigned afterwards must survive a second
    call unchanged.
    """
    mocked_instance.testcases = [mock.Mock()]

    handler = Handler(mocked_instance)
    handler.suite_name_check = True

    harness = twisterlib.harness.Test()
    harness.state = mock.Mock()
    harness.detected_suite_names = mock.Mock()
    harness.matched_run_id = False
    harness.run_id_exists = True

    elapsed = mock.Mock()

    handler._final_handle_actions(harness, elapsed)

    assert handler.instance.status == 'failed'
    assert handler.instance.execution_time == elapsed
    assert handler.instance.reason == 'RunID mismatch'
    for testcase in handler.instance.testcases:
        assert testcase.status == 'failed'

    preset_reason = 'This reason shan\'t be changed.'
    handler.instance.reason = preset_reason
    handler._final_handle_actions(harness, elapsed)

    assert handler.instance.reason == preset_reason
| |
| |
# Columns: detected_suite_names, should_be_called
TESTDATA_2 = [
    (
        ['dummy_testsuite_name'],
        False
    ),
    (
        [],
        True
    ),
    (
        ['another_dummy_name', 'yet_another_dummy_name'],
        True
    ),
]

@pytest.mark.parametrize(
    'detected_suite_names, should_be_called',
    TESTDATA_2,
    ids=['detected one expected', 'detected none', 'detected two unexpected']
)
def test_handler_verify_ztest_suite_name(
    mocked_instance,
    detected_suite_names,
    should_be_called
):
    """Check when _verify_ztest_suite_name() reports a missing suite name.

    The testsuite expects 'dummy_testsuite_name'; ``_missing_suite_name``
    is patched out and must be called exactly once, or not at all,
    according to ``should_be_called``.
    """
    type(mocked_instance.testsuite).ztest_suite_names = \
        ['dummy_testsuite_name']

    with mock.patch.object(Handler, '_missing_suite_name') as missing_mock:
        handler = Handler(mocked_instance)
        handler._verify_ztest_suite_name(
            'passed',
            detected_suite_names,
            mock.Mock()
        )

    if not should_be_called:
        missing_mock.assert_not_called()
    else:
        missing_mock.assert_called_once()
| |
| |
def test_handler_missing_suite_name(mocked_instance):
    """Check that _missing_suite_name() fails the instance and its testcases.

    The instance must carry the passed handler time as execution time and
    the reason 'Testsuite mismatch'.
    """
    mocked_instance.testcases = [mock.Mock()]

    handler = Handler(mocked_instance)
    handler.suite_name_check = True

    elapsed = mock.Mock()

    handler._missing_suite_name(['dummy_testsuite_name'], elapsed)

    assert handler.instance.status == 'failed'
    assert handler.instance.execution_time == elapsed
    assert handler.instance.reason == 'Testsuite mismatch'
    for testcase in handler.instance.testcases:
        assert testcase.status == 'failed'
| |
| |
def test_handler_record(mocked_instance):
    """Check that Handler.record() appends harness recordings to a CSV file.

    ``open`` and the ``csv.DictWriter`` row writers are patched, so nothing
    is written to disk. The test verifies the target path and mode, the
    header row and the data rows.
    """
    instance = mocked_instance
    instance.testcases = [mock.Mock()]

    handler = Handler(instance)

    harness = twisterlib.harness.Harness()
    harness.recording = [
        {'field_1': 'recording_1_1', 'field_2': 'recording_1_2'},
        {'field_1': 'recording_2_1', 'field_2': 'recording_2_2'}
    ]

    with mock.patch(
        'builtins.open',
        mock.mock_open(read_data='')
    ) as mock_file, \
         mock.patch(
        'csv.DictWriter.writerow',
        mock.Mock()
    ) as mock_writerow, \
         mock.patch(
        'csv.DictWriter.writerows',
        mock.Mock()
    ) as mock_writerows:
        handler.record(harness)

    mock_file.assert_called_with(
        os.path.join(instance.build_dir, 'recording.csv'),
        'at'
    )

    # The header is expected as a writerow() call that maps every field
    # name to itself.
    expected_header = {key: key for key in harness.recording[0]}
    mock_writerow.assert_has_calls([mock.call(expected_header)])
    mock_writerows.assert_has_calls([mock.call(harness.recording)])
| |
| |
def test_handler_terminate(mocked_instance):
    """Check that Handler.terminate() signals the children and the process.

    ``os.kill`` is patched with a function raising ``ProcessLookupError``
    for negative PIDs. The second call to terminate() uses a child with
    PID -1 and must still go on to signal the remaining child.
    """
    def mock_kill_function(pid, sig):
        if pid < 0:
            raise ProcessLookupError

    handler = Handler(mocked_instance)

    first_child = mock.Mock(pid=1)
    second_child = mock.Mock(pid=2)
    parent_process = mock.Mock()
    parent_process.children = mock.Mock(
        return_value=[first_child, second_child]
    )

    mock_proc = mock.Mock(pid=0)
    mock_proc.terminate = mock.Mock(return_value=None)
    mock_proc.kill = mock.Mock(return_value=None)

    kill_patch = mock.patch(
        'os.kill',
        mock.Mock(side_effect=mock_kill_function)
    )

    # Everything below stays inside the patches: terminate() must never
    # reach the real os.kill().
    with mock.patch('psutil.Process', return_value=parent_process), \
         kill_patch as mock_kill:
        handler.terminate(mock_proc)

        assert handler.terminated
        mock_proc.terminate.assert_called_once()
        mock_proc.kill.assert_called_once()
        mock_kill.assert_has_calls(
            [mock.call(1, signal.SIGTERM), mock.call(2, signal.SIGTERM)]
        )

        vanished_child = mock.Mock(pid=-1)
        parent_process.children = mock.Mock(
            return_value=[vanished_child, second_child]
        )
        handler.terminated = False
        mock_kill.reset_mock()

        handler.terminate(mock_proc)

        mock_kill.assert_has_calls(
            [mock.call(-1, signal.SIGTERM), mock.call(2, signal.SIGTERM)]
        )
| |
| |
def test_binaryhandler_try_kill_process_by_pid(mocked_instance):
    """Check that try_kill_process_by_pid() kills the PID read from pid_fn.

    For both a positive and a negative PID the PID file must be unlinked
    and ``os.kill`` called once with ``SIGKILL``; the negative PID makes the
    patched ``os.kill`` raise ``ProcessLookupError``.
    """
    def mock_kill_function(pid, sig):
        if pid < 0:
            raise ProcessLookupError

    pid_file = os.path.join('dummy', 'path', 'to', 'pid.pid')

    handler = BinaryHandler(mocked_instance, 'build')

    with mock.patch(
        'os.kill',
        mock.Mock(side_effect=mock_kill_function)
    ) as mock_kill, \
         mock.patch('os.unlink', mock.Mock()) as mock_unlink:
        for pid_text, expected_pid in (('1', 1), ('-1', -1)):
            # pid_fn is assigned anew before each attempt.
            handler.pid_fn = pid_file
            mock_unlink.reset_mock()
            mock_kill.reset_mock()

            with mock.patch(
                'builtins.open',
                mock.mock_open(read_data=pid_text)
            ):
                handler.try_kill_process_by_pid()

            mock_unlink.assert_called_once_with(pid_file)
            mock_kill.assert_called_once_with(expected_pid, signal.SIGKILL)
| |
| |
# Columns: proc_stdout, harness, expected_handler_calls,
#          expected_harness_calls, should_be_less, timeout_wait
TESTDATA_3 = [
    (
        [b'This\\r\\n', b'is\r', b'a short', b'file.'],
        mock.Mock(state=False, capture_coverage=False),
        [
            mock.call('This\\r\\n'),
            mock.call('is\r'),
            mock.call('a short'),
            mock.call('file.')
        ],
        [
            mock.call('This'),
            mock.call('is'),
            mock.call('a short'),
            mock.call('file.')
        ],
        None,
        False
    ),
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(state=False, capture_coverage=False),
        None,
        None,
        True,
        False
    ),
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(state=True, capture_coverage=False),
        None,
        None,
        True,
        False
    ),
    (
        [b'Too much.'] * 120,  # Should be more than the timeout
        mock.Mock(state=True, capture_coverage=True),
        None,
        None,
        False,
        True
    ),
]

@pytest.mark.parametrize(
    'proc_stdout, harness, expected_handler_calls,'
    ' expected_harness_calls, should_be_less, timeout_wait',
    TESTDATA_3,
    ids=[
        'no timeout',
        'timeout',
        'timeout with harness state',
        'timeout with capture_coverage, wait timeout'
    ]
)
def test_binaryhandler_output_handler(
    mocked_instance,
    faux_timer,
    proc_stdout,
    harness,
    expected_handler_calls,
    expected_harness_calls,
    should_be_less,
    timeout_wait
):
    """Feed BinaryHandler._output_handler() a scripted stdout and fake clock.

    ``time.time`` is replaced by the ``faux_timer`` fixture and ``open`` by
    a mock. The test then checks the lines written to the log, the lines
    passed to the harness, how many writes happened relative to the amount
    of input, and whether the process was terminated.
    """
    class MockStdout(mock.Mock):
        """Stdout stand-in that serves the given lines through readline()."""

        def __init__(self, text):
            super().__init__(text)
            self.text = text
            self.line_index = 0

        def readline(self):
            # Past the last line: rewind and report end of stream.
            if self.line_index == len(self.text):
                self.line_index = 0
                return b''

            current = self.text[self.line_index]
            self.line_index += 1
            return current

    class MockProc(mock.Mock):
        """Process stand-in exposing a scripted stdout and wait()."""

        def __init__(self, pid, stdout):
            super().__init__(pid, stdout)
            self.pid = mock.PropertyMock(return_value=pid)
            self.stdout = MockStdout(stdout)

        def wait(self, *args, **kwargs):
            if timeout_wait:
                raise TimeoutExpired('dummy cmd', 'dummyamount')

    handler = BinaryHandler(mocked_instance, 'build')
    handler.terminate = mock.Mock()
    handler.options = mock.Mock(timeout_multiplier=1)

    proc = MockProc(1, proc_stdout)

    open_patch = mock.patch('builtins.open', mock.mock_open(read_data=''))
    with open_patch as mock_file, \
         mock.patch('time.time', side_effect=faux_timer.time):
        handler._output_handler(proc, harness)

    mock_file.assert_called_with(handler.log, 'wt')

    log_write = mock_file.return_value.write

    if expected_handler_calls:
        log_write.assert_has_calls(expected_handler_calls)
    if expected_harness_calls:
        harness.handle.assert_has_calls(expected_harness_calls)
    if should_be_less is not None:
        if should_be_less:
            assert log_write.call_count < len(proc_stdout)
        else:
            assert log_write.call_count == len(proc_stdout)
    if timeout_wait:
        handler.terminate.assert_called_once_with(proc)
| |
| |
# Columns: robot_test, call_make_run, call_west_flash, enable_valgrind,
#          seed, extra_args, expected
TESTDATA_4 = [
    (
        True, False, False, True, None, None,
        [
            'valgrind', '--error-exitcode=2', '--leak-check=full',
            f'--suppressions={ZEPHYR_BASE}/scripts/valgrind.supp',
            '--log-file=build_dir/valgrind.log', '--track-origins=yes',
            'generator', 'run_renode_test'
        ]
    ),
    (
        False, True, False, False, 123, None,
        ['generator', 'run', '--seed=123']
    ),
    (
        False, False, True, False, None, None,
        ['west', 'flash', '--skip-rebuild', '-d', 'build_dir']
    ),
    (
        False, False, False, False, None, ['ex1', 'ex2'],
        ['bin', 'ex1', 'ex2']
    ),
]

@pytest.mark.parametrize(
    'robot_test, call_make_run, call_west_flash, enable_valgrind, seed,' \
    ' extra_args, expected',
    TESTDATA_4,
    ids=['robot, valgrind', 'make run, seed', 'west flash', 'binary, extra']
)
def test_binaryhandler_create_command(
    mocked_instance,
    robot_test,
    call_make_run,
    call_west_flash,
    enable_valgrind,
    seed,
    extra_args,
    expected
):
    """Check the command line BinaryHandler._create_command() assembles."""
    handler = BinaryHandler(mocked_instance, 'build')

    # Fixed handler state shared by all cases.
    handler.generator_cmd = 'generator'
    handler.binary = 'bin'
    handler.build_dir = 'build_dir'

    # Per-case configuration.
    handler.call_make_run = call_make_run
    handler.call_west_flash = call_west_flash
    handler.options = mock.Mock(enable_valgrind=enable_valgrind)
    handler.seed = seed
    handler.extra_test_args = extra_args

    assert handler._create_command(robot_test) == expected
| |
| |
# Columns: enable_asan, enable_lsan, enable_ubsan
TESTDATA_5 = [
    (False, False, False),
    (True, False, False),
    (True, True, False),
    (False, False, True),
]

@pytest.mark.parametrize(
    'enable_asan, enable_lsan, enable_ubsan',
    TESTDATA_5,
    ids=['none', 'asan', 'asan, lsan', 'ubsan']
)
def test_binaryhandler_create_env(
    mocked_instance,
    enable_asan,
    enable_lsan,
    enable_ubsan
):
    """Check the sanitizer options BinaryHandler._create_env() adds.

    ``os.environ`` is patched with a small dict. Unrelated variables must
    be carried over, and pre-existing sanitizer options must be kept inside
    the extended values.
    """
    handler = BinaryHandler(mocked_instance, 'build')
    handler.options = mock.Mock(
        enable_asan=enable_asan,
        enable_lsan=enable_lsan,
        enable_ubsan=enable_ubsan
    )

    env = {
        'example_env_var': True,
        'ASAN_OPTIONS': 'dummy=dummy:',
        'UBSAN_OPTIONS': 'dummy=dummy:'
    }

    with mock.patch('os.environ', env):
        res = handler._create_env()

    assert env['example_env_var'] == res['example_env_var']

    if enable_ubsan:
        ubsan_options = res['UBSAN_OPTIONS']
        assert env['UBSAN_OPTIONS'] in ubsan_options
        assert 'log_path=stdout:' in ubsan_options
        assert 'halt_on_error=1:' in ubsan_options

    if enable_asan:
        asan_options = res['ASAN_OPTIONS']
        assert env['ASAN_OPTIONS'] in asan_options
        assert 'log_path=stdout:' in asan_options

        # Leak detection is only expected to be switched off without LSan.
        if not enable_lsan:
            assert 'detect_leaks=0' in asan_options
| |
| |
# Columns: harness_state, terminated, returncode, enable_valgrind,
#          expected_status, expected_reason, do_add_missing
TESTDATA_6 = [
    (None, False, 2, True, 'failed', 'Valgrind error', False),
    (None, False, 1, False, 'failed', 'Failed', False),
    ('failed', False, 0, False, 'failed', 'Failed', False),
    ('success', False, 0, False, 'success', 'Unknown', False),
    (None, True, 1, True, 'failed', 'Timeout', True),
]

@pytest.mark.parametrize(
    'harness_state, terminated, returncode, enable_valgrind,' \
    ' expected_status, expected_reason, do_add_missing',
    TESTDATA_6,
    ids=['valgrind error', 'failed', 'harness failed', 'success', 'no state']
)
def test_binaryhandler_update_instance_info(
    mocked_instance,
    harness_state,
    terminated,
    returncode,
    enable_valgrind,
    expected_status,
    expected_reason,
    do_add_missing
):
    """Check the status and reason BinaryHandler._update_instance_info() sets."""
    elapsed = 59
    missing_mock = mock.Mock()

    handler = BinaryHandler(mocked_instance, 'build')
    handler.terminated = terminated
    handler.returncode = returncode
    handler.options = mock.Mock(enable_valgrind=enable_valgrind)
    handler.instance.add_missing_case_status = missing_mock

    handler._update_instance_info(harness_state, elapsed)

    assert handler.instance.execution_time == elapsed
    assert handler.instance.status == expected_status
    assert handler.instance.reason == expected_reason

    if do_add_missing:
        missing_mock.assert_called_once_with('blocked', expected_reason)
| |
| |
# Columns: is_robot_test, coverage, isatty
TESTDATA_7 = [
    (True, False, False),
    (False, True, False),
    (False, False, True),
]

@pytest.mark.parametrize(
    'is_robot_test, coverage, isatty',
    TESTDATA_7,
    ids=['robot test', 'coverage', 'isatty']
)
def test_binaryhandler_handle(
    mocked_instance,
    caplog,
    is_robot_test,
    coverage,
    isatty
):
    """Run BinaryHandler.handle() with its collaborators mocked out.

    For a robot test only the delegation to ``run_robot_test`` is checked.
    Otherwise the test checks the log message, that the output thread was
    joined, that the bookkeeping helpers ran once each and, on a tty, that
    ``stty sane`` was invoked.
    """
    thread_mock_obj = mock.Mock()

    def mock_popen(command, *args, **kwargs):
        # Context-manager shaped object yielding a finished process.
        running_proc = mock.Mock(pid=0, returncode=0)
        return mock.Mock(
            __enter__=mock.Mock(return_value=running_proc),
            __exit__=mock.Mock(return_value=None)
        )

    def mock_thread(target, *args, **kwargs):
        return thread_mock_obj

    handler = BinaryHandler(mocked_instance, 'build')
    handler.sourcedir = 'source_dir'
    handler.build_dir = 'build_dir'
    handler.name = 'Dummy Name'
    handler.options = mock.Mock(coverage=coverage)

    # Replace every helper handle() relies on.
    handler._create_command = mock.Mock(return_value=['dummy', 'command'])
    handler._create_env = mock.Mock(return_value=[])
    handler._update_instance_info = mock.Mock()
    handler._final_handle_actions = mock.Mock()
    handler.terminate = mock.Mock()
    handler.try_kill_process_by_pid = mock.Mock()

    robot_mock = mock.Mock()
    harness = mock.Mock(is_robot_test=is_robot_test, run_robot_test=robot_mock)

    call_mock = mock.Mock()
    popen_mock = mock.Mock(side_effect=mock_popen)
    thread_mock = mock.Mock(side_effect=mock_thread)

    with mock.patch('subprocess.call', call_mock), \
         mock.patch('subprocess.Popen', popen_mock), \
         mock.patch('threading.Thread', thread_mock), \
         mock.patch('sys.stdout.isatty', return_value=isatty):
        handler.handle(harness)

    if is_robot_test:
        robot_mock.assert_called_once_with(['dummy', 'command'], mock.ANY)
        return

    assert 'Spawning BinaryHandler Thread for Dummy Name' in caplog.text

    thread_mock_obj.join.assert_called()
    handler._update_instance_info.assert_called_once()
    handler._final_handle_actions.assert_called_once()

    if isatty:
        call_mock.assert_any_call(['stty', 'sane'], stdin=mock.ANY)
| |
| |
# Columns: type_str, is_pid_fn, expected_call_make_run, is_binary,
#          expected_ready
TESTDATA_8 = [
    ('renode', True, True, False, False),
    ('native', False, False, False, True),
    ('build', False, True, False, False),
]

@pytest.mark.parametrize(
    'type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready',
    TESTDATA_8,
    ids=[t[0] for t in TESTDATA_8]
)
def test_simulationhandler_init(
    mocked_instance,
    type_str,
    is_pid_fn,
    expected_call_make_run,
    is_binary,
    expected_ready
):
    """Check the attributes SimulationHandler sets up for each type string."""
    build_dir = mocked_instance.build_dir

    handler = SimulationHandler(mocked_instance, type_str)

    assert handler.call_make_run == expected_call_make_run
    assert handler.ready == expected_ready

    if is_pid_fn:
        renode_pid_path = os.path.join(build_dir, 'renode.pid')
        assert handler.pid_fn == renode_pid_path

    # NOTE(review): every TESTDATA_8 row has is_binary == False, so this
    # branch never runs; it also inspects pid_fn although the expected path
    # points at an executable - confirm which attribute was intended.
    if is_binary:
        exe_path = os.path.join(build_dir, 'zephyr', 'zephyr.exe')
        assert handler.pid_fn == exe_path
| |
| |
# Columns: success_count, in_waiting_count, oserror_count,
#          readline_error_count, haltless_count, stateless_count,
#          end_by_halt, end_by_close, end_by_state, expected_line_count
TESTDATA_9 = [
    (3, 2, 0, 0, 3, -1, True, False, False, 1),
    (4, 1, 0, 0, -1, -1, False, True, False, 0),
    (5, 0, 1, 2, -1, 4, False, False, True, 3)
]

@pytest.mark.parametrize(
    'success_count, in_waiting_count, oserror_count, readline_error_count,'
    ' haltless_count, stateless_count, end_by_halt, end_by_close,'
    ' end_by_state, expected_line_count',
    TESTDATA_9,
    ids=[
        'halt event',
        'serial closes',
        'harness state with errors'
    ]
)
def test_devicehandler_monitor_serial(
    mocked_instance,
    success_count,
    in_waiting_count,
    oserror_count,
    readline_error_count,
    haltless_count,
    stateless_count,
    end_by_halt,
    end_by_close,
    end_by_state,
    expected_line_count
):
    """Drive DeviceHandler.monitor_serial() with scripted serial behaviour.

    Side-effect sequences script what the serial port, the halt event and
    the harness state report on each access. Afterwards the test checks
    that the port was closed (unless the scenario ends by the port closing)
    and that the harness received the expected number of lines.
    """
    # Endless iterators: the sentinel value is never produced.
    is_open_iter = iter(lambda: True, False)

    failing_reads = [
        TypeError('dummy TypeError') if x % 2
        else SerialException('dummy SerialException')
        for x in range(readline_error_count)
    ]
    successful_reads = [
        f'line no {idx}'.encode('utf-8') for idx in range(success_count)
    ]
    line_iter = failing_reads + successful_reads

    if end_by_close:
        in_waiting_iter = [False] * in_waiting_count + [
            TypeError('dummy TypeError')
        ]
    else:
        in_waiting_iter = (
            [OSError('dummy OSError')] * oserror_count
            + [False] * in_waiting_count
            + [True] * (success_count + readline_error_count)
        )

    if end_by_halt:
        is_set_iter = [False] * haltless_count + [True]
    else:
        is_set_iter = iter(lambda: False, True)

    if end_by_state:
        state_iter = [False] * stateless_count + [True]
    else:
        state_iter = iter(lambda: False, True)

    halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter))

    ser = mock.Mock(
        isOpen=mock.Mock(side_effect=is_open_iter),
        readline=mock.Mock(side_effect=line_iter)
    )
    type(ser).in_waiting = mock.PropertyMock(
        side_effect=in_waiting_iter,
        return_value=False
    )

    harness = mock.Mock(capture_coverage=False)
    type(harness).state = mock.PropertyMock(side_effect=state_iter)

    handler = DeviceHandler(mocked_instance, 'build')
    handler.options = mock.Mock(enable_coverage=not end_by_state)

    with mock.patch('builtins.open', mock.mock_open(read_data='')):
        handler.monitor_serial(ser, halt_event, harness)

    if not end_by_close:
        ser.close.assert_called_once()

    expected_handle_calls = [
        mock.call(f'line no {idx}') for idx in range(expected_line_count)
    ]
    harness.handle.assert_has_calls(expected_handle_calls)
| |
| |
# Columns: platform_name, fixture, duts, expected
# ``expected`` is the index of the DUT that should be returned, None when
# no DUT should be returned, or the exception type that should be raised.
TESTDATA_10 = [
    (
        'dummy_platform',
        'dummy fixture',
        [
            mock.Mock(
                fixtures=[],
                platform='dummy_platform',
                available=1,
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='another_platform',
                available=1,
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=None,
                serial=None,
                available=1,
                counter=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=1,
                counter=0
            )
        ],
        3
    ),
    (
        'dummy_platform',
        'dummy fixture',
        [],
        TwisterException
    ),
    (
        'dummy_platform',
        'dummy fixture',
        [
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=0
            ),
            mock.Mock(
                fixtures=['another fixture'],
                platform='dummy_platform',
                serial_pty=mock.Mock(),
                available=0
            ),
            mock.Mock(
                fixtures=['dummy fixture'],
                platform='dummy_platform',
                serial=mock.Mock(),
                available=0
            ),
            mock.Mock(
                fixtures=['another fixture'],
                platform='dummy_platform',
                serial=mock.Mock(),
                available=0
            )
        ],
        None
    )
]

@pytest.mark.parametrize(
    'platform_name, fixture, duts, expected',
    TESTDATA_10,
    ids=['one good dut', 'exception - no duts', 'no available duts']
)
def test_devicehandler_device_is_available(
    mocked_instance,
    platform_name,
    fixture,
    duts,
    expected
):
    """Check which DUT DeviceHandler.device_is_available() hands out.

    A returned DUT must be the one at index ``expected`` and must have been
    marked unavailable with its counter incremented.
    """
    mocked_instance.platform.name = platform_name
    mocked_instance.testsuite.harness_config = {'fixture': fixture}

    handler = DeviceHandler(mocked_instance, 'build')
    handler.duts = duts

    if isinstance(expected, type):
        # An exception class: the lookup itself has to fail.
        with pytest.raises(expected):
            handler.device_is_available(mocked_instance)
        return

    if expected is None:
        assert handler.device_is_available(mocked_instance) is None
        return

    if not isinstance(expected, int):
        assert False

    device = handler.device_is_available(mocked_instance)

    assert device == duts[expected]
    assert device.available == 0
    assert device.counter == 1
| |
| |
def test_devicehandler_make_device_available(mocked_instance):
    """Check that make_device_available() frees only the matching DUTs.

    Two DUTs reference the given serial (one through ``serial``, one through
    ``serial_pty``) and must become available; the third uses a different
    serial and must stay unavailable.
    """
    serial = mock.Mock(name='dummy_serial')
    # The first positional argument of Mock is ``spec``, so the name has to
    # be passed by keyword to actually label the mock.
    other_serial = mock.Mock(name='another_serial')

    duts = [
        mock.Mock(available=0, serial=serial, serial_pty=None),
        mock.Mock(available=0, serial=None, serial_pty=serial),
        mock.Mock(available=0, serial=other_serial, serial_pty=None)
    ]

    handler = DeviceHandler(mocked_instance, 'build')
    handler.duts = duts

    handler.make_device_available(serial)

    assert sum(1 for dut in handler.duts if dut.available == 1) == 2
    assert handler.duts[2].available == 0
| |
| |
# Columns: mock_process, raise_timeout
TESTDATA_11 = [
    (
        mock.Mock(pid=0, returncode=0),
        False
    ),
    (
        mock.Mock(pid=0, returncode=1),
        False
    ),
    (
        mock.Mock(pid=0, returncode=1),
        True
    )
]

@pytest.mark.parametrize(
    'mock_process, raise_timeout',
    TESTDATA_11,
    ids=['proper script', 'error', 'timeout']
)
def test_devicehandler_run_custom_script(caplog, mock_process, raise_timeout):
    """Check how DeviceHandler.run_custom_script() logs script outcomes.

    ``subprocess.Popen`` is patched to yield ``mock_process``. In the
    timeout case only the ``communicate()`` call that passes a timeout
    raises ``TimeoutExpired``.
    """
    def raise_timeout_fn(timeout=-1):
        # -1 means communicate() was called without a timeout.
        if raise_timeout and timeout != -1:
            raise subprocess.TimeoutExpired(None, timeout)
        return mock.Mock(), mock.Mock()

    def fake_popen(command, *args, **kwargs):
        return mock.Mock(
            __enter__=mock.Mock(return_value=mock_process),
            __exit__=mock.Mock(return_value=None)
        )

    mock_process.communicate = mock.Mock(side_effect=raise_timeout_fn)

    timeout = 60
    script = [os.path.join('test', 'script', 'path'), 'arg']

    with mock.patch('subprocess.Popen', side_effect=fake_popen):
        DeviceHandler.run_custom_script(script, timeout)

    if raise_timeout:
        for fragment in [str(script), 'timed out']:
            assert fragment in caplog.text.lower()
        expected_calls = [
            mock.call.communicate(timeout=timeout),
            mock.call.kill(),
            mock.call.communicate()
        ]
        mock_process.assert_has_calls(expected_calls)
    elif mock_process.returncode == 0:
        for record in caplog.records:
            assert record.levelname != 'ERROR'
    else:
        assert 'timed out' not in caplog.text.lower()
        assert 'custom script failure' in caplog.text.lower()
| |
| |
# Columns: num_of_failures, raise_exception
TESTDATA_12 = [
    (0, False),
    (4, False),
    (0, True)
]

@pytest.mark.parametrize(
    'num_of_failures, raise_exception',
    TESTDATA_12,
    ids=['no failures', 'with failures', 'exception']
)
def test_devicehandler_get_hardware(
    mocked_instance,
    caplog,
    num_of_failures,
    raise_exception
):
    """Check that DeviceHandler.get_hardware() waits for a free device.

    ``device_is_available`` is replaced by a stub that returns ``None``
    ``num_of_failures`` times before yielding the hardware, or raises
    ``TwisterException``. In the exception case the message must be logged
    and the instance marked as failed with that message as reason.
    """
    expected_hardware = mock.Mock()

    def mock_availability(handler, instance):
        if raise_exception:
            raise TwisterException('dummy message')
        # ``no`` counts the remaining unsuccessful lookups.
        if handler.no:
            handler.no -= 1
            return None
        return expected_hardware

    handler = DeviceHandler(mocked_instance, 'build')
    handler.no = num_of_failures

    with mock.patch.object(
        DeviceHandler,
        'device_is_available',
        mock_availability
    ):
        hardware = handler.get_hardware()

    if raise_exception:
        assert 'dummy message' in caplog.text.lower()
        assert mocked_instance.status == 'failed'
        assert mocked_instance.reason == 'dummy message'
    else:
        assert hardware == expected_hardware
| |
| |
# Columns: self_west_flash, runner, hardware_product_name, expected
# '$build_dir' in ``expected`` is a placeholder that the test replaces with
# the handler's build directory.
TESTDATA_13 = [
    (
        None,
        None,
        None,
        ['generator_cmd', '-C', '$build_dir', 'flash']
    ),
    (
        [],
        None,
        None,
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir']
    ),
    (
        '--dummy',
        None,
        None,
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--', '--dummy']
    ),
    (
        '--dummy1,--dummy2',
        None,
        None,
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--', '--dummy1', '--dummy2']
    ),

    (
        None,
        'runner',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'runner', 'param1', 'param2']
    ),

    (
        None,
        'pyocd',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'pyocd', 'param1', 'param2', '--', '--dev-id', 12345]
    ),
    (
        None,
        'nrfjprog',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'nrfjprog', 'param1', 'param2', '--', '--dev-id', 12345]
    ),
    (
        None,
        'openocd',
        'STM32 STLink',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'openocd', 'param1', 'param2',
         '--', '--cmd-pre-init', 'hla_serial 12345']
    ),
    (
        None,
        'openocd',
        'STLINK-V3',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'openocd', 'param1', 'param2',
         '--', '--cmd-pre-init', 'hla_serial 12345']
    ),
    (
        None,
        'openocd',
        'EDBG CMSIS-DAP',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'openocd', 'param1', 'param2',
         '--', '--cmd-pre-init', 'cmsis_dap_serial 12345']
    ),
    (
        None,
        'jlink',
        'product',
        # The separator between the option and the serial number is two
        # spaces, as the original inline note ("2x space") states.
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'jlink', '--tool-opt=-SelectEmuBySN  12345',  # 2x space
         'param1', 'param2']
    ),
    (
        None,
        'stm32cubeprogrammer',
        'product',
        ['west', 'flash', '--skip-rebuild', '-d', '$build_dir',
         '--runner', 'stm32cubeprogrammer', '--tool-opt=sn=12345',
         'param1', 'param2']
    ),

]

# Whether the hardware is identified by ``probe_id`` (True) or ``id`` (False).
TESTDATA_13_2 = [(True), (False)]

@pytest.mark.parametrize(
    'self_west_flash, runner,' \
    ' hardware_product_name, expected',
    TESTDATA_13,
    ids=['generator', '--west-flash', 'one west flash value',
         'multiple west flash values', 'generic runner', 'pyocd',
         'nrfjprog', 'openocd, STM32 STLink', 'openocd, STLINK-v3',
         'openocd, EDBG CMSIS-DAP', 'jlink', 'stm32cubeprogrammer']
)
@pytest.mark.parametrize('hardware_probe', TESTDATA_13_2, ids=['probe', 'id'])
def test_devicehandler_create_command(
    mocked_instance,
    self_west_flash,
    runner,
    hardware_probe,
    hardware_product_name,
    expected
):
    """Check the flash command DeviceHandler._create_command() builds.

    Every runner case is exercised twice: once with the board serial
    exposed as ``probe_id`` and once as ``id``.
    """
    handler = DeviceHandler(mocked_instance, 'build')
    handler.options = mock.Mock(west_flash=self_west_flash)
    handler.generator_cmd = 'generator_cmd'

    expected = [
        handler.build_dir if val == '$build_dir' else val
        for val in expected
    ]

    hardware = mock.Mock(
        product=hardware_product_name,
        probe_id=12345 if hardware_probe else None,
        id=12345 if not hardware_probe else None,
        runner_params=['param1', 'param2']
    )

    command = handler._create_command(runner, hardware)

    assert command == expected
| |
| |
# Columns: harness_state, flash_error, expected_status, expected_reason,
#          do_add_missing
TESTDATA_14 = [
    ('success', False, 'success', 'Unknown', False),
    ('failed', False, 'failed', 'Failed', True),
    ('error', False, 'error', 'Unknown', True),
    (None, True, None, 'Unknown', False),
    (None, False, 'failed', 'Timeout', True),
]

@pytest.mark.parametrize(
    'harness_state, flash_error,' \
    ' expected_status, expected_reason, do_add_missing',
    TESTDATA_14,
    ids=['success', 'failed', 'error', 'flash error', 'no status']
)
def test_devicehandler_update_instance_info(
    mocked_instance,
    harness_state,
    flash_error,
    expected_status,
    expected_reason,
    do_add_missing
):
    """Check the status and reason DeviceHandler._update_instance_info() sets."""
    elapsed = 59
    missing_mock = mock.Mock()

    handler = DeviceHandler(mocked_instance, 'build')
    handler.instance.add_missing_case_status = missing_mock

    handler._update_instance_info(harness_state, elapsed, flash_error)

    assert handler.instance.execution_time == elapsed
    assert handler.instance.status == expected_status
    assert handler.instance.reason == expected_reason

    if do_add_missing:
        missing_mock.assert_called_once_with('blocked', expected_reason)
| |
| |
# Columns: serial_device, serial_pty, ser_pty_process, expected_exception,
#          expected_result, terminate_ser_pty_process, make_available
# ``make_available`` is either False or the argument that
# make_device_available() is expected to receive.
TESTDATA_15 = [
    ('dummy device', 'dummy pty', None, None, True, False, False),
    (
        'dummy device',
        'dummy pty',
        mock.Mock(communicate=mock.Mock(return_value=('', ''))),
        SerialException,
        False,
        True,
        'dummy pty'
    ),
    (
        'dummy device',
        None,
        None,
        SerialException,
        False,
        False,
        'dummy device'
    )
]

@pytest.mark.parametrize(
    'serial_device, serial_pty, ser_pty_process, expected_exception,' \
    ' expected_result, terminate_ser_pty_process, make_available',
    TESTDATA_15,
    ids=['valid', 'serial pty process', 'no serial pty']
)
def test_devicehandler_create_serial_connection(
    mocked_instance,
    serial_device,
    serial_pty,
    ser_pty_process,
    expected_exception,
    expected_result,
    terminate_ser_pty_process,
    make_available
):
    """Check DeviceHandler._create_serial_connection() on success and failure.

    ``serial.Serial`` is patched to either return ``expected_result`` or
    raise ``expected_exception``. On failure the instance must be marked
    failed with 'Serial Device Error', the missing testcases blocked, the
    pty process (if any) terminated and the device made available again.
    """
    def mock_serial(*args, **kwargs):
        if expected_exception:
            raise expected_exception('')
        return expected_result

    handler = DeviceHandler(mocked_instance, 'build')
    missing_mock = mock.Mock()
    handler.instance.add_missing_case_status = missing_mock
    available_mock = mock.Mock()
    handler.make_device_available = available_mock
    handler.options = mock.Mock(timeout_multiplier=1)

    hardware_baud = 14400
    flash_timeout = 60
    serial_mock = mock.Mock(side_effect=mock_serial)

    with mock.patch('serial.Serial', serial_mock), \
         pytest.raises(expected_exception) if expected_exception else \
         nullcontext():
        result = handler._create_serial_connection(serial_device, hardware_baud,
                                                   flash_timeout, serial_pty,
                                                   ser_pty_process)

    if expected_result:
        assert result is not None

    if expected_exception:
        assert handler.instance.status == 'failed'
        assert handler.instance.reason == 'Serial Device Error'

        missing_mock.assert_called_once_with('blocked', 'Serial Device Error')

    if terminate_ser_pty_process:
        ser_pty_process.terminate.assert_called_once()
        ser_pty_process.communicate.assert_called_once()

    if make_available:
        available_mock.assert_called_once_with(make_available)
| |
| |
# Columns: serial_pty, popen_exception, expected_device
TESTDATA_16 = [
    ('dummy1 dummy2', None, 'slave name'),
    ('dummy1,dummy2', CalledProcessError, None),
    (None, None, 'dummy hardware serial'),
]


@pytest.mark.parametrize(
    'serial_pty, popen_exception, expected_device',
    TESTDATA_16,
    ids=['pty', 'pty process error', 'no pty']
)
def test_devicehandler_get_serial_device(
    mocked_instance,
    serial_pty,
    popen_exception,
    expected_device
):
    """
    Check the device returned by DeviceHandler._get_serial_device() with
    subprocess.Popen, pty.openpty and os.ttyname patched.

    The patched Popen also asserts that the PTY command was split into
    ['dummy1', 'dummy2'].
    """
    def fake_popen(command, *args, **kwargs):
        assert command == ['dummy1', 'dummy2']
        if not popen_exception:
            return mock.Mock()
        raise popen_exception(command, 'Dummy error')

    def fake_ttyname(x):
        return x + ' name'

    hardware_serial = 'dummy hardware serial'
    handler = DeviceHandler(mocked_instance, 'build')

    popen_patch = mock.patch(
        'subprocess.Popen', mock.Mock(side_effect=fake_popen)
    )
    openpty_patch = mock.patch(
        'pty.openpty', mock.Mock(return_value=('master', 'slave'))
    )
    ttyname_patch = mock.patch(
        'os.ttyname', mock.Mock(side_effect=fake_ttyname)
    )

    with popen_patch, openpty_patch, ttyname_patch:
        result = handler._get_serial_device(serial_pty, hardware_serial)

    if not popen_exception:
        assert result[0] == expected_device
        return

    assert result is None
| |
# Columns: has_hardware, raise_create_serial, raise_popen, raise_timeout,
# returncode, do_timeout_thread, use_pty,
# expected_status, expected_reason, expected_logs
TESTDATA_17 = [
    (False, False, False, False, None, False, False,
     None, None, []),
    (True, True, False, False, None, False, False,
     None, None, []),
    (True, False, True, False, None, False, False,
     'error', 'Device issue (Flash error)', []),
    (True, False, False, True, None, False, False,
     'error', 'Device issue (Timeout)', ['Flash operation timed out.']),
    (True, False, False, False, 1, False, False,
     'error', 'Device issue (Flash error?)', []),
    (True, False, False, False, 0, True, False,
     None, None, ['Timed out while monitoring serial output on IPName']),
    (True, False, False, False, 0, False, True,
     None, None, ['Process Serial PTY terminated outs:  errs ']),
]


@pytest.mark.parametrize(
    'has_hardware, raise_create_serial, raise_popen, raise_timeout,'
    ' returncode, do_timeout_thread, use_pty,'
    ' expected_status, expected_reason, expected_logs',
    TESTDATA_17,
    ids=['no hardware', 'create serial failure', 'popen called process error',
         'communicate timeout', 'nonzero returncode', 'valid pty', 'valid dev']
)
def test_devicehandler_handle(
    mocked_instance,
    caplog,
    has_hardware,
    raise_create_serial,
    raise_popen,
    raise_timeout,
    returncode,
    do_timeout_thread,
    use_pty,
    expected_status,
    expected_reason,
    expected_logs
):
    """
    Drive DeviceHandler.handle() with its helpers, subprocess.Popen,
    threading and open() replaced by mocks.

    The test checks the logged messages, the custom scripts that were run,
    the resulting instance status/reason and the device release call for
    each parametrized failure or success scenario.
    """
    def fake_get_serial(serial_pty, hardware_serial):
        if not serial_pty:
            return 'dummy serial device', None

        pty_process = mock.Mock(
            name='dummy serial PTY process',
            communicate=mock.Mock(return_value=('', ''))
        )
        return 'dummy serial PTY device', pty_process

    def fake_create_serial(*args, **kwargs):
        if raise_create_serial:
            raise SerialException('dummy cmd', 'dummy msg')
        return mock.Mock(name='dummy serial')

    def fake_thread(*args, **kwargs):
        # The thread reports itself alive only in the timeout scenario.
        return mock.Mock(
            is_alive=mock.Mock(return_value=bool(do_timeout_thread))
        )

    def fake_terminate(proc, *args, **kwargs):
        proc.communicate = mock.Mock(return_value=(mock.Mock(), mock.Mock()))

    def fake_communicate(*args, **kwargs):
        if raise_timeout:
            raise TimeoutExpired('dummy cmd', 'dummyamount')
        return mock.Mock(), mock.Mock()

    def fake_popen(command, *args, **kwargs):
        if raise_popen:
            raise CalledProcessError('dummy proc', 'dummy msg')

        process = mock.Mock(
            pid=1,
            returncode=returncode,
            communicate=mock.Mock(side_effect=fake_communicate)
        )

        # Popen is used as a context manager that yields the process.
        return mock.Mock(
            __enter__=mock.Mock(return_value=process),
            __exit__=mock.Mock(return_value=None)
        )

    hardware = None
    if has_hardware:
        hardware = mock.Mock(
            baud=14400,
            runner='dummy runner',
            serial_pty='Serial PTY' if use_pty else None,
            serial='dummy serial',
            pre_script='dummy pre script',
            post_script='dummy post script',
            post_flash_script='dummy post flash script',
            flash_timeout=60,
            flash_with_test=True
        )

    handler = DeviceHandler(mocked_instance, 'build')
    handler.get_hardware = mock.Mock(return_value=hardware)
    handler.options = mock.Mock(
        timeout_multiplier=1,
        west_flash=None,
        west_runner=None
    )
    handler._get_serial_device = mock.Mock(side_effect=fake_get_serial)
    handler._create_command = mock.Mock(return_value=['dummy', 'command'])
    handler.run_custom_script = mock.Mock()
    handler._create_serial_connection = mock.Mock(
        side_effect=fake_create_serial
    )
    handler.monitor_serial = mock.Mock()
    handler.terminate = mock.Mock(side_effect=fake_terminate)
    handler._update_instance_info = mock.Mock()
    handler._final_handle_actions = mock.Mock()
    handler.make_device_available = mock.Mock()
    handler.instance.platform.name = 'IPName'

    harness = mock.Mock()

    with mock.patch('builtins.open', mock.mock_open(read_data='')), \
         mock.patch('subprocess.Popen', side_effect=fake_popen), \
         mock.patch('threading.Event', mock.Mock()), \
         mock.patch('threading.Thread', side_effect=fake_thread):
        handler.handle(harness)

    handler.get_hardware.assert_called_once()

    messages = [record.msg for record in caplog.records]
    assert all(msg in messages for msg in expected_logs)

    # Without hardware there is nothing further to verify.
    if not has_hardware:
        return

    pre_script_call = mock.call('dummy pre script', mock.ANY)
    handler.run_custom_script.assert_has_calls([pre_script_call])

    # The remaining checks apply only when the serial connection was created.
    if raise_create_serial:
        return

    handler.run_custom_script.assert_has_calls([
        pre_script_call,
        mock.call('dummy post flash script', mock.ANY),
        mock.call('dummy post script', mock.ANY)
    ])

    if expected_reason:
        assert handler.instance.reason == expected_reason
    if expected_status:
        assert handler.instance.status == expected_status

    released_device = 'Serial PTY' if use_pty else 'dummy serial device'
    handler.make_device_available.assert_called_once_with(released_device)
| |
| |
# Columns: ignore_qemu_crash, expected_ignore_crash,
# expected_ignore_unexpected_eof
TESTDATA_18 = [
    (True, True, True),
    (False, False, False),
]


@pytest.mark.parametrize(
    'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof',
    TESTDATA_18,
    ids=['ignore crash', 'qemu crash']
)
def test_qemuhandler_init(
    mocked_instance,
    ignore_qemu_crash,
    expected_ignore_crash,
    expected_ignore_unexpected_eof
):
    """
    Check that a freshly constructed QEMUHandler exposes the expected
    ignore_qemu_crash and ignore_unexpected_eof values for the testsuite's
    ignore_qemu_crash setting.
    """
    mocked_instance.testsuite.ignore_qemu_crash = ignore_qemu_crash

    qemu_handler = QEMUHandler(mocked_instance, 'build')

    observed = (
        qemu_handler.ignore_qemu_crash,
        qemu_handler.ignore_unexpected_eof,
    )
    assert observed[0] == expected_ignore_crash
    assert observed[1] == expected_ignore_unexpected_eof
| |
| |
def test_qemuhandler_get_cpu_time():
    """
    Check that QEMUHandler._get_cpu_time() yields 84.0 when the patched
    psutil.Process reports user=20.0 and system=64.0 CPU times.
    """
    def fake_process(pid):
        times = mock.Mock(user=20.0, system=64.0)
        return mock.Mock(cpu_times=mock.Mock(return_value=times))

    with mock.patch('psutil.Process', fake_process):
        total = QEMUHandler._get_cpu_time(0)

    assert total == pytest.approx(84.0)
| |
| |
# Columns: self_sysbuild, self_build_dir, build_dir, expected
TESTDATA_19 = [
    (
        True,
        os.path.join('self', 'dummy_dir', '1'),
        mock.PropertyMock(return_value=os.path.join('dummy_dir', '1')),
        os.path.join('dummy_dir', '1')
    ),
    (
        False,
        os.path.join('self', 'dummy_dir', '2'),
        mock.PropertyMock(return_value=os.path.join('dummy_dir', '2')),
        os.path.join('self', 'dummy_dir', '2')
    ),
]


@pytest.mark.parametrize(
    'self_sysbuild, self_build_dir, build_dir, expected',
    TESTDATA_19,
    ids=['domains build dir', 'self build dir']
)
def test_qemuhandler_get_sysbuild_build_dir(
    mocked_instance,
    self_sysbuild,
    self_build_dir,
    build_dir,
    expected
):
    """
    Check the directory returned by QEMUHandler._get_sysbuild_build_dir()
    with domains.Domains.from_file patched, for a sysbuild and a
    non-sysbuild testsuite.
    """
    default_domain_getter = mock.Mock()
    # Attach the PropertyMock to the type of the returned domain object so
    # that reading its build_dir attribute goes through the property.
    type(default_domain_getter()).build_dir = build_dir
    domains = mock.Mock(get_default_domain=default_domain_getter)

    handler = QEMUHandler(mocked_instance, 'build')
    handler.instance.testsuite.sysbuild = self_sysbuild
    handler.build_dir = self_build_dir

    with mock.patch('domains.Domains.from_file',
                    mock.Mock(return_value=domains)):
        actual = handler._get_sysbuild_build_dir()

    assert actual == expected
| |
| |
# Columns: self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn
TESTDATA_20 = [
    (
        os.path.join('self', 'dummy_dir', 'log1'),
        os.path.join('self', 'dummy_dir', 'pid1'),
        os.path.join('sysbuild', 'dummy_dir', 'bd1'),
        True
    ),
    (
        os.path.join('self', 'dummy_dir', 'log2'),
        os.path.join('self', 'dummy_dir', 'pid2'),
        os.path.join('sysbuild', 'dummy_dir', 'bd2'),
        False
    ),
]


@pytest.mark.parametrize(
    'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn',
    TESTDATA_20,
    ids=['pid exists', 'pid missing']
)
def test_qemuhandler_set_qemu_filenames(
    mocked_instance,
    self_log,
    self_pid_fn,
    sysbuild_build_dir,
    exists_pid_fn
):
    """
    Check the fifo, pid and log file names set by
    QEMUHandler._set_qemu_filenames(), and that an existing pid file is
    unlinked.
    """
    remove_mock = mock.Mock()
    expected_pid_fn = os.path.sep.join([sysbuild_build_dir, 'qemu.pid'])

    handler = QEMUHandler(mocked_instance, 'build')
    handler.log = self_log
    handler.pid_fn = self_pid_fn

    with mock.patch('os.unlink', remove_mock), \
         mock.patch('os.path.exists', mock.Mock(return_value=exists_pid_fn)):
        handler._set_qemu_filenames(sysbuild_build_dir)

    expected_fifo_fn = os.path.sep.join(
        [mocked_instance.build_dir, 'qemu-fifo']
    )
    assert handler.fifo_fn == expected_fifo_fn
    assert handler.pid_fn == expected_pid_fn
    assert handler.log_fn == self_log

    if exists_pid_fn:
        remove_mock.assert_called_once_with(expected_pid_fn)
| |
| |
def test_qemuhandler_create_command(mocked_instance):
    """
    Check that QEMUHandler._create_command() builds the
    '<generator> -C <sysbuild dir> run' command list.
    """
    handler = QEMUHandler(mocked_instance, 'build')
    handler.generator_cmd = 'dummy_cmd'

    command = handler._create_command(os.path.join('sysbuild', 'dummy_dir'))

    expected_dir = os.path.sep.join(['sysbuild', 'dummy_dir'])
    assert command == ['dummy_cmd', '-C', expected_dir, 'run']
| |
| |
# Columns: self_returncode, self_ignore_qemu_crash, self_instance_reason,
# harness_state, is_timeout,
# expected_status, expected_reason, expected_called_missing_case
TESTDATA_21 = [
    (0, False, None, 'good dummy state', False, None, None, False),
    (1, True, None, 'good dummy state', False, None, None, False),
    (0, False, None, None, True, 'failed', 'Timeout', True),
    (1, False, None, None, False, 'failed', 'Exited with 1', True),
    (
        1, False, 'preexisting reason', 'good dummy state', False,
        'failed', 'preexisting reason', True
    ),
]


@pytest.mark.parametrize(
    'self_returncode, self_ignore_qemu_crash,'
    ' self_instance_reason, harness_state, is_timeout,'
    ' expected_status, expected_reason, expected_called_missing_case',
    TESTDATA_21,
    ids=['not failed', 'qemu ignore', 'timeout', 'bad returncode', 'other fail']
)
def test_qemuhandler_update_instance_info(
    mocked_instance,
    self_returncode,
    self_ignore_qemu_crash,
    self_instance_reason,
    harness_state,
    is_timeout,
    expected_status,
    expected_reason,
    expected_called_missing_case
):
    """
    Check the instance status and reason left by
    QEMUHandler._update_instance_info() and, where expected, that the
    missing cases are reported as 'blocked'.
    """
    add_missing = mock.Mock()
    mocked_instance.add_missing_case_status = add_missing
    mocked_instance.reason = self_instance_reason

    handler = QEMUHandler(mocked_instance, 'build')
    handler.returncode = self_returncode
    handler.ignore_qemu_crash = self_ignore_qemu_crash

    handler._update_instance_info(harness_state, is_timeout)

    instance = handler.instance
    assert instance.status == expected_status
    assert instance.reason == expected_reason

    if not expected_called_missing_case:
        return

    mocked_instance.add_missing_case_status.assert_called_once_with('blocked')
| |
| |
def test_qemuhandler_thread_get_fifo_names():
    """
    Check that QEMUHandler._thread_get_fifo_names() derives the '.in' and
    '.out' fifo names from the base fifo name.
    """
    names = QEMUHandler._thread_get_fifo_names('dummy')
    fifo_in, fifo_out = names

    assert fifo_in == 'dummy.in'
    assert fifo_out == 'dummy.out'
| |
# Columns: fifo_in_exists, fifo_out_exists
TESTDATA_22 = [
    (False, False),
    (False, True),
    (True, False),
    (True, True),
]


@pytest.mark.parametrize(
    'fifo_in_exists, fifo_out_exists',
    TESTDATA_22,
    ids=['both missing', 'out exists', 'in exists', 'both exist']
)
def test_qemuhandler_thread_open_files(fifo_in_exists, fifo_out_exists):
    """
    Check which files QEMUHandler._thread_open_files() opens and that
    pre-existing fifos are unlinked, with os.unlink, os.mkfifo,
    os.path.exists and open() patched.
    """
    fifo_in = 'fifo.in'
    fifo_out = 'fifo.out'
    logfile = 'log.file'
    existence = {fifo_in: fifo_in_exists, fifo_out: fifo_out_exists}

    def fake_exists(path):
        if path not in existence:
            raise ValueError('Unexpected path in mock of os.path.exists')
        return existence[path]

    remove_mock = mock.Mock()

    with mock.patch('os.unlink', remove_mock), \
         mock.patch('os.mkfifo', mock.Mock()), \
         mock.patch('os.path.exists', mock.Mock(side_effect=fake_exists)), \
         mock.patch('builtins.open', mock.mock_open()) as open_mock:
        # Unpacking verifies that exactly three file objects are returned.
        _, _, _ = QEMUHandler._thread_open_files(fifo_in, fifo_out, logfile)

        open_mock.assert_has_calls([
            mock.call('fifo.in', 'wb'),
            mock.call('fifo.out', 'rb', buffering=0),
            mock.call('log.file', 'wt'),
        ])

        for fifo_name, existed in existence.items():
            if existed:
                remove_mock.assert_any_call(fifo_name)
| |
| |
# Columns: is_pid, is_lookup_error
TESTDATA_23 = [
    (False, False),
    (True, True),
    (True, False)
]


@pytest.mark.parametrize(
    'is_pid, is_lookup_error',
    TESTDATA_23,
    ids=['pid missing', 'pid lookup error', 'pid ok']
)
def test_qemuhandler_thread_close_files(is_pid, is_lookup_error):
    """
    Check that QEMUHandler._thread_close_files() closes all three file
    objects, unlinks both fifos and, when a pid is given and can be found,
    sends it SIGTERM. os.unlink and os.kill are patched.
    """
    terminated = {}

    def fake_kill(pid, sig):
        if is_lookup_error:
            raise ProcessLookupError(f'Couldn\'t find pid: {pid}.')
        if sig == signal.SIGTERM:
            terminated[pid] = True

    remove_mock = mock.Mock()
    pid = 12345 if is_pid else None
    file_mocks = {
        'out': mock.Mock(),
        'in': mock.Mock(),
        'log': mock.Mock(),
    }

    with mock.patch('os.unlink', remove_mock), \
         mock.patch('os.kill', mock.Mock(side_effect=fake_kill)):
        QEMUHandler._thread_close_files('fifo.in', 'fifo.out', pid,
                                        file_mocks['out'], file_mocks['in'],
                                        file_mocks['log'])

    for key in ('out', 'in', 'log'):
        file_mocks[key].close.assert_called_once()

    remove_mock.assert_has_calls([mock.call('fifo.in'), mock.call('fifo.out')])

    if is_pid and not is_lookup_error:
        assert terminated[pid]
| |
| |
# Columns: out_state, expected_status, expected_reason
TESTDATA_24 = [
    ('timeout', 'failed', 'Timeout'),
    ('failed', 'failed', 'Failed'),
    ('unexpected eof', 'failed', 'unexpected eof'),
    ('unexpected byte', 'failed', 'unexpected byte'),
    (None, None, 'Unknown'),
]


@pytest.mark.parametrize(
    'out_state, expected_status, expected_reason',
    TESTDATA_24,
    ids=['timeout', 'failed', 'unexpected eof', 'unexpected byte', 'unknown']
)
def test_qemuhandler_thread_update_instance_info(
    mocked_instance,
    out_state,
    expected_status,
    expected_reason
):
    """
    Check the execution time, status and reason that
    QEMUHandler._thread_update_instance_info() leaves on the instance for
    each output state.
    """
    elapsed = 59
    handler = QEMUHandler(mocked_instance, 'build')

    QEMUHandler._thread_update_instance_info(handler, elapsed, out_state)

    instance = handler.instance
    assert instance.execution_time == elapsed
    assert instance.status == expected_status
    assert instance.reason == expected_reason
| |
| |
# Columns: content, timeout, pid, harness_states, cputime, capture_coverage,
# expected_out_state, expected_log_calls
#
# cputime is either a single int (returned on every query) or a list of
# values handed out one per query. A list is used instead of a generator so
# that the shared test data is not consumed by running the test.
TESTDATA_25 = [
    (
        ('1\n' * 60).encode('utf-8'),
        60,
        1,
        [None] * 60 + ['success'] * 6,
        1000,
        False,
        'timeout',
        [mock.call('1\n'), mock.call('1\n')]
    ),
    (
        ('1\n' * 60).encode('utf-8'),
        60,
        -1,
        [None] * 60 + ['success'] * 30,
        100,
        False,
        'failed',
        [mock.call('1\n'), mock.call('1\n')]
    ),
    (
        b'',
        60,
        1,
        ['success'] * 3,
        100,
        False,
        'unexpected eof',
        []
    ),
    (
        b'\x81',
        60,
        1,
        ['success'] * 3,
        100,
        False,
        'unexpected byte',
        []
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        600,
        1,
        [None] * 3 + ['success'] * 7,
        100,
        False,
        'success',
        [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')]
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        600,
        0,
        [None] * 3 + ['success'] * 7,
        100,
        False,
        'timeout',
        [mock.call('1\n'), mock.call('2\n')]
    ),
    (
        '1\n2\n3\n4\n5\n'.encode('utf-8'),
        60,
        1,
        [None] * 3 + ['success'] * 7,
        [100, 100, 10000],
        True,
        'success',
        [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')]
    ),
]


@pytest.mark.parametrize(
    'content, timeout, pid, harness_states, cputime, capture_coverage,'
    ' expected_out_state, expected_log_calls',
    TESTDATA_25,
    ids=[
        'timeout',
        'harness failed',
        'unexpected eof',
        'unexpected byte',
        'harness success',
        'timeout by pid=0',
        'capture_coverage'
    ]
)
def test_qemuhandler_thread(
    mocked_instance,
    faux_timer,
    content,
    timeout,
    pid,
    harness_states,
    cputime,
    capture_coverage,
    expected_out_state,
    expected_log_calls
):
    """
    Run QEMUHandler._thread() with time, open(), select.poll, os.path.exists
    and the QEMUHandler thread helpers patched.

    The test checks the output state passed to
    _thread_update_instance_info() and the lines written to the log file.
    """
    # Build a per-run iterator for sequences of CPU times; a plain int is
    # returned unchanged on every query.
    cputime_values = None if isinstance(cputime, int) else iter(cputime)

    def mock_cputime(pid):
        if pid > 0:
            if cputime_values is None:
                return cputime
            return next(cputime_values)
        else:
            raise ProcessLookupError()

    type(mocked_instance.testsuite).timeout = mock.PropertyMock(return_value=timeout)
    handler = QEMUHandler(mocked_instance, 'build')
    handler.results = {}
    handler.ignore_unexpected_eof = False
    handler.pid_fn = 'pid_fn'
    handler.fifo_fn = 'fifo_fn'
    handler.options = mock.Mock(timeout_multiplier=1)

    def mocked_open(filename, *args, **kwargs):
        # Serve the pid for the pid file, the test content for the output
        # fifo and nothing for any other file.
        if filename == handler.pid_fn:
            contents = str(pid).encode('utf-8')
        elif filename == handler.fifo_fn + '.out':
            contents = content
        else:
            contents = b''

        file_object = mock.mock_open(read_data=contents).return_value
        file_object.__iter__.return_value = contents.splitlines(True)
        return file_object

    harness = mock.Mock(capture_coverage=capture_coverage, handle=print)
    # Each read of harness.state yields the next parametrized state.
    type(harness).state = mock.PropertyMock(side_effect=harness_states)

    p = mock.Mock()
    p.poll = mock.Mock(
        side_effect=itertools.cycle([True, True, True, True, False])
    )

    mock_thread_get_fifo_names = mock.Mock(
        return_value=('fifo_fn.in', 'fifo_fn.out')
    )
    log_fp_mock = mock.Mock()
    in_fp_mock = mocked_open('fifo_fn.out')
    out_fp_mock = mock.Mock()
    mock_thread_open_files = mock.Mock(
        return_value=(out_fp_mock, in_fp_mock, log_fp_mock)
    )
    mock_thread_close_files = mock.Mock()
    mock_thread_update_instance_info = mock.Mock()

    with mock.patch('time.time', side_effect=faux_timer.time), \
         mock.patch('builtins.open', new=mocked_open), \
         mock.patch('select.poll', return_value=p), \
         mock.patch('os.path.exists', return_value=True), \
         mock.patch('twisterlib.handlers.QEMUHandler._get_cpu_time',
                    mock_cputime), \
         mock.patch('twisterlib.handlers.QEMUHandler._thread_get_fifo_names',
                    mock_thread_get_fifo_names), \
         mock.patch('twisterlib.handlers.QEMUHandler._thread_open_files',
                    mock_thread_open_files), \
         mock.patch('twisterlib.handlers.QEMUHandler._thread_close_files',
                    mock_thread_close_files), \
         mock.patch('twisterlib.handlers.QEMUHandler.'
                    '_thread_update_instance_info',
                    mock_thread_update_instance_info):
        QEMUHandler._thread(
            handler,
            handler.get_test_timeout(),
            handler.build_dir,
            handler.log,
            handler.fifo_fn,
            handler.pid_fn,
            handler.results,
            harness,
            handler.ignore_unexpected_eof
        )

    mock_thread_update_instance_info.assert_called_once_with(
        handler,
        mock.ANY,
        expected_out_state
    )

    log_fp_mock.write.assert_has_calls(expected_log_calls)
| |
| |
# Columns: isatty, do_timeout, harness_state, exists_pid_fn, expected_logs
TESTDATA_26 = [
    (
        True, False, None, True,
        ['No timeout, return code from QEMU (1): 1',
         'return code from QEMU (1): 1']
    ),
    (False, True, 'passed', True, ['return code from QEMU (1): 0']),
    (False, True, 'failed', False, ['return code from QEMU (None): 1']),
]


@pytest.mark.parametrize(
    'isatty, do_timeout, harness_state, exists_pid_fn, expected_logs',
    TESTDATA_26,
    ids=['no timeout, isatty', 'timeout passed', 'timeout, no pid_fn']
)
def test_qemuhandler_handle(
    mocked_instance,
    caplog,
    tmp_path,
    isatty,
    do_timeout,
    harness_state,
    exists_pid_fn,
    expected_logs
):
    """
    Run QEMUHandler.handle() with subprocess.Popen, threading.Thread,
    open(), os.path.exists, os.unlink and sys.stdout.isatty patched, then
    check that the expected messages appear in the captured log text.
    """
    def fake_wait(*args, **kwargs):
        if do_timeout:
            raise TimeoutExpired('dummy cmd', 'dummyamount')

    qemu_process = mock.Mock(pid=0, returncode=1)
    qemu_process.communicate = mock.Mock(
        return_value=(mock.Mock(), mock.Mock())
    )
    qemu_process.wait = mock.Mock(side_effect=fake_wait)

    handler = QEMUHandler(mocked_instance, 'build')

    def fake_path_exists(name, *args, **kwargs):
        return exists_pid_fn

    # The explicit keyword parameters restrict which arguments the handler
    # may pass to Popen and Thread.
    def fake_popen(command, stdout=None, stdin=None, stderr=None, cwd=None):
        return mock.Mock(
            __enter__=mock.Mock(return_value=qemu_process),
            __exit__=mock.Mock(return_value=None),
            communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock()))
        )

    def fake_thread(name=None, target=None, daemon=None, args=None):
        return mock.Mock()

    def fake_filenames(sysbuild_build_dir):
        handler.fifo_fn = os.path.join('dummy', 'qemu-fifo')
        handler.pid_fn = os.path.join(sysbuild_build_dir, 'qemu.pid')
        handler.log_fn = os.path.join('dummy', 'log')

    harness = mock.Mock(state=harness_state)

    build_dir = os.path.join('sysbuild', 'dummydir')
    run_command = ['generator_cmd', '-C', os.path.join('cmd', 'path'), 'run']

    handler.options = mock.Mock(
        timeout_multiplier=1,
        west_flash=[],
        west_runner=None
    )
    handler.run_custom_script = mock.Mock(return_value=None)
    handler.make_device_available = mock.Mock(return_value=None)
    handler._final_handle_actions = mock.Mock(return_value=None)
    handler._create_command = mock.Mock(return_value=run_command)
    handler._set_qemu_filenames = mock.Mock(side_effect=fake_filenames)
    handler._get_sysbuild_build_dir = mock.Mock(return_value=build_dir)
    handler.terminate = mock.Mock()

    with mock.patch('subprocess.Popen', side_effect=fake_popen), \
         mock.patch('builtins.open', mock.mock_open(read_data='1')), \
         mock.patch('threading.Thread', side_effect=fake_thread), \
         mock.patch('os.path.exists', side_effect=fake_path_exists), \
         mock.patch('os.unlink', mock.Mock()), \
         mock.patch('sys.stdout.isatty', return_value=isatty):
        handler.handle(harness)

    assert all(expected_log in caplog.text for expected_log in expected_logs)
| |
| |
def test_qemuhandler_get_fifo(mocked_instance):
    """
    Check that QEMUHandler.get_fifo() returns the handler's fifo_fn.
    """
    qemu_handler = QEMUHandler(mocked_instance, 'build')
    qemu_handler.fifo_fn = 'fifo_fn'

    fifo = qemu_handler.get_fifo()

    assert fifo == 'fifo_fn'