| # Copyright 2023 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Write a binary over serial to provision the Gonk FPGA.""" |
| |
| import argparse |
| from itertools import islice |
| import operator |
| from pathlib import Path |
| import sys |
| import time |
| from typing import Optional |
| |
| from serial import Serial |
| from serial.tools.list_ports import comports |
| from serial.tools.miniterm import Miniterm |
| |
| |
| def _parse_args(): |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument( |
| '-p', |
| '--port', |
| type=str, |
| help='Serial port path.', |
| ) |
| parser.add_argument( |
| '-b', |
| '--baudrate', |
| type=int, |
| default=1000000, |
| help='Sets the baudrate for serial communication.', |
| ) |
| parser.add_argument( |
| '--product', |
| default='GENERIC_F730R8TX', |
| help='Use first serial port matching this product name.', |
| ) |
| parser.add_argument( |
| '--serial-number', |
| help='Use the first serial port matching this number.', |
| ) |
| parser.add_argument( |
| 'bitstream_file', |
| type=Path, |
| help='FPGA Bitstream file.', |
| ) |
| return parser.parse_args() |
| |
| |
| def get_serial_port( |
| product: Optional[str] = None, |
| serial_number: Optional[str] = None, |
| ) -> str: |
| |
| ports = sorted(comports(), key=operator.attrgetter('device')) |
| |
| # Print matching devices |
| for port in ports: |
| if (product is not None and port.product is not None |
| and product in port.product): |
| return port.device |
| |
| if (serial_number is not None and port.serial_number is not None |
| and serial_number in port.serial_number): |
| return port.device |
| |
| return '' |
| |
| |
| FILE_LENGTH = 135100 |
| FILE_START_STR = 'FF 00 00 FF' |
| SYNC_START_STR = '7E AA 99 7E' |
| FILE_START_BYTES = bytes.fromhex(FILE_START_STR) |
| SYNC_START_BYTES = bytes.fromhex(SYNC_START_STR) |
| |
| |
| class IncorrectBinaryFormat(Exception): |
| """Exception raised when FPGA bitstream file is in an unexpected format.""" |
| |
| |
| def batched(iterable, n): |
| """Batch data into tuples of length n. The last batch may be shorter. |
| |
| Example usage: |
| |
| .. code-block:: pycon |
| |
| >>> list(batched('ABCDEFG', 3)) |
| ['ABC', 'DEF', 'G'] |
| """ |
| if n < 1: |
| raise ValueError('n must be at least one') |
| it = iter(iterable) |
| while batch := tuple(islice(it, n)): |
| yield batch |
| |
| |
| def main( |
| baudrate: int, |
| bitstream_file: Path, |
| port: Optional[str] = None, |
| product: Optional[str] = None, |
| serial_number: Optional[str] = None, |
| ) -> int: |
| """Write a bitstream file over serial while monitoring output.""" |
| |
| # Check for valid bitstream file. |
| if not bitstream_file.is_file(): |
| raise FileNotFoundError(f'\nUnable to load "{bitstream_file}"') |
| |
| bitstream_bytes = bitstream_file.read_bytes() |
| if (len(bitstream_bytes) != 135100 |
| or bitstream_bytes[0:8] != FILE_START_BYTES + SYNC_START_BYTES): |
| raise IncorrectBinaryFormat( |
| f'the bitstream file must be:\n' |
| f' {FILE_LENGTH} bytes in length\n' |
| f' Start with "{FILE_START_STR} {SYNC_START_STR}"') |
| |
| # Init serial port. |
| if port is None: |
| port = get_serial_port(product=product, serial_number=serial_number) |
| |
| serial_instance = Serial(port=port, baudrate=baudrate, timeout=320) |
| |
| # Use pyserial miniterm to monitor recieved data. |
| miniterm = Miniterm( |
| serial_instance, |
| echo=False, |
| eol='crlf', |
| filters=(), |
| ) |
| # Use Ctrl-C as the exit character. (Miniterm default is Ctrl-]) |
| miniterm.exit_character = chr(0x03) |
| miniterm.set_rx_encoding('utf-8') |
| miniterm.set_tx_encoding('utf-8') |
| |
| # Start monitoring serial data. |
| miniterm.start() |
| |
| # Wait a couple seconds to print early log messages from Gonk. |
| time.sleep(2) |
| |
| # Write out the bitstream in batches. |
| print('Sending bitstream...') |
| written_bytes: int = 0 |
| for byte_batch in batched(bitstream_bytes, 8): |
| result = serial_instance.write(byte_batch) |
| if result: |
| written_bytes += result |
| serial_instance.flush() |
| print(f'Done sending bitstream. Wrote {written_bytes}') |
| |
| # Wait for ctrl-c, then shutdown miniterm. |
| try: |
| miniterm.join(True) |
| except KeyboardInterrupt: |
| pass |
| sys.stderr.write('\n--- exit ---\n') |
| miniterm.join() |
| miniterm.close() |
| |
| return 0 |
| |
| |
| if __name__ == '__main__': |
| sys.exit(main(**vars(_parse_args()))) |