blob: 3a91813f75e5a31ead18abf925ed4a843ac7ac84 [file] [log] [blame]
# Copyright (c) 2024 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0
"""
Test suites for testing Zephyr CAN <=> host CAN.
"""
import logging
import pytest
import can
from can import BusABC, CanProtocol
# RX/TX timeout in seconds
TIMEOUT = 1.0
logger = logging.getLogger(__name__)
@pytest.mark.parametrize('msg', [
pytest.param(
can.Message(arbitration_id=0x10,
is_extended_id=False),
id='std_id_dlc_0'
),
pytest.param(
can.Message(arbitration_id=0x20,
data=[0xaa, 0xbb, 0xcc, 0xdd],
is_extended_id=False),
id='std_id_dlc_4'
),
pytest.param(
can.Message(arbitration_id=0x30,
data=[0xee, 0xff, 0xee, 0xff, 0xee, 0xff, 0xee, 0xff],
is_extended_id=True),
id='ext_id_dlc_8'
),
pytest.param(
can.Message(arbitration_id=0x40,
data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09,
0x10, 0x11],
is_fd=True, is_extended_id=False),
id='std_id_fdf_dlc_9'
),
pytest.param(
can.Message(arbitration_id=0x50,
data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09,
0x10, 0x11],
is_fd=True, bitrate_switch=True, is_extended_id=False),
id='std_id_fdf_brs_dlc_9'
),
])
class TestCanRxTx():
"""
Class for testing CAN RX/TX between Zephyr DUT and host.
"""
@staticmethod
def check_rx(tx: can.Message, rx: can.Message) -> None:
"""Check if received message matches transmitted message."""
# pylint: disable-next=unused-variable
__tracebackhide__ = True
if rx is None:
pytest.fail('no message received')
if not tx.equals(rx, timestamp_delta=None, check_channel=False,
check_direction=False):
pytest.fail(f'rx message "{rx}" not equal to tx message "{tx}"')
@staticmethod
def skip_if_unsupported(can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
"""Skip test if message format is not supported by both DUT and host."""
if msg.is_fd:
if can_dut.protocol == CanProtocol.CAN_20:
pytest.skip('CAN FD not supported by DUT')
if can_host.protocol == CanProtocol.CAN_20:
pytest.skip('CAN FD not supported by host')
def test_dut_to_host(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
"""Test DUT to host communication."""
self.skip_if_unsupported(can_dut, can_host, msg)
can_dut.send(msg, timeout=TIMEOUT)
rx = can_host.recv(timeout=TIMEOUT)
self.check_rx(msg, rx)
def test_host_to_dut(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
"""Test host to DUT communication."""
self.skip_if_unsupported(can_dut, can_host, msg)
can_host.send(msg, timeout=TIMEOUT)
rx = can_dut.recv(timeout=TIMEOUT)
self.check_rx(msg, rx)