| #!/usr/bin/env python |
| # Copyright 2023 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Tests for the command line interface""" |
| |
| import os |
| import signal |
| import subprocess |
| import sys |
| import time |
| import unittest |
| |
| from pathlib import Path |
| |
| from mock_emu_frontend import _mock_emu |
| from config_helper import ConfigHelper |
| |
| |
| # TODO: b/301382004 - The Python Pigweed package install (into python-venv) |
| # races with running this test and there is no way to add that package as a test |
| # depedency without creating circular depedencies. This means we can't rely on |
| # using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper. |
| # |
| # Run the CLI directly instead of going through pw cli. |
| _cli_path = Path( |
| os.path.join(os.environ['PW_ROOT'], 'pw_emu', 'py', 'pw_emu', '__main__.py') |
| ).resolve() |
| |
| |
| class TestCli(ConfigHelper): |
| """Test non-interactive commands""" |
| |
| _config = { |
| 'emulators': { |
| 'mock-emu': { |
| 'launcher': 'mock_emu_frontend.MockEmuLauncher', |
| 'connector': 'mock_emu_frontend.MockEmuConnector', |
| } |
| }, |
| 'mock-emu': { |
| 'tcp_channel': True, |
| 'gdb_channel': True, |
| }, |
| 'gdb': _mock_emu + ['--exit', '--'], |
| 'targets': {'test-target': {'mock-emu': {}}}, |
| } |
| |
| def _build_cmd(self, args: list[str]) -> list[str]: |
| cmd = [ |
| 'python', |
| str(_cli_path), |
| '--working-dir', |
| self._wdir.name, |
| '--config', |
| self._config_file, |
| ] + args |
| return cmd |
| |
| def _run(self, args: list[str], **kwargs) -> subprocess.CompletedProcess: |
| """Run the CLI and wait for completion""" |
| return subprocess.run(self._build_cmd(args), **kwargs) |
| |
| def _popen(self, args: list[str], **kwargs) -> subprocess.Popen: |
| """Run the CLI in the background""" |
| return subprocess.Popen(self._build_cmd(args), **kwargs) |
| |
| |
| class TestNonInteractive(TestCli): |
| """Test non interactive commands.""" |
| |
| def setUp(self) -> None: |
| super().setUp() |
| self.assertEqual(self._run(['start', 'test-target']).returncode, 0) |
| |
| def tearDown(self) -> None: |
| self.assertEqual(self._run(['stop']).returncode, 0) |
| super().tearDown() |
| |
| def test_already_running(self) -> None: |
| self.assertNotEqual(self._run(['start', 'test-target']).returncode, 0) |
| |
| def test_gdb_cmds(self) -> None: |
| status = self._run( |
| ['gdb-cmds', 'show version'], |
| ) |
| self.assertEqual(status.returncode, 0) |
| |
| def test_prop_ls(self) -> None: |
| status = self._run(['prop-ls', 'path1'], stdout=subprocess.PIPE) |
| self.assertEqual(status.returncode, 0) |
| self.assertTrue('prop1' in status.stdout.decode('ascii')) |
| status = self._run(['prop-ls', 'invalid path'], stdout=subprocess.PIPE) |
| self.assertNotEqual(status.returncode, 0) |
| |
| def test_prop_get(self) -> None: |
| status = self._run( |
| ['prop-get', 'invalid path', 'prop1'], |
| stdout=subprocess.PIPE, |
| ) |
| self.assertNotEqual(status.returncode, 0) |
| status = self._run( |
| ['prop-get', 'path1', 'invalid prop'], |
| stdout=subprocess.PIPE, |
| ) |
| self.assertNotEqual(status.returncode, 0) |
| status = self._run( |
| ['prop-get', 'path1', 'prop1'], |
| stdout=subprocess.PIPE, |
| ) |
| self.assertEqual(status.returncode, 0) |
| self.assertTrue('val1' in status.stdout.decode('ascii')) |
| |
| def test_prop_set(self) -> None: |
| status = self._run( |
| ['prop-set', 'invalid path', 'prop1', 'v'], |
| stdout=subprocess.PIPE, |
| ) |
| self.assertNotEqual(status.returncode, 0) |
| status = self._run( |
| ['prop-set', 'path1', 'invalid prop', 'v'], |
| stdout=subprocess.PIPE, |
| ) |
| self.assertNotEqual(status.returncode, 0) |
| status = self._run( |
| ['prop-set', 'path1', 'prop1', 'value'], |
| stdout=subprocess.PIPE, |
| ) |
| self.assertEqual(status.returncode, 0) |
| status = self._run( |
| ['prop-get', 'path1', 'prop1'], |
| stdout=subprocess.PIPE, |
| ) |
| self.assertEqual(status.returncode, 0) |
| self.assertTrue('value' in status.stdout.decode('ascii'), status.stdout) |
| |
| def test_reset(self) -> None: |
| self.assertEqual(self._run(['reset']).returncode, 0) |
| self.assertTrue(os.path.exists(os.path.join(self._wdir.name, 'reset'))) |
| |
| def test_load(self) -> None: |
| self.assertEqual(self._run(['load', 'executable']).returncode, 0) |
| |
| def test_resume(self) -> None: |
| self.assertEqual(self._run(['resume']).returncode, 0) |
| |
| |
| class TestForeground(TestCli): |
| """Test starting in foreground""" |
| |
| def _test_common(self, cmd) -> None: |
| # Run the CLI process in a new session so that we can terminate both the |
| # CLI and the mock emulator it spawns in the foreground. |
| args = {} |
| if sys.platform == 'win32': |
| args['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP |
| else: |
| args['start_new_session'] = True |
| proc = self._popen(cmd, stdout=subprocess.PIPE, **args) |
| assert proc.stdout |
| output = proc.stdout.readline() |
| self.assertTrue( |
| 'starting mock emulator' in output.decode('utf-8'), |
| output.decode('utf-8'), |
| ) |
| if sys.platform == 'win32': |
| # See https://bugs.python.org/issue26350 |
| os.kill(proc.pid, signal.CTRL_BREAK_EVENT) |
| else: |
| os.kill(-proc.pid, signal.SIGTERM) |
| proc.wait() |
| proc.stdout.close() |
| |
| def test_foreground(self) -> None: |
| self._test_common(['start', '--foreground', 'test-target']) |
| |
| def test_debug(self) -> None: |
| self._test_common(['start', '--debug', 'test-target']) |
| |
| |
| class TestInteractive(TestCli): |
| """Test interactive commands""" |
| |
| def setUp(self) -> None: |
| super().setUp() |
| self.assertEqual(self._run(['start', 'test-target']).returncode, 0) |
| |
| def tearDown(self) -> None: |
| self.assertEqual(self._run(['stop']).returncode, 0) |
| super().tearDown() |
| |
| @staticmethod |
| def _read_nonblocking(fd: int, size: int) -> bytes: |
| try: |
| return os.read(fd, size) |
| except BlockingIOError: |
| return b'' |
| |
| def test_term(self) -> None: |
| """Test the pw emu term command""" |
| |
| if sys.platform == 'win32': |
| self.skipTest('pty not supported on win32') |
| |
| # pylint: disable=import-outside-toplevel |
| # Can't import pty on win32. |
| import pty |
| |
| # pylint: disable=no-member |
| # Avoid pylint false positive on win32. |
| pid, fd = pty.fork() |
| if pid == 0: |
| status = self._run(['term', 'tcp']) |
| # pylint: disable=protected-access |
| # Use os._exit instead of os.exit after fork. |
| os._exit(status.returncode) |
| else: |
| expected = '--- Miniterm on tcp ---' |
| |
| # Read the expected string with a timeout. |
| os.set_blocking(fd, False) |
| deadline = time.monotonic() + 5 |
| data = self._read_nonblocking(fd, len(expected)) |
| while len(data) < len(expected): |
| time.sleep(0.1) |
| data += self._read_nonblocking(fd, len(expected) - len(data)) |
| if time.monotonic() > deadline: |
| break |
| self.assertTrue( |
| expected in data.decode('ascii'), |
| data + self._read_nonblocking(fd, 100), |
| ) |
| |
| # send CTRL + ']' to terminate miniterm |
| os.write(fd, b'\x1d') |
| |
| # wait for the process to exit, with a timeout |
| deadline = time.monotonic() + 5 |
| wait_pid, ret = os.waitpid(pid, os.WNOHANG) |
| while wait_pid == 0: |
| time.sleep(0.1) |
| # Discard input to avoid writer hang on MacOS, |
| # see https://github.com/python/cpython/issues/97001. |
| try: |
| self._read_nonblocking(fd, 100) |
| except OSError: |
| # Avoid read errors when the child pair of the pty |
| # closes when the child terminates. |
| pass |
| wait_pid, ret = os.waitpid(pid, os.WNOHANG) |
| if time.monotonic() > deadline: |
| break |
| self.assertEqual(wait_pid, pid) |
| self.assertEqual(ret, 0) |
| |
| def test_gdb(self) -> None: |
| res = self._run(['gdb', '-e', 'executable'], stdout=subprocess.PIPE) |
| self.assertEqual(res.returncode, 0) |
| output = res.stdout.decode('ascii') |
| self.assertTrue('target remote' in output, output) |
| self.assertTrue('executable' in output, output) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |