| #!/usr/bin/env python |
| # Copyright 2023 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """QEMU emulator tests.""" |
| |
| import json |
| import os |
| import socket |
| import sys |
| import tempfile |
| import time |
| import unittest |
| |
| from pathlib import Path |
| from typing import Any |
| |
| from pw_emu.core import InvalidChannelName, InvalidChannelType |
| from config_helper import check_prog, ConfigHelperWithEmulator |
| |
| |
# TODO: b/301382004 - The Python Pigweed package install (into python-venv)
# races with running this test and there is no way to add that package as a test
# dependency without creating circular dependencies. This means we can't rely
# on using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper.
#
# Instead, run the arm_gdb.py wrapper directly.
# Absolute location of the arm_gdb.py entry point inside the checkout rooted at
# the PW_ROOT environment variable (a KeyError is raised when it is not set).
_arm_none_eabi_gdb_path = (
    Path(os.environ['PW_ROOT'])
    / 'pw_env_setup'
    / 'py'
    / 'pw_env_setup'
    / 'entry_points'
    / 'arm_gdb.py'
).resolve()
| |
| |
class TestQemu(ConfigHelperWithEmulator):
    """Tests for a valid qemu configuration.

    Every test starts the ``test-target`` emulator paused (no image is
    loaded) in ``setUp`` and stops it again in ``tearDown``.
    """

    _config = {
        # Invoke the arm_gdb.py wrapper script through the Python interpreter.
        'gdb': ['python', str(_arm_none_eabi_gdb_path)],
        'qemu': {
            'executable': 'qemu-system-arm',
        },
        'targets': {
            'test-target': {
                # NOTE(review): 'ignore1'/'ignore2' look like non-qemu entries
                # that the configuration handling is expected to skip -
                # confirm against pw_emu.
                'ignore1': None,
                'qemu': {
                    'machine': 'lm3s6965evb',
                    'channels': {
                        'chardevs': {
                            'test_uart': {
                                'id': 'serial0',
                            }
                        }
                    },
                },
                'ignore2': None,
            }
        },
    }

    def setUp(self) -> None:
        """Start the emulator paused before each test."""
        super().setUp()
        # No image so start paused to avoid crashing.
        self._emu.start(target='test-target', pause=True)

    def tearDown(self) -> None:
        """Stop the emulator after each test."""
        self._emu.stop()
        super().tearDown()

    def test_running(self) -> None:
        """The emulator reports itself as running after start."""
        self.assertTrue(self._emu.running())

    def test_list_properties(self) -> None:
        """Listing the properties of '/machine' returns a result."""
        self.assertIsNotNone(self._emu.list_properties('/machine'))

    def test_get_property(self) -> None:
        """The 'type' property of '/machine' matches the configured machine."""
        self.assertEqual(
            self._emu.get_property('/machine', 'type'), 'lm3s6965evb-machine'
        )

    def test_set_property(self) -> None:
        """A property written with set_property reads back with that value."""
        self._emu.set_property('/machine', 'graphics', False)
        self.assertFalse(self._emu.get_property('/machine', 'graphics'))

    def test_bad_channel_name(self) -> None:
        """Looking up a channel name that is not configured raises."""
        with self.assertRaises(InvalidChannelName):
            self._emu.get_channel_addr('serial1')

    def get_reg(self, addr: int) -> bytes:
        """Read one byte of emulated memory at ``addr`` through gdb.

        gdb is asked to dump ``*(char*)addr`` into a temporary file, which is
        then read back. The temporary file is always removed, including when
        gdb fails or one of the assertions does not hold.

        Args:
            addr: Address to read from.

        Returns:
            The single byte that gdb wrote to the dump file.
        """
        # Only the file name is needed: gdb writes the dump to this path.
        temp = tempfile.NamedTemporaryFile(delete=False)
        temp.close()

        try:
            res = self._emu.run_gdb_cmds(
                [
                    f'dump val {temp.name} *(char*){addr}',
                    'disconnect',
                ]
            )
            # The message is built eagerly, so decode leniently: non-ASCII gdb
            # output must not raise and hide the real result.
            gdb_stderr = res.stderr.decode('ascii', errors='replace')
            self.assertEqual(res.returncode, 0, gdb_stderr)

            with open(temp.name, 'rb') as file:
                ret = file.read(1)

            self.assertNotEqual(ret, b'', gdb_stderr)
        finally:
            os.unlink(temp.name)

        return ret

    def poll_data(self, timeout: int) -> bytes | None:
        """Wait for the status register to become non-zero, then read data.

        Args:
            timeout: Maximum time to wait, in seconds.

        Returns:
            The byte read from the data register, or None if the status
            register still reads zero once the deadline has passed.
        """
        # NOTE(review): presumably the UART0 raw interrupt status and data
        # registers of the lm3s6965evb machine - confirm against the datasheet.
        uartris = 0x4000C03C
        uartrd = 0x4000C000

        deadline = time.monotonic() + timeout
        while self.get_reg(uartris) == b'\x00':
            time.sleep(0.1)
            if time.monotonic() > deadline:
                return None
        return self.get_reg(uartrd)

    def test_channel_stream(self) -> None:
        """Bytes written to the channel stream show up in the UART register."""
        ok, msg = check_prog('arm-none-eabi-gdb')
        if not ok:
            self.skipTest(msg)

        stream = self._emu.get_channel_stream('test_uart')
        stream.write('test\n'.encode('ascii'))

        self.assertEqual(self.poll_data(5), b't')
        self.assertEqual(self.poll_data(5), b'e')
        self.assertEqual(self.poll_data(5), b's')
        self.assertEqual(self.poll_data(5), b't')

    def test_gdb(self) -> None:
        """Continue execution from gdb and wait for the emulator to stop.

        The test ends without failing if the emulator is still running after
        five seconds.
        """
        self._emu.run_gdb_cmds(['c'])
        deadline = time.monotonic() + 5
        while self._emu.running():
            if time.monotonic() > deadline:
                return
            # Yield between polls instead of spinning on running().
            time.sleep(0.1)
        self.assertFalse(self._emu.running())
| |
| |
class TestQemuChannelsTcp(TestQemu):
    """Tests for configurations using TCP channels.

    Inherits every TestQemu test and runs it with the global qemu channel type
    set to ``tcp``.
    """

    # The JSON round trip yields an independent copy of the base configuration,
    # so the override below does not modify TestQemu._config.
    _config: dict[str, Any] = {}
    _config.update(json.loads(json.dumps(TestQemu._config)))
    _config['qemu']['channels'] = {'type': 'tcp'}

    def test_get_channel_addr(self) -> None:
        """The address reported for 'test_uart' accepts a TCP connection."""
        host, port = self._emu.get_channel_addr('test_uart')
        # The context manager closes the socket even when connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((host, port))
| |
| |
class TestQemuChannelsPty(TestQemu):
    """Runs the TestQemu tests with the global channel type set to PTY."""

    # Independent copy of the base configuration, made with a JSON round trip,
    # with only the global channel type overridden.
    _config: dict[str, Any] = json.loads(json.dumps(TestQemu._config))
    _config['qemu']['channels'] = {'type': 'pty'}

    def setUp(self):
        # Skip before the emulator is started on platforms without PTYs.
        on_windows = sys.platform == 'win32'
        if on_windows:
            self.skipTest('pty not supported on win32')
        super().setUp()

    def test_get_path(self) -> None:
        # The path reported for the channel must exist on the filesystem.
        uart_path = self._emu.get_channel_path('test_uart')
        self.assertTrue(os.path.exists(uart_path))
| |
| |
class TestQemuInvalidChannelType(ConfigHelperWithEmulator):
    """Checks that an unknown global channel type is rejected."""

    _config = {
        'qemu': {
            'executable': 'qemu-system-arm',
            # Deliberately invalid channel type.
            'channels': {'type': 'invalid'},
        },
        'targets': {
            'test-target': {'qemu': {'machine': 'lm3s6965evb'}},
        },
    }

    def test_start(self) -> None:
        # Starting the target must fail with InvalidChannelType.
        self.assertRaises(
            InvalidChannelType,
            self._emu.start,
            'test-target',
            pause=True,
        )
| |
| |
class TestQemuTargetChannelsMixed(ConfigHelperWithEmulator):
    """Test configuration with mixed channels types.

    The target declares three chardev channels: one without an explicit type,
    one of type ``tcp`` and one of type ``pty``.
    """

    _config = {
        'qemu': {
            'executable': 'qemu-system-arm',
        },
        'targets': {
            'test-target': {
                'qemu': {
                    'machine': 'lm3s6965evb',
                    'channels': {
                        'chardevs': {
                            # No explicit type; the test below expects this
                            # channel to be reachable through a TCP address.
                            'test_uart0': {
                                'id': 'serial0',
                            },
                            'test_uart1': {
                                'id': 'serial1',
                                'type': 'tcp',
                            },
                            'test_uart2': {
                                'id': 'serial2',
                                'type': 'pty',
                            },
                        }
                    },
                }
            }
        },
    }

    def setUp(self) -> None:
        """Start the emulator paused, skipping on platforms without PTYs."""
        if sys.platform == 'win32':
            self.skipTest('pty not supported on win32')
        super().setUp()
        # no image to run so start paused
        self._emu.start('test-target', pause=True)

    def tearDown(self) -> None:
        """Stop the emulator after each test."""
        self._emu.stop()
        super().tearDown()

    def _assert_channel_accepts_connection(self, channel: str) -> None:
        """Connect a TCP socket to the address reported for ``channel``.

        Args:
            channel: Name of the channel whose address is looked up.
        """
        host, port = self._emu.get_channel_addr(channel)
        # The context manager closes the socket even when connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((host, port))

    def test_uart0_addr(self) -> None:
        """The channel without an explicit type accepts a TCP connection."""
        self._assert_channel_accepts_connection('test_uart0')

    def test_uart1_addr(self) -> None:
        """The explicit 'tcp' channel accepts a TCP connection."""
        self._assert_channel_accepts_connection('test_uart1')

    def test_uart2_path(self) -> None:
        """The path reported for the 'pty' channel exists."""
        self.assertTrue(
            os.path.exists(self._emu.get_channel_path('test_uart2'))
        )
| |
| |
def main() -> None:
    """Run the test suite, or exit successfully when qemu is unavailable."""
    qemu_found, reason = check_prog('qemu-system-arm')
    if qemu_found:
        unittest.main()
        return

    # Without qemu-system-arm nothing can run; report and exit with status 0.
    print(f'skipping tests: {reason}')
    sys.exit(0)


if __name__ == '__main__':
    main()