blob: df403092079f40e9433109d1fced240f780af4b2 [file]
#!/usr/bin/env python3
# Licensed under the Apache-2.0 license
# SPDX-License-Identifier: Apache-2.0
"""
AST1060 UART Test Execution Script
Provides GPIO control for SRST/FWSPICK pins and automated firmware upload
with test execution monitoring via pyserial.
This script is designed to be invoked by Bazel test rules or used standalone.
"""
import argparse
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional, Tuple
def _find_pw_tokenizer() -> bool:
"""Attempt to locate and add pw_tokenizer to sys.path.
Checks PW_TOK_ROOT first, then falls back to the Bazel output base.
Returns True if pw_tokenizer was found and added to sys.path.
"""
pw_tok_root = os.environ.get("PW_TOK_ROOT")
if pw_tok_root:
sys.path.insert(0, os.path.join(pw_tok_root, "pw_tokenizer", "py"))
return True
try:
output_base = subprocess.check_output(
["bazel", "info", "output_base"], text=True, stderr=subprocess.DEVNULL
).strip()
candidate = os.path.join(
output_base, "external", "pigweed+", "pw_tokenizer", "py"
)
if os.path.isdir(candidate):
sys.path.insert(0, candidate)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
pass
return False
_PW_TOKENIZER_AVAILABLE = _find_pw_tokenizer()
if _PW_TOKENIZER_AVAILABLE:
from pw_tokenizer import Detokenizer
try:
import serial
except ImportError:
print("Error: pyserial not installed. Install with: pip install pyserial")
sys.exit(1)
class UartTestExecutor:
"""Handles AST1060 UART test execution with GPIO control."""
# Default success/failure patterns for test monitoring
SUCCESS_PATTERNS = ["COMPLETE", "TEST PASSED", "All tests passed"]
FAILURE_PATTERNS = ["panic", "FAIL", "ERROR", "abort"]
def __init__(self, args):
self.args = args
self.serial_port: Optional[serial.Serial] = None
self.log_file = (
getattr(args, "log_file", None) or f"uart-test-{os.getpid()}.log"
)
self.log_file_handle = None
elf = getattr(args, "elf", None)
self.detokenizer = (
Detokenizer(elf) if (elf and _PW_TOKENIZER_AVAILABLE) else None
)
def log(self, message: str):
"""Print message unless in quiet mode."""
if not self.args.quiet:
print(message, flush=True)
def print_uart_data(self, data: str):
"""Print UART data, with detokenized output on the following line in green."""
print(data, end="", flush=True)
if self.detokenizer:
detokenized = self.detokenizer.detokenize_text(data)
if detokenized != data:
print(f"\033[32m{detokenized}\033[0m", end="", flush=True)
def run_command(self, cmd: list, check: bool = True) -> Tuple[int, str, str]:
"""Run command and return (returncode, stdout, stderr)."""
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=False)
if check and result.returncode != 0:
raise subprocess.CalledProcessError(
result.returncode, cmd, result.stdout, result.stderr
)
return result.returncode, result.stdout, result.stderr
except FileNotFoundError:
raise RuntimeError(f"Command not found: {cmd[0]}")
def gpio_set(self, pin: int, state: str):
"""Set GPIO pin state using pinctrl."""
cmd = ["pinctrl", "set", str(pin), "op", state]
self.log(f"GPIO {pin}: {state}")
if self.args.dry_run:
self.log(f"DRY RUN: {' '.join(cmd)}")
return
try:
self.run_command(cmd)
except Exception as e:
raise RuntimeError(f"Failed to set GPIO {pin} to {state}: {e}")
def toggle_srst(self, state: str):
"""Toggle SRST pin (dl=low, dh=high)."""
self.gpio_set(self.args.srst_pin, state)
def toggle_fwspick(self, state: str):
"""Toggle FWSPICK pin (dh=high, dl=low)."""
if state == "dh":
cmd_state = "pn dh"
else:
cmd_state = state
cmd = ["pinctrl", "set", str(self.args.fwspick_pin), "op"] + cmd_state.split()
self.log(f"GPIO {self.args.fwspick_pin} (FWSPICK): {state}")
if self.args.dry_run:
self.log(f"DRY RUN: {' '.join(cmd)}")
return
try:
self.run_command(cmd)
except Exception as e:
raise RuntimeError(
f"Failed to set FWSPICK {self.args.fwspick_pin} to {state}: {e}"
)
def sequence_to_fwspick_mode(self):
"""Execute sequence to enter FWSPICK mode."""
self.log("Entering FWSPICK mode sequence...")
self.toggle_srst("dl") # SRST low
time.sleep(0.1)
self.toggle_fwspick("dh") # FWSPICK high
time.sleep(1)
self.toggle_srst("dh") # SRST high
time.sleep(1)
self.log("FWSPICK mode sequence complete")
def sequence_to_normal_mode(self):
"""Execute sequence to enter normal boot mode."""
self.log("Entering normal boot mode sequence...")
self.toggle_fwspick("dl") # FWSPICK low
time.sleep(0.1)
self.toggle_srst("dl") # SRST low
time.sleep(0.5)
self.toggle_srst("dh") # SRST high
time.sleep(2)
self.log("Normal boot mode sequence complete")
def open_serial(self) -> bool:
"""Open serial port connection."""
if self.args.skip_uart:
self.log("Skipping UART setup")
return True
if self.args.dry_run:
self.log("DRY RUN: Would open serial port")
return True
try:
self.serial_port = serial.Serial(
port=self.args.uart_device,
baudrate=self.args.baudrate,
timeout=1.0,
write_timeout=1.0,
)
# Open log file
self.log_file_handle = open(self.log_file, "w")
self.log(
f"Serial port opened: {self.args.uart_device} @ {self.args.baudrate} baud"
)
self.log(f"Logging to: {self.log_file}")
return True
except Exception as e:
raise RuntimeError(f"Failed to open serial port: {e}")
def close_serial(self):
"""Close serial port connection."""
if self.serial_port:
self.serial_port.close()
self.serial_port = None
if self.log_file_handle:
self.log_file_handle.close()
self.log_file_handle = None
def read_serial_data(self, timeout_seconds: float = 1.0) -> str:
"""Read available data from serial port."""
if not self.serial_port or self.args.skip_uart:
return ""
if self.args.dry_run:
return ""
try:
self.serial_port.timeout = timeout_seconds
data = self.serial_port.read(1024)
if data:
decoded = data.decode("utf-8", errors="ignore")
# Log to file
if self.log_file_handle:
self.log_file_handle.write(decoded)
self.log_file_handle.flush()
return decoded
return ""
except Exception as e:
self.log(f"Serial read error: {e}")
return ""
def write_serial_data(self, data: bytes) -> bool:
"""Write data to serial port."""
if not self.serial_port or self.args.skip_uart:
return True
if self.args.dry_run:
self.log(f"DRY RUN: Would write {len(data)} bytes to serial")
return True
try:
bytes_written = self.serial_port.write(data)
self.serial_port.flush()
return bytes_written == len(data)
except Exception as e:
self.log(f"Serial write error: {e}")
return False
def wait_for_uart_ready(self, timeout: int = 30) -> bool:
"""Wait for 'U' character indicating UART bootloader ready."""
if self.args.skip_uart:
self.log("Skipping UART ready check")
return True
self.log(f"Waiting for UART ready signal ('U') with {timeout}s timeout...")
if self.args.dry_run:
self.log("DRY RUN: Would wait for UART ready")
return True
start_time = time.time()
buffer = ""
while time.time() - start_time < timeout:
data = self.read_serial_data(0.1)
if data:
buffer += data
if not self.args.quiet:
self.print_uart_data(data)
# Look for 'U' character
if "U" in buffer:
self.log("\nUART bootloader ready detected!")
return True
self.log("\nTimeout waiting for UART ready signal")
return False
def upload_firmware(self) -> bool:
"""Upload firmware via serial port."""
if self.args.skip_uart or not self.args.firmware:
self.log("Skipping firmware upload")
return True
firmware_path = Path(self.args.firmware)
if not firmware_path.exists():
raise RuntimeError(f"Firmware file not found: {firmware_path}")
self.log(f"Uploading firmware: {firmware_path}")
if self.args.dry_run:
self.log("DRY RUN: Would upload firmware")
return True
try:
with open(firmware_path, "rb") as f:
firmware_data = f.read()
self.log(f"Uploading {len(firmware_data)} bytes...")
# Upload firmware in chunks
chunk_size = 1024
bytes_sent = 0
for i in range(0, len(firmware_data), chunk_size):
chunk = firmware_data[i : i + chunk_size]
if not self.write_serial_data(chunk):
self.log("Failed to write firmware chunk")
return False
bytes_sent += len(chunk)
time.sleep(0.01)
# Progress indicator
if not self.args.quiet and bytes_sent % (chunk_size * 10) == 0:
progress = (bytes_sent * 100) // len(firmware_data)
print(f"\rProgress: {progress}%", end="", flush=True)
if not self.args.quiet:
print()
self.log("Firmware upload completed")
return True
except Exception as e:
self.log(f"Failed to upload firmware: {e}")
return False
def monitor_test_execution(self, timeout: int = 600) -> bool:
"""Monitor test execution via serial port."""
if self.args.skip_uart:
self.log("Skipping test monitoring")
return True
actual_timeout = getattr(self.args, "test_timeout", timeout)
self.log(f"Monitoring test execution with {actual_timeout}s timeout...")
if self.args.dry_run:
self.log("DRY RUN: Would monitor test execution")
return True
start_time = time.time()
buffer = ""
test_results = {"passed": 0, "failed": 0, "skipped": 0}
while time.time() - start_time < actual_timeout:
data = self.read_serial_data(0.5)
if data:
buffer += data
if not self.args.quiet:
self.print_uart_data(data)
lines = buffer.split("\n")
for line in lines:
if "PASS" in line:
test_results["passed"] += 1
elif "FAIL" in line:
test_results["failed"] += 1
elif "SKIP" in line:
test_results["skipped"] += 1
# Check for completion
for pattern in self.SUCCESS_PATTERNS:
if pattern in line:
self.log(f"\nTest execution completed!")
self.log(f"Results: {test_results}")
return test_results["failed"] == 0
# Check for failure
for pattern in self.FAILURE_PATTERNS:
if pattern.lower() in line.lower():
self.log(f"\nFailure detected: {pattern}")
return False
buffer = "\n".join(lines[-10:])
self.log(f"\nTest monitoring timeout. Results so far: {test_results}")
return test_results["failed"] == 0
def cleanup(self):
"""Clean up resources."""
self.close_serial()
def run_full_test_sequence(self) -> bool:
"""Execute the complete test sequence."""
try:
if not self.open_serial():
return False
self.sequence_to_fwspick_mode()
if not self.wait_for_uart_ready():
return False
if not self.upload_firmware():
return False
return self.monitor_test_execution()
finally:
self.cleanup()
def main():
parser = argparse.ArgumentParser(
description="AST1060 UART Test Execution Script",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Full test sequence
./uart_test_exec.py /dev/ttyUSB0 firmware.bin
# Manual GPIO control
./uart_test_exec.py --manual-srst low
./uart_test_exec.py --manual-fwspick high
# Sequence control
./uart_test_exec.py --sequence fwspick-mode
./uart_test_exec.py --sequence normal-mode
# Upload-only (no GPIO, no monitoring)
./uart_test_exec.py --upload-only /dev/ttyUSB0 firmware.bin
# Bazel test mode (exit code indicates pass/fail)
./uart_test_exec.py --bazel-test /dev/ttyUSB0 firmware.bin
""",
)
# Positional arguments
parser.add_argument(
"uart_device", nargs="?", help="UART device path (e.g., /dev/ttyUSB0)"
)
parser.add_argument("firmware", nargs="?", help="Firmware binary file path")
parser.add_argument("--elf", help="ELF file for pw_tokenizer detokenization")
# GPIO control
parser.add_argument(
"--srst-pin", type=int, default=23, help="SRST GPIO pin number (default: 23)"
)
parser.add_argument(
"--fwspick-pin",
type=int,
default=18,
help="FWSPICK GPIO pin number (default: 18)",
)
# Manual GPIO operations
parser.add_argument(
"--manual-srst",
choices=["low", "high", "dl", "dh"],
help="Manually toggle SRST pin",
)
parser.add_argument(
"--manual-fwspick",
choices=["low", "high", "dl", "dh"],
help="Manually toggle FWSPICK pin",
)
# Sequence operations
parser.add_argument(
"--sequence",
choices=["fwspick-mode", "normal-mode"],
help="Run GPIO sequence",
)
# UART settings
parser.add_argument(
"-b",
"--baudrate",
type=int,
default=115200,
help="UART baud rate (default: 115200)",
)
parser.add_argument(
"--test-timeout",
type=int,
default=600,
help="Test execution monitoring timeout in seconds (default: 600)",
)
parser.add_argument(
"--log-file", help="Log file path (auto-generated if not specified)"
)
# Control flags
parser.add_argument(
"--skip-uart", action="store_true", help="Skip all UART operations"
)
parser.add_argument(
"-q", "--quiet", action="store_true", help="Run silently (no output)"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be done without executing",
)
parser.add_argument(
"--upload-only",
action="store_true",
help="Skip GPIO commands, upload firmware only",
)
parser.add_argument(
"--bazel-test",
action="store_true",
help="Run in Bazel test mode (structured output)",
)
parser.add_argument(
"--skip-gpio",
action="store_true",
help="Skip GPIO operations but still monitor tests",
)
args = parser.parse_args()
# Validate pw_tokenizer / --elf argument consistency
if os.environ.get("PW_TOK_ROOT") and not args.elf:
print(
"Error: PW_TOK_ROOT is set but --elf was not provided. "
"Detokenization requires an ELF file."
)
sys.exit(1)
if args.elf and not _PW_TOKENIZER_AVAILABLE:
print(
"Error: --elf was provided but pw_tokenizer could not be located. "
"Set PW_TOK_ROOT to the Pigweed root or ensure Bazel has fetched it."
)
sys.exit(1)
# Validate arguments
if args.upload_only:
if not args.uart_device or not args.firmware:
parser.error("--upload-only requires UART device and firmware file")
elif not any(
[args.manual_srst, args.manual_fwspick, args.sequence, args.uart_device]
):
parser.error("Must specify manual GPIO control, sequence, or UART device")
if (
args.uart_device
and not args.skip_uart
and not args.dry_run
and not Path(args.uart_device).exists()
):
parser.error(f"UART device not found: {args.uart_device}")
executor = UartTestExecutor(args)
try:
# Handle manual GPIO operations
if args.manual_srst:
state = "dl" if args.manual_srst in ["low", "dl"] else "dh"
executor.toggle_srst(state)
return 0
if args.manual_fwspick:
state = "dh" if args.manual_fwspick in ["high", "dh"] else "dl"
executor.toggle_fwspick(state)
return 0
# Upload-only mode
if args.upload_only:
if not executor.open_serial():
return 1
ok = executor.upload_firmware()
executor.cleanup()
return 0 if ok else 1
# Handle sequence operations
if args.sequence == "fwspick-mode":
executor.sequence_to_fwspick_mode()
return 0
elif args.sequence == "normal-mode":
executor.sequence_to_normal_mode()
return 0
# Run full test sequence (or skip GPIO if requested)
if args.skip_gpio:
# Open serial, upload, and monitor without GPIO
try:
if not executor.open_serial():
return 1
if not executor.upload_firmware():
return 1
if executor.monitor_test_execution():
executor.log("Test execution completed successfully!")
return 0
else:
executor.log("Test execution failed!")
return 1
finally:
executor.cleanup()
else:
if executor.run_full_test_sequence():
executor.log("Test execution completed successfully!")
return 0
else:
executor.log("Test execution failed!")
return 1
except KeyboardInterrupt:
executor.log("\nInterrupted by user")
return 130
except Exception as e:
executor.log(f"Error: {e}")
return 1
finally:
executor.cleanup()
if __name__ == "__main__":
sys.exit(main())