blob: ff01a7d94bc4ae13dcdd57a7aa380837260b07b1 [file] [log] [blame]
#!/usr/bin/env python3
# vim: set syntax=python ts=4 :
#
# Copyright (c) 2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import os
from multiprocessing import Lock, Value
import re
import platform
import yaml
import scl
import logging
from pathlib import Path
from twisterlib.environment import ZEPHYR_BASE
try:
# Use the C LibYAML parser if available, rather than the Python parser.
# It's much faster.
from yaml import CSafeLoader as SafeLoader
from yaml import CDumper as Dumper
except ImportError:
from yaml import SafeLoader, Dumper
try:
from tabulate import tabulate
except ImportError:
print("Install tabulate python module with pip to use --device-testing option.")
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
class DUT(object):
def __init__(self,
id=None,
serial=None,
serial_baud=None,
platform=None,
product=None,
serial_pty=None,
connected=False,
runner_params=None,
pre_script=None,
post_script=None,
post_flash_script=None,
runner=None):
self.serial = serial
self.baud = serial_baud or 115200
self.platform = platform
self.serial_pty = serial_pty
self._counter = Value("i", 0)
self._available = Value("i", 1)
self.connected = connected
self.pre_script = pre_script
self.id = id
self.product = product
self.runner = runner
self.runner_params = runner_params
self.fixtures = []
self.post_flash_script = post_flash_script
self.post_script = post_script
self.pre_script = pre_script
self.probe_id = None
self.notes = None
self.lock = Lock()
self.match = False
@property
def available(self):
with self._available.get_lock():
return self._available.value
@available.setter
def available(self, value):
with self._available.get_lock():
self._available.value = value
@property
def counter(self):
with self._counter.get_lock():
return self._counter.value
@counter.setter
def counter(self, value):
with self._counter.get_lock():
self._counter.value = value
def to_dict(self):
d = {}
exclude = ['_available', '_counter', 'match']
v = vars(self)
for k in v.keys():
if k not in exclude and v[k]:
d[k] = v[k]
return d
def __repr__(self):
return f"<{self.platform} ({self.product}) on {self.serial}>"
class HardwareMap:
schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "hwmap-schema.yaml")
manufacturer = [
'ARM',
'SEGGER',
'MBED',
'STMicroelectronics',
'Atmel Corp.',
'Texas Instruments',
'Silicon Labs',
'NXP Semiconductors',
'Microchip Technology Inc.',
'FTDI',
'Digilent'
]
runner_mapping = {
'pyocd': [
'DAPLink CMSIS-DAP',
'MBED CMSIS-DAP'
],
'jlink': [
'J-Link',
'J-Link OB'
],
'openocd': [
'STM32 STLink', '^XDS110.*', 'STLINK-V3'
],
'dediprog': [
'TTL232R-3V3',
'MCP2200 USB Serial Port Emulator'
]
}
def __init__(self, env=None):
self.detected = []
self.duts = []
self.options = env.options
def discover(self):
if self.options.generate_hardware_map:
self.scan(persistent=self.options.persistent_hardware_map)
self.save(self.options.generate_hardware_map)
return 0
if not self.options.device_testing and self.options.hardware_map:
self.load(self.options.hardware_map)
logger.info("Available devices:")
self.dump(connected_only=True)
return 0
if self.options.device_testing:
if self.options.hardware_map:
self.load(self.options.hardware_map)
if not self.options.platform:
self.options.platform = []
for d in self.duts:
if d.connected:
self.options.platform.append(d.platform)
elif self.options.device_serial or self.options.device_serial_pty:
if self.options.device_serial:
self.add_device(self.options.device_serial,
self.options.platform[0],
self.options.pre_script,
False,
baud=self.options.device_serial_baud
)
else:
self.add_device(self.options.device_serial_pty,
self.options.platform[0],
self.options.pre_script,
True)
# the fixtures given by twister command explicitly should be assigned to each DUT
if self.options.fixture:
for d in self.duts:
d.fixtures.extend(self.options.fixture)
return 1
def summary(self, selected_platforms):
print("\nHardware distribution summary:\n")
table = []
header = ['Board', 'ID', 'Counter']
for d in self.duts:
if d.connected and d.platform in selected_platforms:
row = [d.platform, d.id, d.counter]
table.append(row)
print(tabulate(table, headers=header, tablefmt="github"))
def add_device(self, serial, platform, pre_script, is_pty, baud=None):
device = DUT(platform=platform, connected=True, pre_script=pre_script, serial_baud=baud)
if is_pty:
device.serial_pty = serial
else:
device.serial = serial
self.duts.append(device)
def load(self, map_file):
hwm_schema = scl.yaml_load(self.schema_path)
duts = scl.yaml_load_verify(map_file, hwm_schema)
for dut in duts:
pre_script = dut.get('pre_script')
post_script = dut.get('post_script')
post_flash_script = dut.get('post_flash_script')
platform = dut.get('platform')
id = dut.get('id')
runner = dut.get('runner')
runner_params = dut.get('runner_params')
serial_pty = dut.get('serial_pty')
serial = dut.get('serial')
baud = dut.get('baud', None)
product = dut.get('product')
fixtures = dut.get('fixtures', [])
connected= dut.get('connected') and ((serial or serial_pty) is not None)
if not connected:
continue
new_dut = DUT(platform=platform,
product=product,
runner=runner,
runner_params=runner_params,
id=id,
serial_pty=serial_pty,
serial=serial,
serial_baud=baud,
connected=connected,
pre_script=pre_script,
post_script=post_script,
post_flash_script=post_flash_script)
new_dut.fixtures = fixtures
new_dut.counter = 0
self.duts.append(new_dut)
def scan(self, persistent=False):
from serial.tools import list_ports
if persistent and platform.system() == 'Linux':
# On Linux, /dev/serial/by-id provides symlinks to
# '/dev/ttyACMx' nodes using names which are unique as
# long as manufacturers fill out USB metadata nicely.
#
# This creates a map from '/dev/ttyACMx' device nodes
# to '/dev/serial/by-id/usb-...' symlinks. The symlinks
# go into the hardware map because they stay the same
# even when the user unplugs / replugs the device.
#
# Some inexpensive USB/serial adapters don't result
# in unique names here, though, so use of this feature
# requires explicitly setting persistent=True.
by_id = Path('/dev/serial/by-id')
def readlink(link):
return str((by_id / link).resolve())
persistent_map = {readlink(link): str(link)
for link in by_id.iterdir()}
else:
persistent_map = {}
serial_devices = list_ports.comports()
logger.info("Scanning connected hardware...")
for d in serial_devices:
if d.manufacturer in self.manufacturer:
# TI XDS110 can have multiple serial devices for a single board
# assume endpoint 0 is the serial, skip all others
if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
continue
s_dev = DUT(platform="unknown",
id=d.serial_number,
serial=persistent_map.get(d.device, d.device),
product=d.product,
runner='unknown',
connected=True)
for runner, _ in self.runner_mapping.items():
products = self.runner_mapping.get(runner)
if d.product in products:
s_dev.runner = runner
continue
# Try regex matching
for p in products:
if re.match(p, d.product):
s_dev.runner = runner
s_dev.connected = True
s_dev.lock = None
self.detected.append(s_dev)
else:
logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d))
def save(self, hwm_file):
# use existing map
self.detected.sort(key=lambda x: x.serial or '')
if os.path.exists(hwm_file):
with open(hwm_file, 'r') as yaml_file:
hwm = yaml.load(yaml_file, Loader=SafeLoader)
if hwm:
hwm.sort(key=lambda x: x.get('id', ''))
# disconnect everything
for h in hwm:
h['connected'] = False
h['serial'] = None
for _detected in self.detected:
for h in hwm:
if _detected.id == h['id'] and _detected.product == h['product'] and not _detected.match:
h['connected'] = True
h['serial'] = _detected.serial
_detected.match = True
new_duts = list(filter(lambda d: not d.match, self.detected))
new = []
for d in new_duts:
new.append(d.to_dict())
if hwm:
hwm = hwm + new
else:
hwm = new
with open(hwm_file, 'w') as yaml_file:
yaml.dump(hwm, yaml_file, Dumper=Dumper, default_flow_style=False)
self.load(hwm_file)
logger.info("Registered devices:")
self.dump()
else:
# create new file
dl = []
for _connected in self.detected:
platform = _connected.platform
id = _connected.id
runner = _connected.runner
serial = _connected.serial
product = _connected.product
d = {
'platform': platform,
'id': id,
'runner': runner,
'serial': serial,
'product': product,
'connected': _connected.connected
}
dl.append(d)
with open(hwm_file, 'w') as yaml_file:
yaml.dump(dl, yaml_file, Dumper=Dumper, default_flow_style=False)
logger.info("Detected devices:")
self.dump(detected=True)
def dump(self, filtered=[], header=[], connected_only=False, detected=False):
print("")
table = []
if detected:
to_show = self.detected
else:
to_show = self.duts
if not header:
header = ["Platform", "ID", "Serial device"]
for p in to_show:
platform = p.platform
connected = p.connected
if filtered and platform not in filtered:
continue
if not connected_only or connected:
table.append([platform, p.id, p.serial])
print(tabulate(table, headers=header, tablefmt="github"))