| #!/usr/bin/env python3 |
| # Copyright 2023 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """This script flashes and runs unit tests on stm32f769i-disc0 boards.""" |
| |
| import argparse |
| import logging |
| import os |
| import subprocess |
| import sys |
| import threading |
| from typing import List |
| |
| import coloredlogs # type: ignore |
| import serial |
| from stm32f769i_disc0_utils import stm32f769i_detector |
| |
| # Path used to access non-python resources in this python module. |
| _DIR = os.path.dirname(__file__) |
| |
| # Path to default openocd configuration file. |
| _OPENOCD_CONFIG = os.path.join(_DIR, 'openocd_stm32f7xx.cfg') |
| |
| # Path to scripts provided by openocd. |
| _OPENOCD_SCRIPTS_DIR = os.path.join( |
| os.getenv('PW_PIGWEED_CIPD_INSTALL_DIR', ''), 'share', 'openocd', 'scripts' |
| ) |
| |
| _LOG = logging.getLogger('unit_test_runner') |
| |
| # Verification of test pass/failure depends on these strings. If the formatting |
| # or output of the simple_printing_event_handler changes, this may need to be |
| # updated. |
| _TESTS_STARTING_STRING = b'[==========] Running all tests.' |
| _TESTS_DONE_STRING = b'[==========] Done running all tests.' |
| _TEST_FAILURE_STRING = b'[ FAILED ]' |
| |
| # How long to wait for the first byte of a test to be emitted. This is longer |
| # than the user-configurable timeout as there's a delay while the device is |
| # flashed. |
| _FLASH_TIMEOUT = 5.0 |
| |
| |
| class TestingFailure(Exception): |
| """A simple exception to be raised when a testing step fails.""" |
| |
| |
| class DeviceNotFound(Exception): |
| """A simple exception to be raised when unable to connect to a device.""" |
| |
| |
| def parse_args(): |
| """Parses command-line arguments.""" |
| |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument('binary', help='The target test binary to run') |
| parser.add_argument( |
| '--openocd-config', |
| default=_OPENOCD_CONFIG, |
| help='Path to openocd configuration file', |
| ) |
| parser.add_argument( |
| '--stlink-serial', |
| default=None, |
| help='The serial number of the stlink to use when ' |
| 'flashing the target device', |
| ) |
| parser.add_argument( |
| '--port', |
| default=None, |
| help='The name of the serial port to connect to when ' 'running tests', |
| ) |
| parser.add_argument( |
| '--baud', |
| type=int, |
| default=115200, |
| help='Target baud rate to use for serial communication' |
| ' with target device', |
| ) |
| parser.add_argument( |
| '--test-timeout', |
| type=float, |
| default=5.0, |
| help='Maximum communication delay in seconds before a ' |
| 'test is considered unresponsive and aborted', |
| ) |
| parser.add_argument( |
| '--verbose', |
| '-v', |
| dest='verbose', |
| action="store_true", |
| help='Output additional logs as the script runs', |
| ) |
| |
| return parser.parse_args() |
| |
| |
| def log_subprocess_output(level, output): |
| """Logs subprocess output line-by-line.""" |
| |
| lines = output.decode('utf-8', errors='replace').splitlines() |
| for line in lines: |
| _LOG.log(level, line) |
| |
| |
| def reset_device(openocd_config, stlink_serial): |
| """Uses openocd to reset the attached device.""" |
| |
| # Name/path of openocd. |
| default_flasher = 'openocd' |
| flash_tool = os.getenv('OPENOCD_PATH', default_flasher) |
| |
| cmd = [ |
| flash_tool, |
| '-s', |
| _OPENOCD_SCRIPTS_DIR, |
| '-f', |
| openocd_config, |
| '-c', |
| 'init', |
| '-c', |
| 'reset run', |
| '-c', |
| 'exit', |
| ] |
| _LOG.debug('Resetting device') |
| |
| env = os.environ.copy() |
| if stlink_serial: |
| env['PW_STLINK_SERIAL'] = stlink_serial |
| |
| # Disable GDB port to support multi-device testing. |
| env['PW_GDB_PORT'] = 'disabled' |
| process = subprocess.run( |
| cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env |
| ) |
| if process.returncode: |
| log_subprocess_output(logging.ERROR, process.stdout) |
| raise TestingFailure('Failed to reset target device') |
| |
| log_subprocess_output(logging.DEBUG, process.stdout) |
| |
| _LOG.debug('Successfully reset device') |
| |
| |
| def read_serial(port, baud_rate, test_timeout) -> bytes: |
| """Reads lines from a serial port until a line read times out. |
| |
| Returns bytes object containing the read serial data. |
| """ |
| |
| serial_data = bytearray() |
| device = serial.Serial( |
| baudrate=baud_rate, port=port, timeout=_FLASH_TIMEOUT |
| ) |
| if not device.is_open: |
| raise TestingFailure('Failed to open device') |
| |
| # Flush input buffer and reset the device to begin the test. |
| device.reset_input_buffer() |
| |
| # Block and wait for the first byte. |
| serial_data += device.read() |
| if not serial_data: |
| raise TestingFailure('Device not producing output') |
| |
| device.timeout = test_timeout |
| |
| # Read with a reasonable timeout until we stop getting characters. |
| while True: |
| bytes_read = device.readline() |
| if not bytes_read: |
| break |
| serial_data += bytes_read |
| if serial_data.rfind(_TESTS_DONE_STRING) != -1: |
| # Set to much more aggressive timeout since the last one or two |
| # lines should print out immediately. (one line if all fails or all |
| # passes, two lines if mixed.) |
| device.timeout = 0.01 |
| |
| # Remove carriage returns. |
| serial_data = serial_data.replace(b'\r', b'') |
| |
| # Try to trim captured results to only contain most recent test run. |
| test_start_index = serial_data.rfind(_TESTS_STARTING_STRING) |
| return ( |
| serial_data |
| if test_start_index == -1 |
| else serial_data[test_start_index:] |
| ) |
| |
| |
| def flash_device(binary, openocd_config, stlink_serial): |
| """Flash binary to a connected device using the provided configuration.""" |
| |
| # Name/path of openocd. |
| default_flasher = 'openocd' |
| flash_tool = os.getenv('OPENOCD_PATH', default_flasher) |
| |
| openocd_command = ' '.join(['program', binary, 'reset', 'exit']) |
| cmd = [ |
| flash_tool, |
| '-s', |
| _OPENOCD_SCRIPTS_DIR, |
| '-f', |
| openocd_config, |
| '-c', |
| openocd_command, |
| ] |
| _LOG.info('Flashing firmware to device') |
| |
| env = os.environ.copy() |
| if stlink_serial: |
| env['PW_STLINK_SERIAL'] = stlink_serial |
| |
| # Disable GDB port to support multi-device testing. |
| env['PW_GDB_PORT'] = 'disabled' |
| process = subprocess.run( |
| cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env |
| ) |
| if process.returncode: |
| log_subprocess_output(logging.ERROR, process.stdout) |
| raise TestingFailure('Failed to flash target device') |
| |
| log_subprocess_output(logging.DEBUG, process.stdout) |
| |
| _LOG.debug('Successfully flashed firmware to device') |
| |
| |
| def handle_test_results(test_output): |
| """Parses test output to determine whether tests passed or failed.""" |
| |
| if test_output.find(_TESTS_STARTING_STRING) == -1: |
| raise TestingFailure('Failed to find test start') |
| |
| if test_output.rfind(_TESTS_DONE_STRING) == -1: |
| log_subprocess_output(logging.INFO, test_output) |
| raise TestingFailure('Tests did not complete') |
| |
| if test_output.rfind(_TEST_FAILURE_STRING) != -1: |
| log_subprocess_output(logging.INFO, test_output) |
| raise TestingFailure('Test suite had one or more failures') |
| |
| log_subprocess_output(logging.DEBUG, test_output) |
| |
| _LOG.info('Test passed!') |
| |
| |
| def _threaded_test_reader(dest, port, baud_rate, test_timeout): |
| """Parses test output to the mutable "dest" passed to this function.""" |
| dest.append(read_serial(port, baud_rate, test_timeout)) |
| |
| |
| def run_device_test( |
| binary, test_timeout, openocd_config, baud, stlink_serial=None, port=None |
| ) -> bool: |
| """Flashes, runs, and checks an on-device test binary. |
| |
| Returns true on test pass. |
| """ |
| |
| if stlink_serial is None and port is None: |
| _LOG.debug('Attempting to automatically detect dev board') |
| boards = stm32f769i_detector.detect_boards() |
| if not boards: |
| error = 'Could not find an attached device' |
| _LOG.error(error) |
| raise DeviceNotFound(error) |
| stlink_serial = boards[0].serial_number |
| port = boards[0].dev_name |
| |
| _LOG.debug('Launching test binary %s', binary) |
| try: |
| # Begin capturing test output via another thread BEFORE flashing the |
| # device since the test will automatically run after the image is |
| # flashed. This reduces flake since there isn't a need to time a reset |
| # correctly relative to the start of capturing device output. |
| result: List[bytes] = [] |
| threaded_reader_args = (result, port, baud, test_timeout) |
| read_thread = threading.Thread( |
| target=_threaded_test_reader, args=threaded_reader_args |
| ) |
| read_thread.start() |
| _LOG.info('Running test') |
| flash_device(binary, openocd_config, stlink_serial) |
| read_thread.join() |
| if result: |
| handle_test_results(result[0]) |
| except TestingFailure as err: |
| _LOG.error(err) |
| return False |
| |
| return True |
| |
| |
| def main(): |
| """Set up runner, and then flash/run device test.""" |
| args = parse_args() |
| |
| # Try to use pw_cli logs, else default to something reasonable. |
| try: |
| import pw_cli.log # pylint: disable=import-outside-toplevel |
| |
| log_level = logging.DEBUG if args.verbose else logging.INFO |
| pw_cli.log.install(level=log_level) |
| except ImportError: |
| coloredlogs.install( |
| level='DEBUG' if args.verbose else 'INFO', |
| level_styles={'debug': {'color': 244}, 'error': {'color': 'red'}}, |
| fmt='%(asctime)s %(levelname)s | %(message)s', |
| ) |
| |
| if run_device_test( |
| args.binary, |
| args.test_timeout, |
| args.openocd_config, |
| args.baud, |
| args.stlink_serial, |
| args.port, |
| ): |
| sys.exit(0) |
| else: |
| sys.exit(1) |
| |
| |
| if __name__ == '__main__': |
| main() |