# blob: 8e6f15c1b2f8a33e07c15c9f857aaeadcde18cbf [file] [log] [blame]
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Write a binary over serial to provision the Gonk FPGA."""
import argparse
from datetime import datetime
from itertools import islice
import operator
from pathlib import Path
import sys
import time
from typing import Optional
import serial
from serial import Serial
from serial.tools.list_ports import comports
from serial.tools.miniterm import Miniterm, Transform
import pw_cli.color
from gonk_adc.adc_measurement_pb2 import FramedProto
# Color helper object obtained from pw_cli. It is not referenced by any other
# code visible in this file -- presumably kept for colored console output;
# TODO confirm before removing.
_COLOR = pw_cli.color.colors()
def _parse_args():
    """Parse the command line arguments for this script.

    Returns:
        An argparse.Namespace whose attribute names match the keyword
        parameters of main(), so it can be expanded with ``**vars(args)``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        '-p',
        '--port',
        type=str,
        help='Serial port path.',
    )
    parser.add_argument(
        '-b',
        '--baudrate',
        type=int,
        default=2000000,
        help='Sets the baudrate for serial communication.',
    )
    parser.add_argument(
        '--product',
        default='GENERIC_F730R8TX',
        help='Use first serial port matching this product name.',
    )
    parser.add_argument(
        '--serial-number',
        help='Use the first serial port matching this number.',
    )
    parser.add_argument(
        'bitstream_file',
        type=Path,
        help='FPGA Bitstream file.',
    )
    parser.add_argument(
        '--skip-config',
        action='store_true',
        # Previously undocumented: main() skips the bitstream upload when
        # this flag is set and goes straight to monitoring.
        help='Skip sending the bitstream and only monitor serial output.',
    )
    return parser.parse_args()
def get_serial_port(
    product: Optional[str] = None,
    serial_number: Optional[str] = None,
) -> str:
    """Return the device path of the first serial port matching the filters.

    Ports are examined in order of their device path. A serial number match
    takes priority over a product match: the serial number identifies one
    specific board, whereas several attached boards can share a product name.
    If no port matches the serial number, the product name is used as a
    fallback.

    Args:
        product: Substring to look for in each port's product name, or None
            to skip product matching.
        serial_number: Substring to look for in each port's serial number, or
            None to skip serial number matching.

    Returns:
        The matching port's device path, or an empty string if nothing
        matched.
    """
    ports = sorted(comports(), key=operator.attrgetter('device'))

    # Most specific filter first, so an explicitly requested serial number is
    # not pre-empted by a different board that merely shares the product name.
    if serial_number is not None:
        for port in ports:
            if (port.serial_number is not None
                    and serial_number in port.serial_number):
                return port.device

    if product is not None:
        for port in ports:
            if port.product is not None and product in port.product:
                return port.device

    return ''
# Size in bytes that main() reports as required for a bitstream file.
FILE_LENGTH = 135100

# A bitstream file must begin with the file-start bytes immediately followed
# by the sync-start bytes; main() compares the first eight bytes of the file
# against FILE_START_BYTES + SYNC_START_BYTES.
FILE_START_STR = 'FF 00 00 FF'
FILE_START_BYTES = bytes.fromhex(FILE_START_STR)
SYNC_START_STR = '7E AA 99 7E'
SYNC_START_BYTES = bytes.fromhex(SYNC_START_STR)

# Byte sequence HandleBinaryData uses to find the start of each binary proto
# packet in the incoming serial stream.
BIN_LOG_SYNC_START_STR = '0d 99 bb aa 7e'
BIN_LOG_SYNC_START_BYTES = bytes.fromhex(BIN_LOG_SYNC_START_STR)
class IncorrectBinaryFormat(Exception):
    """Signals that the FPGA bitstream file failed format validation.

    Raised by main() when the file does not have the expected length or does
    not start with the expected leading bytes.
    """
def batched(iterable, n):
    """Yield successive tuples of up to n items taken from iterable.

    Every batch is a tuple, whatever the type of ``iterable``. All batches
    hold exactly ``n`` items except possibly the last, which holds whatever
    is left over. An empty iterable yields nothing.

    Because this is a generator, the ``n`` check runs when iteration starts,
    not when ``batched()`` is called.

    Example usage:

    .. code-block:: pycon

       >>> list(batched('ABCDEFG', 3))
       [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]

    Args:
        iterable: Any iterable to split into batches.
        n: Maximum number of items per batch; must be at least 1.

    Yields:
        Tuples of consecutive items from ``iterable``.

    Raises:
        ValueError: If ``n`` is less than one.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    while True:
        chunk = tuple(islice(source, n))
        if not chunk:
            # Source exhausted; an empty tuple is never yielded.
            return
        yield chunk
class HandleBinaryData(Transform):
    """Miniterm transform that renders framed binary protos as text lines.

    Raw bytes handed to rx() are buffered. Until BIN_LOG_SYNC_START_BYTES is
    first seen in the stream, the decoded text is passed through unchanged.
    From then on the decoded text is dropped, and every span of bytes that
    starts at one sync marker and ends just before the next is parsed as a
    FramedProto and rendered as one line of output.
    """

    def __init__(self) -> None:
        # MinitermBinary.reader() checks for this attribute with hasattr() to
        # decide whether to pass the raw bytes to rx() alongside the text.
        self.include_binary = True
        # Received bytes that have not yet been consumed as a packet.
        self.data = bytes()
        self.timestamp_prefix = ' timestamp: '
        self.vbus_prefix = ' vbus:'
        self.vshunt_prefix = ' vshunt:'
        # Wall-clock time at which this transform was created.
        self.start_time = time.time()
        self.time_format = '%Y%m%d %H:%M:%S.%f'
        # Becomes True, permanently, once the first sync marker is seen.
        self.binary_format_started = False

    def rx(self, text, data=None):
        """Handle text (and optionally raw bytes) received from serial.

        Args:
            text: The decoded text for this chunk of serial data.
            data: The raw bytes for the same chunk. When empty or None, the
                text is returned unchanged.

        Returns:
            ``text`` unchanged while the binary stream has not started;
            otherwise one formatted line per packet completed by this chunk
            (an empty string when no packet was completed).
        """
        if not data:
            return text

        # Concat data with any existing captures.
        self.data = self.data + data

        if not self.binary_format_started:
            if BIN_LOG_SYNC_START_BYTES not in self.data:
                # Plain text output. Everything before the first sync marker
                # is discarded once the binary stream starts anyway, so only
                # keep enough trailing bytes to recognise a marker that is
                # split across two reads. This stops the buffer from growing
                # without bound while the device only emits text.
                keep = len(BIN_LOG_SYNC_START_BYTES) - 1
                self.data = self.data[-keep:] if keep else bytes()
                return text
            self.binary_format_started = True

        # Emit every complete packet currently buffered. A single read can
        # deliver several packets; handling only one per call would let the
        # backlog (and the output delay) grow.
        lines = []
        while True:
            sections = self.data.split(BIN_LOG_SYNC_START_BYTES, maxsplit=2)
            if len(sections) < 3:
                # No complete packet left; wait for more data.
                break
            # Section 0 holds bytes preceding the first marker; drop them.
            # Section 1 is a complete proto packet.
            proto_bytes = BIN_LOG_SYNC_START_BYTES + sections[1]
            # Keep the remaining bytes, which start at the next marker.
            self.data = BIN_LOG_SYNC_START_BYTES + sections[2]
            lines.append(self._format_packet(proto_bytes))

        # Empty when nothing completed: filters the raw text out of the
        # console output.
        return ''.join(lines)

    def _format_packet(self, proto_bytes):
        """Parse one framed packet and return its single-line text form."""
        framed_proto = FramedProto()
        try:
            framed_proto.ParseFromString(proto_bytes)
        except FramedProto.DecodeError:
            # TODO(tonymd): Handle failed packets.
            return 'FramedProto.DecodeError\n'

        measurements = framed_proto.payload.adc_measurements
        vbus_values = [measure.vbus_value for measure in measurements]
        vshunt_values = [measure.vshunt_value for measure in measurements]

        # TODO(tonymd): Use Python logging and separate output file
        output = [
            (datetime.now().strftime(self.time_format) + '.' +
             str(len(proto_bytes)) + '.'),
            self.timestamp_prefix,
            str(framed_proto.payload.timestamp),
            self.vbus_prefix,
            ','.join(str(value) for value in vbus_values),
            self.vshunt_prefix,
            ','.join(str(value) for value in vshunt_values),
        ]
        return ' '.join(output) + '\n'

    def tx(self, text):
        """Return text to be sent to the serial port, unchanged."""
        return text

    def echo(self, text):
        """Return text to be echoed on the console, unchanged."""
        return text
class MinitermBinary(Miniterm):
    """Miniterm whose reader can hand raw bytes to rx transformations."""

    def reader(self):
        """Loop copying serial data to the console.

        Transformations that define an ``include_binary`` attribute receive
        the raw bytes as a second argument to ``rx()``; all others receive
        only the decoded text.
        """
        try:
            while self.alive and self._reader_alive:
                # Read all that is there or wait for one byte.
                incoming = self.serial.read(self.serial.in_waiting or 1)
                if not incoming:
                    continue

                if self.raw:
                    self.console.write_bytes(incoming)
                    continue

                rendered = self.rx_decoder.decode(incoming)
                for transform in self.rx_transformations:
                    if hasattr(transform, 'include_binary'):
                        rendered = transform.rx(rendered, incoming)
                    else:
                        rendered = transform.rx(rendered)
                self.console.write(rendered)
        except serial.SerialException:
            self.alive = False
            self.console.cancel()
            raise  # XXX handle instead of re-raise?
def _load_bitstream(bitstream_file: Path) -> bytes:
    """Read bitstream_file and verify its length and leading bytes."""
    if not bitstream_file.is_file():
        raise FileNotFoundError(f'\nUnable to load "{bitstream_file}"')

    bitstream_bytes = bitstream_file.read_bytes()
    expected_start = FILE_START_BYTES + SYNC_START_BYTES
    # Compare against FILE_LENGTH so the check always agrees with the size
    # quoted in the error message below.
    if (len(bitstream_bytes) != FILE_LENGTH
            or not bitstream_bytes.startswith(expected_start)):
        raise IncorrectBinaryFormat(
            f'the bitstream file must be:\n'
            f' {FILE_LENGTH} bytes in length\n'
            f' Start with "{FILE_START_STR} {SYNC_START_STR}"')
    return bitstream_bytes


def _send_bitstream(serial_instance, bitstream_bytes: bytes) -> int:
    """Write bitstream_bytes in 8-byte batches; return the bytes written."""
    written_bytes = 0
    for byte_batch in batched(bitstream_bytes, 8):
        # batched() yields tuples of ints; hand pyserial real bytes.
        result = serial_instance.write(bytes(byte_batch))
        if result:
            written_bytes += result
        # Flush after every batch so each one is pushed out before the next.
        serial_instance.flush()
    return written_bytes


def main(
    baudrate: int,
    bitstream_file: Path,
    port: Optional[str] = None,
    product: Optional[str] = None,
    serial_number: Optional[str] = None,
    skip_config: bool = False,
) -> int:
    """Write a bitstream file over serial while monitoring output.

    Args:
        baudrate: Baudrate used to open the serial port.
        bitstream_file: Path to the FPGA bitstream file. It is validated
            even when ``skip_config`` is set.
        port: Serial port path. When None, a port is looked up with
            get_serial_port() using ``product`` and ``serial_number``.
        product: Product name used to look up a port when ``port`` is None.
        serial_number: Serial number used to look up a port when ``port`` is
            None.
        skip_config: When True, the bitstream is not sent and the serial
            output is only monitored.

    Returns:
        0 once monitoring has been stopped.

    Raises:
        FileNotFoundError: ``bitstream_file`` is not an existing file.
        IncorrectBinaryFormat: The file has the wrong length or does not
            start with the expected bytes.
        serial.SerialException: No matching serial port was found, or the
            port could not be opened.
    """
    # Check for valid bitstream file.
    bitstream_bytes = _load_bitstream(bitstream_file)

    # Init serial port.
    if port is None:
        port = get_serial_port(product=product, serial_number=serial_number)
        if not port:
            # get_serial_port() returns '' when nothing matched. Fail with a
            # clear message instead of trying to open an empty port path.
            raise serial.SerialException(
                'No serial port found matching '
                f'product={product!r} or serial_number={serial_number!r}; '
                'use --port to select one explicitly.')
    serial_instance = Serial(port=port, baudrate=baudrate, timeout=320)

    # Use pyserial miniterm to monitor received data.
    miniterm = MinitermBinary(
        serial_instance,
        echo=False,
        eol='crlf',
        filters=(),
    )
    # Use Ctrl-C as the exit character. (Miniterm default is Ctrl-])
    miniterm.exit_character = chr(0x03)
    miniterm.set_rx_encoding('utf-8', errors='backslashreplace')
    miniterm.set_tx_encoding('utf-8')

    # Start monitoring serial data.
    miniterm.start()

    if not skip_config:
        # Wait a couple seconds to print early log messages from Gonk.
        time.sleep(2)

        # Write out the bitstream in batches.
        print('Sending bitstream...')
        written_bytes = _send_bitstream(serial_instance, bitstream_bytes)
        print(f'Done sending bitstream. Wrote {written_bytes}')

    # From here on, decode binary proto packets in the received data.
    miniterm.rx_transformations.append(HandleBinaryData())

    # Wait for ctrl-c, then shutdown miniterm.
    try:
        miniterm.join(True)
    except KeyboardInterrupt:
        pass

    sys.stderr.write('\n--- exit ---\n')
    miniterm.join()
    miniterm.close()

    return 0
# Script entry point: the parsed argument names map one-to-one onto main()'s
# keyword parameters, and main()'s return value becomes the exit status.
if __name__ == '__main__':
    _cli_kwargs = vars(_parse_args())
    sys.exit(main(**_cli_kwargs))