# blob: 284b96f38d4a4b5919010c53ee7351314f85c17f [file] [log] [blame]
# Copyright (c) 2024 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0
"""
Zephyr CAN shell module support for providing a python-can bus interface for testing.
"""
import re
import logging
from typing import Optional, Tuple
from can import BusABC, CanProtocol, Message
from can.exceptions import CanInitializationError, CanOperationError
from can.typechecking import CanFilters
from twister_harness import DeviceAdapter, Shell
# Module-level logger, named after this module; used for debug traces of
# frames sent and received through the CAN shell.
logger = logging.getLogger(__name__)
class CanShellBus(BusABC): # pylint: disable=abstract-method
    """
    A CAN interface using the Zephyr CAN shell module.

    All bus operations are carried out by issuing ``can ...`` commands on the
    device shell and parsing the textual output they produce.
    """

    def __init__(self, dut: DeviceAdapter, shell: Shell, channel: str,
                 can_filters: Optional[CanFilters] = None, **kwargs) -> None:
        """
        Configure and start the CAN controller, then initialize the base bus.

        :param dut: device adapter used for reading asynchronous shell output.
        :param shell: shell helper used for executing CAN shell commands.
        :param channel: name of the CAN controller device in the shell.
        :param can_filters: optional RX filters forwarded to the base class.
        :param kwargs: additional keyword arguments forwarded to the base class.
        :raises CanOperationError: if the capabilities cannot be read or the
            mode cannot be set.
        :raises CanInitializationError: if the controller fails to start.
        """
        self._dut = dut
        self._shell = shell
        self._device = channel
        self._is_filtered = False
        # IDs of the RX filters currently installed through this bus instance.
        self._filter_ids: list[int] = []
        self.channel_info = f'Zephyr CAN shell, device "{self._device}"'

        # Enable CAN FD mode only when the controller reports the capability.
        mode = 'normal'
        if 'fd' in self._get_capabilities():
            self._can_protocol = CanProtocol.CAN_FD
            mode += ' fd'
        else:
            self._can_protocol = CanProtocol.CAN_20

        self._set_mode(mode)
        self._start()

        # The base-class initializer runs last, once the controller has been
        # configured and started.
        super().__init__(channel=channel, can_filters=can_filters, **kwargs)

    def _retval(self) -> int:
        """Get return value of last shell command."""
        output = self._shell.exec_command('retval')
        return int(self._shell.get_filtered_output(output)[0])

    def _get_capabilities(self) -> list[str]:
        """
        Return the capabilities listed by ``can show`` for the device.

        :raises CanOperationError: if no ``capabilities:`` line is found.
        """
        cmd = f'can show {self._device}'
        lines = self._shell.get_filtered_output(self._shell.exec_command(cmd))
        regex_compiled = re.compile(r'capabilities:\s+(?P<caps>.*)')
        for line in lines:
            m = regex_compiled.match(line)
            if m:
                return m.group('caps').split()

        raise CanOperationError('capabilities not found')

    def _set_mode(self, mode: str) -> None:
        """
        Set the CAN controller mode.

        :param mode: mode string appended to the ``can mode`` shell command.
        :raises CanOperationError: if the shell command returns non-zero.
        """
        self._shell.exec_command(f'can mode {self._device} {mode}')
        retval = self._retval()
        if retval != 0:
            raise CanOperationError(f'failed to set mode "{mode}" (err {retval})')

    def _start(self) -> None:
        """
        Start the CAN controller.

        :raises CanInitializationError: if the shell command returns non-zero.
        """
        self._shell.exec_command(f'can start {self._device}')
        retval = self._retval()
        if retval != 0:
            raise CanInitializationError(f'failed to start (err {retval})')

    def _stop(self) -> None:
        """Stop the CAN controller; the command's return value is not checked."""
        self._shell.exec_command(f'can stop {self._device}')

    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        """
        Send a CAN frame and wait for the shell to report it as sent.

        :param msg: the message to transmit.
        :param timeout: timeout passed on when waiting for the transmit
            confirmation line.
        :raises CanOperationError: if the shell does not report the frame as
            enqueued.
        """
        logger.debug('sending: %s', msg)

        cmd = f'can send {self._device}'
        cmd += ' -e' if msg.is_extended_id else ''
        cmd += ' -r' if msg.is_remote_frame else ''
        cmd += ' -f' if msg.is_fd else ''
        cmd += ' -b' if msg.bitrate_switch else ''

        # Extended IDs are formatted as 8 hex digits, standard IDs as 3.
        if msg.is_extended_id:
            cmd += f' {msg.arbitration_id:08x}'
        else:
            cmd += f' {msg.arbitration_id:03x}'

        if msg.data:
            # Space-separated hex bytes, e.g. "de ad be ef".
            cmd += ' ' + msg.data.hex(' ', 1)

        lines = self._shell.exec_command(cmd)
        regex_compiled = re.compile(r'enqueuing\s+CAN\s+frame\s+#(?P<id>\d+)')
        frame_num = None
        for line in lines:
            m = regex_compiled.match(line)
            if m:
                frame_num = m.group('id')
                break

        if frame_num is None:
            raise CanOperationError('frame not enqueued')

        # The confirmation carries the same frame number as the enqueue
        # message, so wait for that specific frame.
        tx_regex = r'CAN\s+frame\s+#' + frame_num + r'\s+successfully\s+sent'
        self._dut.readlines_until(regex=tx_regex, timeout=timeout)

    def _add_filter(self, can_id: int, can_mask: int, extended: bool) -> None:
        """
        Add RX filter.

        :param can_id: CAN identifier to match.
        :param can_mask: mask applied to the identifier.
        :param extended: True to add the filter with the ``-e`` flag and
            8-digit hex ID and mask, False for 3-digit hex values.
        :raises CanOperationError: if the shell output contains no filter ID.
        """
        cmd = f'can filter add {self._device}'
        cmd += ' -e' if extended else ''

        if extended:
            cmd += f' {can_id:08x}'
            cmd += f' {can_mask:08x}'
        else:
            cmd += f' {can_id:03x}'
            cmd += f' {can_mask:03x}'

        lines = self._shell.exec_command(cmd)
        regex_compiled = re.compile(r'filter\s+ID:\s+(?P<id>\d+)')
        for line in lines:
            m = regex_compiled.match(line)
            if m:
                # Remember the ID so the filter can be removed later.
                self._filter_ids.append(int(m.group('id')))
                return

        raise CanOperationError('filter_id not found')

    def _remove_filter(self, filter_id: int) -> None:
        """
        Remove RX filter.

        The ID is dropped from the internal bookkeeping before the shell
        command is issued, so it is forgotten even if the removal fails.

        :param filter_id: ID of the filter to remove.
        :raises CanOperationError: if the shell command returns non-zero.
        """
        if filter_id in self._filter_ids:
            self._filter_ids.remove(filter_id)

        self._shell.exec_command(f'can filter remove {self._device} {filter_id}')
        retval = self._retval()
        if retval != 0:
            raise CanOperationError(f'failed to remove filter ID {filter_id} (err {retval})')

    def _remove_all_filters(self) -> None:
        """Remove all RX filters."""
        # Iterate over a copy: _remove_filter() mutates self._filter_ids.
        for filter_id in self._filter_ids[:]:
            self._remove_filter(filter_id)

    def _apply_filters(self, filters: Optional[CanFilters]) -> None:
        """
        Replace the installed RX filters with the given ones.

        :param filters: filters to install; when empty or None, catch-all
            filters for both standard and extended IDs are installed instead.
        """
        self._remove_all_filters()

        if filters:
            self._is_filtered = True
        else:
            # Accept all frames if no hardware filters provided
            filters = [
                {'can_id': 0x0, 'can_mask': 0x0},
                {'can_id': 0x0, 'can_mask': 0x0, 'extended': True}
            ]
            self._is_filtered = False

        for can_filter in filters:
            can_id = can_filter['can_id']
            can_mask = can_filter['can_mask']
            # 'extended' is optional; a missing key means a standard ID filter.
            extended = can_filter.get('extended', False)
            self._add_filter(can_id, can_mask, extended)

    def _recv_internal(self, timeout: Optional[float]) -> Tuple[Optional[Message], bool]:
        """
        Read shell output until a received frame is printed and parse it.

        :param timeout: timeout passed on when waiting for a frame line.
        :return: tuple of the parsed message (None if no line matched) and a
            flag telling whether user-supplied filters are installed.
        """
        # The identifier is parsed as base 16 below, so the pattern has to
        # accept hex digits; plain \d+ would miss IDs such as 7ff.
        frame_regex = (
            r'.*' + re.escape(self._device) +
            r'\s+(?P<brs>\S)(?P<esi>\S)\s+(?P<can_id>[0-9a-fA-F]+)'
            r'\s+\[(?P<dlc>\d+)\]\s*(?P<data>[a-z0-9 ]*)'
        )
        lines = self._dut.readlines_until(regex=frame_regex, timeout=timeout)
        msg = None

        regex_compiled = re.compile(frame_regex)
        for line in lines:
            m = regex_compiled.match(line)
            if m:
                can_id = int(m.group('can_id'), 16)
                # The printed width tells the ID format: 8 digits is extended.
                ext = len(m.group('can_id')) == 8
                dlc = int(m.group('dlc'))
                # A two-character length field is treated as a CAN FD frame.
                fd = len(m.group('dlc')) == 2
                brs = m.group('brs') == 'B'
                esi = m.group('esi') == 'P'
                data = bytearray.fromhex(m.group('data'))
                msg = Message(arbitration_id=can_id, is_extended_id=ext,
                              data=data, dlc=dlc,
                              is_fd=fd, bitrate_switch=brs, error_state_indicator=esi,
                              channel=self._device, check=True)
                logger.debug('received: %s', msg)

        return msg, self._is_filtered

    def shutdown(self) -> None:
        """Shut down the bus: stop the controller and remove all RX filters."""
        # Guard so a repeated shutdown does not re-issue the shell commands.
        if not self._is_shutdown:
            super().shutdown()
            self._stop()
            self._remove_all_filters()