| #!/usr/bin/env python3 |
| # vim: set syntax=python ts=4 : |
| # |
| # Copyright (c) 2022 Intel Corporation |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| import os |
| from multiprocessing import Lock, Value |
| import re |
| |
| import platform |
| import yaml |
| import scl |
| import logging |
| from pathlib import Path |
| from natsort import natsorted |
| |
| from twisterlib.environment import ZEPHYR_BASE |
| |
| try: |
| # Use the C LibYAML parser if available, rather than the Python parser. |
| # It's much faster. |
| from yaml import CSafeLoader as SafeLoader |
| from yaml import CDumper as Dumper |
| except ImportError: |
| from yaml import SafeLoader, Dumper |
| |
| try: |
| from tabulate import tabulate |
| except ImportError: |
| print("Install tabulate python module with pip to use --device-testing option.") |
| |
| logger = logging.getLogger('twister') |
| logger.setLevel(logging.DEBUG) |
| |
| |
| class DUT(object): |
| def __init__(self, |
| id=None, |
| serial=None, |
| serial_baud=None, |
| platform=None, |
| product=None, |
| serial_pty=None, |
| connected=False, |
| runner_params=None, |
| pre_script=None, |
| post_script=None, |
| post_flash_script=None, |
| runner=None, |
| flash_timeout=60, |
| flash_with_test=False): |
| |
| self.serial = serial |
| self.baud = serial_baud or 115200 |
| self.platform = platform |
| self.serial_pty = serial_pty |
| self._counter = Value("i", 0) |
| self._available = Value("i", 1) |
| self.connected = connected |
| self.pre_script = pre_script |
| self.id = id |
| self.product = product |
| self.runner = runner |
| self.runner_params = runner_params |
| self.fixtures = [] |
| self.post_flash_script = post_flash_script |
| self.post_script = post_script |
| self.pre_script = pre_script |
| self.probe_id = None |
| self.notes = None |
| self.lock = Lock() |
| self.match = False |
| self.flash_timeout = flash_timeout |
| self.flash_with_test = flash_with_test |
| |
| @property |
| def available(self): |
| with self._available.get_lock(): |
| return self._available.value |
| |
| @available.setter |
| def available(self, value): |
| with self._available.get_lock(): |
| self._available.value = value |
| |
| @property |
| def counter(self): |
| with self._counter.get_lock(): |
| return self._counter.value |
| |
| @counter.setter |
| def counter(self, value): |
| with self._counter.get_lock(): |
| self._counter.value = value |
| |
| def to_dict(self): |
| d = {} |
| exclude = ['_available', '_counter', 'match'] |
| v = vars(self) |
| for k in v.keys(): |
| if k not in exclude and v[k]: |
| d[k] = v[k] |
| return d |
| |
| |
| def __repr__(self): |
| return f"<{self.platform} ({self.product}) on {self.serial}>" |
| |
| class HardwareMap: |
| schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "hwmap-schema.yaml") |
| |
| manufacturer = [ |
| 'ARM', |
| 'SEGGER', |
| 'MBED', |
| 'STMicroelectronics', |
| 'Atmel Corp.', |
| 'Texas Instruments', |
| 'Silicon Labs', |
| 'NXP Semiconductors', |
| 'Microchip Technology Inc.', |
| 'FTDI', |
| 'Digilent', |
| 'Microsoft' |
| ] |
| |
| runner_mapping = { |
| 'pyocd': [ |
| 'DAPLink CMSIS-DAP', |
| 'MBED CMSIS-DAP' |
| ], |
| 'jlink': [ |
| 'J-Link', |
| 'J-Link OB' |
| ], |
| 'openocd': [ |
| 'STM32 STLink', '^XDS110.*', 'STLINK-V3' |
| ], |
| 'dediprog': [ |
| 'TTL232R-3V3', |
| 'MCP2200 USB Serial Port Emulator' |
| ] |
| } |
| |
| def __init__(self, env=None): |
| self.detected = [] |
| self.duts = [] |
| self.options = env.options |
| |
| def discover(self): |
| |
| if self.options.generate_hardware_map: |
| self.scan(persistent=self.options.persistent_hardware_map) |
| self.save(self.options.generate_hardware_map) |
| return 0 |
| |
| if not self.options.device_testing and self.options.hardware_map: |
| self.load(self.options.hardware_map) |
| logger.info("Available devices:") |
| self.dump(connected_only=True) |
| return 0 |
| |
| if self.options.device_testing: |
| if self.options.hardware_map: |
| self.load(self.options.hardware_map) |
| if not self.options.platform: |
| self.options.platform = [] |
| for d in self.duts: |
| if d.connected and d.platform != 'unknown': |
| self.options.platform.append(d.platform) |
| |
| elif self.options.device_serial: |
| self.add_device(self.options.device_serial, |
| self.options.platform[0], |
| self.options.pre_script, |
| False, |
| baud=self.options.device_serial_baud, |
| flash_timeout=self.options.device_flash_timeout, |
| flash_with_test=self.options.device_flash_with_test |
| ) |
| |
| elif self.options.device_serial_pty: |
| self.add_device(self.options.device_serial_pty, |
| self.options.platform[0], |
| self.options.pre_script, |
| True, |
| flash_timeout=self.options.device_flash_timeout, |
| flash_with_test=self.options.device_flash_with_test |
| ) |
| |
| # the fixtures given by twister command explicitly should be assigned to each DUT |
| if self.options.fixture: |
| for d in self.duts: |
| d.fixtures.extend(self.options.fixture) |
| return 1 |
| |
| |
| def summary(self, selected_platforms): |
| print("\nHardware distribution summary:\n") |
| table = [] |
| header = ['Board', 'ID', 'Counter'] |
| for d in self.duts: |
| if d.connected and d.platform in selected_platforms: |
| row = [d.platform, d.id, d.counter] |
| table.append(row) |
| print(tabulate(table, headers=header, tablefmt="github")) |
| |
| |
| def add_device(self, serial, platform, pre_script, is_pty, baud=None, flash_timeout=60, flash_with_test=False): |
| device = DUT(platform=platform, connected=True, pre_script=pre_script, serial_baud=baud, |
| flash_timeout=flash_timeout, flash_with_test=flash_with_test |
| ) |
| if is_pty: |
| device.serial_pty = serial |
| else: |
| device.serial = serial |
| |
| self.duts.append(device) |
| |
| def load(self, map_file): |
| hwm_schema = scl.yaml_load(self.schema_path) |
| duts = scl.yaml_load_verify(map_file, hwm_schema) |
| for dut in duts: |
| pre_script = dut.get('pre_script') |
| post_script = dut.get('post_script') |
| post_flash_script = dut.get('post_flash_script') |
| flash_timeout = dut.get('flash_timeout') or self.options.device_flash_timeout |
| flash_with_test = dut.get('flash_with_test') |
| if flash_with_test is None: |
| flash_with_test = self.options.device_flash_with_test |
| platform = dut.get('platform') |
| id = dut.get('id') |
| runner = dut.get('runner') |
| runner_params = dut.get('runner_params') |
| serial_pty = dut.get('serial_pty') |
| serial = dut.get('serial') |
| baud = dut.get('baud', None) |
| product = dut.get('product') |
| fixtures = dut.get('fixtures', []) |
| connected= dut.get('connected') and ((serial or serial_pty) is not None) |
| if not connected: |
| continue |
| new_dut = DUT(platform=platform, |
| product=product, |
| runner=runner, |
| runner_params=runner_params, |
| id=id, |
| serial_pty=serial_pty, |
| serial=serial, |
| serial_baud=baud, |
| connected=connected, |
| pre_script=pre_script, |
| post_script=post_script, |
| post_flash_script=post_flash_script, |
| flash_timeout=flash_timeout, |
| flash_with_test=flash_with_test) |
| new_dut.fixtures = fixtures |
| new_dut.counter = 0 |
| self.duts.append(new_dut) |
| |
| def scan(self, persistent=False): |
| from serial.tools import list_ports |
| |
| if persistent and platform.system() == 'Linux': |
| # On Linux, /dev/serial/by-id provides symlinks to |
| # '/dev/ttyACMx' nodes using names which are unique as |
| # long as manufacturers fill out USB metadata nicely. |
| # |
| # This creates a map from '/dev/ttyACMx' device nodes |
| # to '/dev/serial/by-id/usb-...' symlinks. The symlinks |
| # go into the hardware map because they stay the same |
| # even when the user unplugs / replugs the device. |
| # |
| # Some inexpensive USB/serial adapters don't result |
| # in unique names here, though, so use of this feature |
| # requires explicitly setting persistent=True. |
| by_id = Path('/dev/serial/by-id') |
| def readlink(link): |
| return str((by_id / link).resolve()) |
| |
| persistent_map = {readlink(link): str(link) |
| for link in by_id.iterdir()} |
| else: |
| persistent_map = {} |
| |
| serial_devices = list_ports.comports() |
| logger.info("Scanning connected hardware...") |
| for d in serial_devices: |
| if d.manufacturer in self.manufacturer: |
| |
| # TI XDS110 can have multiple serial devices for a single board |
| # assume endpoint 0 is the serial, skip all others |
| if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'): |
| continue |
| |
| if d.product is None: |
| d.product = 'unknown' |
| |
| s_dev = DUT(platform="unknown", |
| id=d.serial_number, |
| serial=persistent_map.get(d.device, d.device), |
| product=d.product, |
| runner='unknown', |
| connected=True) |
| |
| for runner, _ in self.runner_mapping.items(): |
| products = self.runner_mapping.get(runner) |
| if d.product in products: |
| s_dev.runner = runner |
| continue |
| # Try regex matching |
| for p in products: |
| if re.match(p, d.product): |
| s_dev.runner = runner |
| |
| s_dev.connected = True |
| s_dev.lock = None |
| self.detected.append(s_dev) |
| else: |
| logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d)) |
| |
| def save(self, hwm_file): |
| # use existing map |
| self.detected = natsorted(self.detected, key=lambda x: x.serial or '') |
| if os.path.exists(hwm_file): |
| with open(hwm_file, 'r') as yaml_file: |
| hwm = yaml.load(yaml_file, Loader=SafeLoader) |
| if hwm: |
| hwm.sort(key=lambda x: x.get('id', '')) |
| |
| # disconnect everything |
| for h in hwm: |
| h['connected'] = False |
| h['serial'] = None |
| |
| for _detected in self.detected: |
| for h in hwm: |
| if all([ |
| _detected.id == h['id'], |
| _detected.product == h['product'], |
| _detected.match is False, |
| h['connected'] is False |
| ]): |
| h['connected'] = True |
| h['serial'] = _detected.serial |
| _detected.match = True |
| break |
| |
| new_duts = list(filter(lambda d: not d.match, self.detected)) |
| new = [] |
| for d in new_duts: |
| new.append(d.to_dict()) |
| |
| if hwm: |
| hwm = hwm + new |
| else: |
| hwm = new |
| |
| with open(hwm_file, 'w') as yaml_file: |
| yaml.dump(hwm, yaml_file, Dumper=Dumper, default_flow_style=False) |
| |
| self.load(hwm_file) |
| logger.info("Registered devices:") |
| self.dump() |
| |
| else: |
| # create new file |
| dl = [] |
| for _connected in self.detected: |
| platform = _connected.platform |
| id = _connected.id |
| runner = _connected.runner |
| serial = _connected.serial |
| product = _connected.product |
| d = { |
| 'platform': platform, |
| 'id': id, |
| 'runner': runner, |
| 'serial': serial, |
| 'product': product, |
| 'connected': _connected.connected |
| } |
| dl.append(d) |
| with open(hwm_file, 'w') as yaml_file: |
| yaml.dump(dl, yaml_file, Dumper=Dumper, default_flow_style=False) |
| logger.info("Detected devices:") |
| self.dump(detected=True) |
| |
| def dump(self, filtered=[], header=[], connected_only=False, detected=False): |
| print("") |
| table = [] |
| if detected: |
| to_show = self.detected |
| else: |
| to_show = self.duts |
| |
| if not header: |
| header = ["Platform", "ID", "Serial device"] |
| for p in to_show: |
| platform = p.platform |
| connected = p.connected |
| if filtered and platform not in filtered: |
| continue |
| |
| if not connected_only or connected: |
| table.append([platform, p.id, p.serial]) |
| |
| print(tabulate(table, headers=header, tablefmt="github")) |