| # Copyright 2023 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Write a binary over serial to provision the Gonk FPGA.""" |
| |
| import argparse |
| from datetime import datetime |
| from itertools import islice |
| import operator |
| from pathlib import Path |
| import sys |
| import time |
| from typing import Optional |
| |
| import serial |
| from serial import Serial |
| from serial.tools.list_ports import comports |
| from serial.tools.miniterm import Miniterm, Transform |
| |
| import pw_cli.color |
| from gonk_adc.adc_measurement_pb2 import FramedProto |
| |
| _COLOR = pw_cli.color.colors() |
| |
| |
def _parse_args():
    """Parse the command line for the FPGA provisioning tool.

    Returns:
        The ``argparse.Namespace`` built from ``sys.argv``. Its attributes are
        ``port``, ``baudrate``, ``product``, ``serial_number``,
        ``bitstream_file`` and ``skip_config``.
    """
    # (flags, add_argument options) pairs. The order is significant: it is
    # the order in which the options appear in the generated --help text.
    argument_table = (
        (
            ('-p', '--port'),
            dict(type=str, help='Serial port path.'),
        ),
        (
            ('-b', '--baudrate'),
            dict(
                type=int,
                default=2000000,
                help='Sets the baudrate for serial communication.',
            ),
        ),
        (
            ('--product',),
            dict(
                default='GENERIC_F730R8TX',
                help='Use first serial port matching this product name.',
            ),
        ),
        (
            ('--serial-number',),
            dict(help='Use the first serial port matching this number.'),
        ),
        (
            ('bitstream_file',),
            dict(type=Path, help='FPGA Bitstream file.'),
        ),
        (
            ('--skip-config',),
            dict(action='store_true'),
        ),
    )

    cli = argparse.ArgumentParser(description=__doc__)
    for flags, options in argument_table:
        cli.add_argument(*flags, **options)
    return cli.parse_args()
| |
| |
def get_serial_port(
    product: Optional[str] = None,
    serial_number: Optional[str] = None,
) -> str:
    """Return the device path of the best matching serial port.

    Ports are considered in order of their device path. A serial number is
    the more specific selector, so when ``serial_number`` is given a port
    matching it wins over ports that only match ``product``. This matters
    when several boards with the same product name are attached. If no port
    matches the serial number, the first port matching ``product`` is used.

    Args:
        product: Substring to look for in each port's product name, or None
            to skip product matching.
        serial_number: Substring to look for in each port's serial number,
            or None to skip serial number matching.

    Returns:
        The ``device`` of the selected port, or ``''`` when nothing matches.
    """
    ports = sorted(comports(), key=operator.attrgetter('device'))

    def _first_device(wanted: Optional[str], attribute: str) -> str:
        """Return the first port device whose attribute contains wanted."""
        if wanted is None:
            return ''
        for port in ports:
            actual = getattr(port, attribute)
            # Ports may report None for product / serial_number.
            if actual is not None and wanted in actual:
                return port.device
        return ''

    return (_first_device(serial_number, 'serial_number')
            or _first_device(product, 'product'))
| |
| |
# Exact size in bytes a bitstream file must have to be accepted.
FILE_LENGTH = 135100

# The two 4-byte markers a bitstream file must start with, back to back.
# The hex text forms are kept because they are shown in error messages.
FILE_START_STR = 'FF 00 00 FF'
SYNC_START_STR = '7E AA 99 7E'
FILE_START_BYTES, SYNC_START_BYTES = (
    bytes.fromhex(marker) for marker in (FILE_START_STR, SYNC_START_STR)
)

# Marker that separates binary log packets in the received serial stream.
BIN_LOG_SYNC_START_STR = '0d 99 bb aa 7e'
BIN_LOG_SYNC_START_BYTES = bytes.fromhex(BIN_LOG_SYNC_START_STR)
| |
| |
class IncorrectBinaryFormat(Exception):
    """Raised for a bitstream file that is not in the expected format."""
| |
| |
def batched(iterable, n):
    """Yield successive tuples of up to ``n`` items taken from ``iterable``.

    Every batch holds exactly ``n`` items except possibly the last one, which
    holds whatever is left over. An empty iterable yields nothing.

    Args:
        iterable: Any iterable; it is consumed lazily, one batch at a time.
        n: Maximum number of items per batch. Must be at least one.

    Yields:
        Tuples of consecutive items. Iterating ``bytes`` produces tuples of
        ints.

    Raises:
        ValueError: If ``n`` is less than one. Because this is a generator,
            the error surfaces when iteration starts.

    Example usage:

    .. code-block:: pycon

       >>> list(batched('ABCDEFG', 3))
       [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]
       >>> list(batched(bytes([1, 2, 3]), 2))
       [(1, 2), (3,)]
    """
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    # islice returns an empty (falsy) tuple once the iterator is exhausted.
    while batch := tuple(islice(it, n)):
        yield batch
| |
| |
class HandleBinaryData(Transform):
    """Miniterm transform that decodes framed binary packets into text.

    Received text is passed through untouched until the binary log sync
    marker first appears in the byte stream. From then on the raw bytes are
    buffered, cut into packets at each sync marker, parsed as ``FramedProto``
    messages and rendered as one text line per packet.
    """

    def __init__(self) -> None:
        # MinitermBinary.reader() checks for this attribute to decide whether
        # to pass the raw bytes to rx() in addition to the decoded text.
        self.include_binary = True
        # Received bytes that have not been turned into a packet yet.
        self.data = bytes()
        self.timestamp_prefix = ' timestamp: '
        self.vbus_prefix = ' vbus:'
        self.vshunt_prefix = ' vshunt:'
        self.start_time = time.time()
        self.time_format = '%Y%m%d %H:%M:%S.%f'
        # Becomes True once the first sync marker is seen and stays True.
        self.binary_format_started = False

    def rx(self, text, data=None):
        """Turn one read from the serial port into console text.

        Args:
            text: The decoded text of this read.
            data: The raw bytes of the same read, if available.

        Returns:
            ``text`` unchanged when ``data`` is empty or no sync marker has
            been seen yet. Afterwards, one formatted line for every packet
            that is now complete, or ``''`` if none is.
        """
        if not data:
            return text

        # Concat data with any existing captures
        self.data += data

        if not self.binary_format_started:
            if BIN_LOG_SYNC_START_BYTES not in self.data:
                # Still plain text. Keep only the tail that could be the
                # start of a sync marker split across two reads, so the
                # buffer does not grow for as long as text keeps arriving.
                tail_length = len(BIN_LOG_SYNC_START_BYTES) - 1
                self.data = self.data[-tail_length:]
                return text
            self.binary_format_started = True

        # Emit every complete packet, not just the first one; otherwise a
        # read containing several packets leaves a backlog in self.data.
        lines = []
        while True:
            sections = self.data.split(BIN_LOG_SYNC_START_BYTES, maxsplit=2)
            if len(sections) < 3:
                # A packet is only complete once the next marker arrives.
                break

            # Section 0 holds bytes before the first marker and is dropped.
            # Section 1 is a complete proto packet.
            proto_bytes = BIN_LOG_SYNC_START_BYTES + sections[1]
            # Keep the remaining bytes, starting at the second marker.
            self.data = BIN_LOG_SYNC_START_BYTES + sections[2]

            lines.append(self._format_packet(proto_bytes))

        # Empty when nothing completed: filters this read out of the console.
        return ''.join(lines)

    def _format_packet(self, proto_bytes):
        """Parse one framed packet and render it as a single text line."""
        try:
            framed_proto = FramedProto()
            framed_proto.ParseFromString(proto_bytes)
            measurements = framed_proto.payload.adc_measurements
            vbus_values = [m.vbus_value for m in measurements]
            vshunt_values = [m.vshunt_value for m in measurements]

            # TODO(tonymd): Use Python logging and separate output file
            output = [
                (datetime.now().strftime(self.time_format) + '.' +
                 str(len(proto_bytes)) + '.'),
                self.timestamp_prefix,
                str(framed_proto.payload.timestamp),
                self.vbus_prefix,
                ','.join(str(value) for value in vbus_values),
                self.vshunt_prefix,
                ','.join(str(value) for value in vshunt_values),
            ]
            return ' '.join(output) + '\n'
        except FramedProto.DecodeError:
            # TODO(tonymd): Handle failed packets.
            return 'FramedProto.DecodeError\n'

    def tx(self, text):
        """text to be sent to serial port"""
        return text

    def echo(self, text):
        """text to be sent but displayed on console"""
        return text
| |
| |
class MinitermBinary(Miniterm):
    """Miniterm whose reader can give receive transforms the raw bytes."""

    def reader(self):
        """Copy data from the serial port to the console until stopped.

        In raw mode the bytes are written to the console as-is. Otherwise
        they are decoded and run through each receive transform in turn. A
        transform that has an ``include_binary`` attribute is also handed
        the raw bytes of the read.

        Raises:
            serial.SerialException: Re-raised after marking the terminal as
                no longer alive and cancelling the console.
        """
        try:
            while self.alive and self._reader_alive:
                # Take everything already waiting, else block for one byte.
                chunk = self.serial.read(self.serial.in_waiting or 1)
                if not chunk:
                    continue

                if self.raw:
                    self.console.write_bytes(chunk)
                    continue

                decoded = self.rx_decoder.decode(chunk)
                for transform in self.rx_transformations:
                    if hasattr(transform, 'include_binary'):
                        decoded = transform.rx(decoded, chunk)
                    else:
                        decoded = transform.rx(decoded)
                self.console.write(decoded)
        except serial.SerialException:
            self.alive = False
            self.console.cancel()
            raise  # XXX handle instead of re-raise?
| |
| |
def main(
    baudrate: int,
    bitstream_file: Path,
    port: Optional[str] = None,
    product: Optional[str] = None,
    serial_number: Optional[str] = None,
    skip_config: bool = False,
) -> int:
    """Write a bitstream file over serial while monitoring output.

    Args:
        baudrate: Baud rate used to open the serial port.
        bitstream_file: Path of the FPGA bitstream to send.
        port: Serial port to use. When None, one is looked up with
            ``get_serial_port`` from ``product`` and ``serial_number``.
        product: Product name used for the port lookup.
        serial_number: Serial number used for the port lookup.
        skip_config: When True the bitstream is validated but not sent; the
            serial output is only monitored.

    Returns:
        0 once the terminal has been shut down.

    Raises:
        FileNotFoundError: ``bitstream_file`` is not an existing file.
        IncorrectBinaryFormat: The file has the wrong size or header.
        serial.SerialException: No serial port was given or found, or the
            port could not be opened.
    """

    # Check for valid bitstream file.
    if not bitstream_file.is_file():
        raise FileNotFoundError(f'\nUnable to load "{bitstream_file}"')

    bitstream_bytes = bitstream_file.read_bytes()
    expected_header = FILE_START_BYTES + SYNC_START_BYTES
    if (len(bitstream_bytes) != FILE_LENGTH
            or not bitstream_bytes.startswith(expected_header)):
        raise IncorrectBinaryFormat(
            f'the bitstream file must be:\n'
            f'  {FILE_LENGTH} bytes in length\n'
            f'  Start with "{FILE_START_STR} {SYNC_START_STR}"')

    # Init serial port.
    if port is None:
        port = get_serial_port(product=product, serial_number=serial_number)
    if not port:
        # get_serial_port() returns '' when nothing matches. Fail here with
        # a clear message instead of letting Serial() try to open ''.
        raise serial.SerialException(
            'No serial port found. Use --port, --product or '
            '--serial-number to select a device.')

    serial_instance = Serial(port=port, baudrate=baudrate, timeout=320)

    # Use pyserial miniterm to monitor received data.
    miniterm = MinitermBinary(
        serial_instance,
        echo=False,
        eol='crlf',
        filters=(),
    )
    # Use Ctrl-C as the exit character. (Miniterm default is Ctrl-])
    miniterm.exit_character = chr(0x03)
    miniterm.set_rx_encoding('utf-8', errors='backslashreplace')
    miniterm.set_tx_encoding('utf-8')

    # Start monitoring serial data.
    miniterm.start()

    if not skip_config:
        # Wait a couple seconds to print early log messages from Gonk.
        time.sleep(2)

        # Write out the bitstream in batches.
        print('Sending bitstream...')
        written_bytes: int = 0
        for byte_batch in batched(bitstream_bytes, 8):
            result = serial_instance.write(byte_batch)
            if result:
                written_bytes += result
        # NOTE(review): flush placement (once, after the loop) was
        # reconstructed from a copy of this file that lost its indentation;
        # confirm it should not run once per batch.
        serial_instance.flush()
        print(f'Done sending bitstream. Wrote {written_bytes}')

    # Only start decoding binary packets once the text phase above is over.
    miniterm.rx_transformations.append(HandleBinaryData())

    # Wait for ctrl-c, then shutdown miniterm.
    try:
        miniterm.join(True)
    except KeyboardInterrupt:
        pass
    sys.stderr.write('\n--- exit ---\n')
    miniterm.join()
    miniterm.close()

    return 0
| |
| |
if __name__ == '__main__':
    # The parsed option names double as main()'s keyword arguments, and
    # main()'s return value becomes the process exit status.
    raise SystemExit(main(**vars(_parse_args())))