| # Copyright (c) 2024 Vestas Wind Systems A/S |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| """ |
| Test suites for testing Zephyr CAN <=> host CAN. |
| """ |
| |
| import logging |
| import pytest |
| |
| import can |
| from can import BusABC, CanProtocol |
| |
# Timeout, in seconds, passed to every send() and recv() call in this module
TIMEOUT: float = 1.0

# Module-level logger, named after this module
logger = logging.getLogger(__name__)
| |
@pytest.mark.parametrize('msg', [
    # Standard (11-bit) identifier, empty payload
    pytest.param(
        can.Message(
            arbitration_id=0x10,
            is_extended_id=False,
        ),
        id='std_id_dlc_0',
    ),
    # Standard identifier, 4 byte payload
    pytest.param(
        can.Message(
            arbitration_id=0x20,
            data=[0xaa, 0xbb, 0xcc, 0xdd],
            is_extended_id=False,
        ),
        id='std_id_dlc_4',
    ),
    # Extended identifier, 8 byte payload
    pytest.param(
        can.Message(
            arbitration_id=0x30,
            data=[0xee, 0xff, 0xee, 0xff, 0xee, 0xff, 0xee, 0xff],
            is_extended_id=True,
        ),
        id='ext_id_dlc_8',
    ),
    # CAN FD frame, standard identifier, 12 byte payload, no bit rate switch
    pytest.param(
        can.Message(
            arbitration_id=0x40,
            data=[
                0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
                0x06, 0x07, 0x08, 0x09, 0x10, 0x11,
            ],
            is_fd=True,
            is_extended_id=False,
        ),
        id='std_id_fdf_dlc_9',
    ),
    # CAN FD frame, standard identifier, 12 byte payload, bit rate switch set
    pytest.param(
        can.Message(
            arbitration_id=0x50,
            data=[
                0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
                0x06, 0x07, 0x08, 0x09, 0x10, 0x11,
            ],
            is_fd=True,
            bitrate_switch=True,
            is_extended_id=False,
        ),
        id='std_id_fdf_brs_dlc_9',
    ),
])
class TestCanRxTx:
    """
    CAN RX/TX tests exchanging frames between the Zephyr DUT and the host.

    Every test method runs once per parametrized ``msg``.
    """

    @staticmethod
    def check_rx(tx: can.Message, rx: can.Message) -> None:
        """Fail the running test unless ``rx`` is a frame equal to ``tx``.

        :param tx: the message that was transmitted.
        :param rx: the result of the receive call; ``None`` is reported as
                   "no message received".
        """
        # Keep this helper's frame out of pytest failure tracebacks
        # pylint: disable-next=unused-variable
        __tracebackhide__ = True

        if rx is None:
            pytest.fail('no message received')

        # Timestamp, channel and direction checks are switched off, so only
        # the remaining frame attributes are compared.
        frames_match = tx.equals(
            rx,
            timestamp_delta=None,
            check_channel=False,
            check_direction=False,
        )
        if not frames_match:
            pytest.fail(f'rx message "{rx}" not equal to tx message "{tx}"')

    @staticmethod
    def skip_if_unsupported(can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
        """Skip the running test if ``msg`` is CAN FD and a bus is CAN 2.0 only.

        :param can_dut: bus used to talk through the DUT.
        :param can_host: bus used to talk through the host.
        :param msg: the message about to be exchanged.
        """
        if not msg.is_fd:
            # Non-FD frames are never skipped
            return

        # The DUT is examined first, so its skip reason wins if both are CAN 2.0
        candidates = (
            (can_dut, 'CAN FD not supported by DUT'),
            (can_host, 'CAN FD not supported by host'),
        )
        for bus, reason in candidates:
            if bus.protocol == CanProtocol.CAN_20:
                pytest.skip(reason)

    def test_dut_to_host(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
        """Send ``msg`` from the DUT and expect an equal frame on the host."""
        self.skip_if_unsupported(can_dut, can_host, msg)

        can_dut.send(msg, timeout=TIMEOUT)
        self.check_rx(msg, can_host.recv(timeout=TIMEOUT))

    def test_host_to_dut(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
        """Send ``msg`` from the host and expect an equal frame on the DUT."""
        self.skip_if_unsupported(can_dut, can_host, msg)

        can_host.send(msg, timeout=TIMEOUT)
        self.check_rx(msg, can_dut.recv(timeout=TIMEOUT))