tools: Bundle FPGA bitstream into gonk_fpga
- Bundle toplevel.bin file into a new gonk-fpga Python package.
- Update README.md and BUILD.gn Python targets.
- write_fpga.py changes:
- If gonk-fpga package is available use that as the default
bitstream file.
- Add --bitstream-file as an option. If omitted the script will not
send the fpga bin over serial.
- Prioritize --serial-number matching
- Cleanup proto ADC update output text.
- Cleanup load+write bitsream functions
- Cleanup parse proto function
Change-Id: I0dcdcfcc293e6f51a71b591c945f9d69a187c8bb
Reviewed-on: https://pigweed-review.googlesource.com/c/gonk/+/190110
Reviewed-by: Rob Mohr <mohrr@google.com>
Commit-Queue: Anthony DiGirolamo <tonymd@google.com>
diff --git a/BUILD.gn b/BUILD.gn
index 222e8fc..90eb91d 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -44,34 +44,39 @@
]
}
+# This group requires the following tools for the FPGA build:
+# yosys, nextpnr-ice40, icepack, icetime
group("fpga") {
- deps = [ "//fpga" ]
+ deps = [
+ ":pip_install_gonk_fpga",
+ "//fpga",
+ ]
}
# Python Targets
_gonk_python_packages = [ "//tools" ]
-
_gonk_proto_packages = [ "//lib/adc:protos.python" ]
# Pigweed Python packages to include
_pigweed_python_packages = [ "$dir_pw_env_setup:core_pigweed_python_packages" ]
-_all_python_packages = _gonk_python_packages + _pigweed_python_packages
+_all_python_packages =
+ _gonk_python_packages + _gonk_proto_packages + _pigweed_python_packages
+# Group that defines all Python test and lint targets.
pw_python_group("gonk_python") {
python_deps = _gonk_python_packages
}
+# The python virtualenv used for testing and linting.
pw_python_venv("gonk_build_venv") {
path = "$root_build_dir/python-venv"
source_packages = _all_python_packages
}
-# This template collects all python packages and their dependencies into a
-# single super Python package for installation into the bootstrapped virtual
-# environment.
+# Bundle of all Gonk Python packages and their Pigweed dependencies
pw_python_distribution("gonk_python_distribution") {
- packages = _pigweed_python_packages + _gonk_proto_packages
+ packages = _gonk_python_packages + _gonk_proto_packages
generate_setup_cfg = {
name = "gonk-dist"
version = "0.0.1"
@@ -80,19 +85,45 @@
}
}
+# Pip install target for Gonk Python tools and protos.
pw_python_pip_install("pip_install_gonk_dist") {
packages = [ ":gonk_python_distribution" ]
}
-pw_python_pip_install("pip_install_gonk_editable_tools") {
- packages = _gonk_python_packages
- editable = true
+# Python data package for the FPGA build.
+pw_python_distribution("gonk_fpga_distribution") {
+ packages = []
+ generate_setup_cfg = {
+ name = "gonk-fpga"
+ version = "0.0.1"
+ append_date_to_version = true
+ include_default_pyproject_file = true
+ include_extra_files_in_package_data = true
+ auto_create_package_data_init_py_files = true
+ }
+
+ public_deps = []
+
+ # The FPGA image is only built on Linux so far.
+ if (host_os == "linux") {
+ _fpga_artifacts_dir = "$root_build_dir/obj/fpga/toplevel"
+
+ public_deps += [ "//fpga:toplevel._bin($default_toolchain)" ]
+ extra_files =
+ [ "$_fpga_artifacts_dir/toplevel.bin > gonk_fpga/toplevel.bin" ]
+ }
}
+# Pip install target for the FPGA datapackage only. This is sparate since the
+# FPGA toolchain is an optional build component.
+pw_python_pip_install("pip_install_gonk_fpga") {
+ packages = [ ":gonk_fpga_distribution" ]
+}
+
+# Python group used durring bootstrap.
pw_python_group("python") {
python_deps = [
- # Generate and pip install _pigweed_python_packages
":pip_install_gonk_dist",
- ":pip_install_gonk_editable_tools",
+ "$dir_pw_env_setup:pip_install_pigweed_package",
]
}
diff --git a/README.md b/README.md
index c2ce2ad..02a38fc 100644
--- a/README.md
+++ b/README.md
@@ -48,7 +48,7 @@
sudo apt install fpga-icestorm nextpnr-ice40 yosys
```
-Run this to compile the Gonk Verilog:
+Run this to compile:
```sh
pw build
@@ -70,11 +70,11 @@
yosys-log.txt
```
-## Gonk `fpga_config` Example
+## Gonk fpga config Example
Flash the stm32f7 and launch the `write_fpga.py` script on a bitstream file.
-### Flash with `dfu-util`
+### Flash with dfu-util
1. Unplug gonk from USB and replug with MODE button held down.
@@ -89,44 +89,10 @@
1. Write an FPGA bitstream with the `write_fpga.py` script:
```sh
- python ./tools/gonk_tools/write_fpga.py ./applications/fpga_config/fpga_blinky.bin
+ python ./tools/gonk_tools/write_fpga.py --bitstream-file ./out/gn/obj/fpga/toplevel/toplevel.bin
```
-
-## Gonk `spi_flash_test` Example
-
-### Flash with `dfu-util`
-
-1. Unplug gonk from USB and replug with MODE button held down.
-
-1. Run `pw flash` on the MCU binary.
-
- ```sh
- pw flash ./out/gn/arduino_size_optimized/obj/applications/spi_flash_test/spi_flash_test.bin
- ```
-
-1. Unplug Gonk from USB and replug to reset the hardware. SPI bus issues have
- been observed without this step.
-
-1. Connect over serial.
-
- ```sh
- python -m serial.tools.miniterm --raw /dev/ttyGonk 1000000
- ```
-
- You should see output matching:
-
- ```
- --- Miniterm on /dev/ttyGonk 1000000,8,N,1 ---
- --- Quit: Ctrl+] | Menu: Ctrl+T | Help: Ctrl+T followed by Ctrl+H ---
- INF Device id: 1f 84 1
- INF Device id: 1f 84 1
- INF Device id: 1f 84 1
- INF Device id: 1f 84 1
- ```
-
-
-### Flash with BlackMagic Probe:
+### Alternative: Flash with BlackMagic Probe
```sh
./scripts/flash-with-blackmagic-probe.sh ./out/gn/arduino_size_optimized/obj/applications/spi_flash_test/bin/spi_flash_test.elf
diff --git a/tools/gonk_tools/write_fpga.py b/tools/gonk_tools/write_fpga.py
index 8e6f15c..9699bcd 100644
--- a/tools/gonk_tools/write_fpga.py
+++ b/tools/gonk_tools/write_fpga.py
@@ -15,6 +15,7 @@
import argparse
from datetime import datetime
+import importlib.resources
from itertools import islice
import operator
from pathlib import Path
@@ -26,12 +27,26 @@
from serial import Serial
from serial.tools.list_ports import comports
from serial.tools.miniterm import Miniterm, Transform
+from google.protobuf.message import DecodeError
import pw_cli.color
from gonk_adc.adc_measurement_pb2 import FramedProto
_COLOR = pw_cli.color.colors()
+_BUNDLED_FPGA_BINFILE = ''
+_BUNDLED_FPGA_BINFILE_NAME = 'toplevel.bin'
+
+# Check for a bundled FPGA bitstream file.
+try:
+ with importlib.resources.as_file(
+ importlib.resources.files('gonk_fpga') /
+ _BUNDLED_FPGA_BINFILE_NAME) as bin_path:
+ if bin_path.is_file():
+ _BUNDLED_FPGA_BINFILE = _BUNDLED_FPGA_BINFILE_NAME
+except ModuleNotFoundError:
+ pass
+
def _parse_args():
parser = argparse.ArgumentParser(description=__doc__)
@@ -58,13 +73,10 @@
help='Use the first serial port matching this number.',
)
parser.add_argument(
- 'bitstream_file',
+ '--bitstream-file',
type=Path,
- help='FPGA Bitstream file.',
- )
- parser.add_argument(
- '--skip-config',
- action='store_true',
+ help=('FPGA Bitstream file. Can be a filesystem path or "DEFAULT" to '
+ 'use a Gonk Python tools bundled binary file.'),
)
return parser.parse_args()
@@ -73,19 +85,22 @@
product: Optional[str] = None,
serial_number: Optional[str] = None,
) -> str:
+ """Return serial ports that match give serial numbers or product names."""
ports = sorted(comports(), key=operator.attrgetter('device'))
- # Print matching devices
+ # Return devices matching serial numbers first.
+ for port in ports:
+ if (serial_number is not None and port.serial_number is not None
+ and serial_number in port.serial_number):
+ return port.device
+ # If no matching serial numbers, check for matching product names.
for port in ports:
if (product is not None and port.product is not None
and product in port.product):
return port.device
- if (serial_number is not None and port.serial_number is not None
- and serial_number in port.serial_number):
- return port.device
-
+ # No matches found.
return ''
@@ -99,6 +114,10 @@
BIN_LOG_SYNC_START_BYTES = bytes.fromhex(BIN_LOG_SYNC_START_STR)
+class UnknownSerialDevice(Exception):
+ """Exception raised when no device is specified."""
+
+
class IncorrectBinaryFormat(Exception):
"""Exception raised when FPGA bitstream file is in an unexpected format."""
@@ -123,17 +142,48 @@
class HandleBinaryData(Transform):
"""Miniterm transform to handle incoming byte data."""
def __init__(self) -> None:
- self.include_binary = True
self.data = bytes()
- self.timestamp_prefix = ' timestamp: '
+ self.timestamp_prefix = ' delta_microseconds:'
self.vbus_prefix = ' vbus:'
self.vshunt_prefix = ' vshunt:'
self.start_time = time.time()
self.time_format = '%Y%m%d %H:%M:%S.%f'
self.binary_format_started = False
+ def _parse_proto(self, proto_bytes: bytes) -> str:
+ # Parse the proto message.
+ try:
+ framed_proto = FramedProto()
+ framed_proto.ParseFromString(proto_bytes)
+ vbus_values = []
+ vshunt_values = []
+ for adc_measure in framed_proto.payload.adc_measurements:
+ vbus_values.append(adc_measure.vbus_value)
+ vshunt_values.append(adc_measure.vshunt_value)
+
+ # TODO(tonymd): Use Python logging and separate output file
+ output = [
+ # Host time
+ datetime.now().strftime(self.time_format),
+ # Update byte size
+ f' size: {str(len(proto_bytes))} ',
+ self.timestamp_prefix,
+ # Delta microseconds
+ str(framed_proto.payload.timestamp),
+ # Vshunt values
+ self.vshunt_prefix,
+ ','.join(str(value) for value in vshunt_values),
+ # Vbus values
+ self.vbus_prefix,
+ ','.join(str(value) for value in vbus_values),
+ ]
+ return ' '.join(output) + '\n'
+ except DecodeError:
+ # TODO(tonymd): Handle failed packets.
+ return 'FramedProto.DecodeError\n'
+
def rx(self, text, data=None):
- """text received from serial port"""
+ """Text received from the serial port."""
if not data:
return text
@@ -160,38 +210,14 @@
# Done, reset self.data to the remaining bytes minus the above packet.
self.data = BIN_LOG_SYNC_START_BYTES + sections[2]
- # Parse the proto
- try:
- framed_proto = FramedProto()
- framed_proto.ParseFromString(proto_bytes)
- vbus_values = []
- vshunt_values = []
- for adc_measure in framed_proto.payload.adc_measurements:
- vbus_values.append(adc_measure.vbus_value)
- vshunt_values.append(adc_measure.vshunt_value)
-
- # TODO(tonymd): Use Python logging and separate output file
- output = [
- (datetime.now().strftime(self.time_format) + '.' +
- str(len(proto_bytes)) + '.'),
- self.timestamp_prefix,
- str(framed_proto.payload.timestamp),
- self.vbus_prefix,
- ','.join(str(value) for value in vbus_values),
- self.vshunt_prefix,
- ','.join(str(value) for value in vshunt_values),
- ]
- return ' '.join(output) + '\n'
- except FramedProto.DecodeError:
- # TODO(tonymd): Handle failed packets.
- return 'FramedProto.DecodeError\n'
+ return self._parse_proto(proto_bytes)
def tx(self, text):
- """text to be sent to serial port"""
+ """Text to be sent to the serial port."""
return text
def echo(self, text):
- """text to be sent but displayed on console"""
+ """Text to be sent but displayed on console."""
return text
@@ -201,7 +227,7 @@
# pylint: disable=too-many-nested-blocks
try:
while self.alive and self._reader_alive:
- # read all that is there or wait for one byte
+ # Read all that is there or wait for one byte
data = self.serial.read(self.serial.in_waiting or 1)
if data:
if self.raw:
@@ -209,7 +235,7 @@
else:
text = self.rx_decoder.decode(data)
for transformation in self.rx_transformations:
- if hasattr(transformation, 'include_binary'):
+ if isinstance(transformation, HandleBinaryData):
text = transformation.rx(text, data)
else:
text = transformation.rx(text)
@@ -220,21 +246,22 @@
raise # XXX handle instead of re-raise?
-def main(
- baudrate: int,
- bitstream_file: Path,
- port: Optional[str] = None,
- product: Optional[str] = None,
- serial_number: Optional[str] = None,
- skip_config: bool = False,
-) -> int:
- """Write a bitstream file over serial while monitoring output."""
+def load_bitstream_file(bitstream_file: Path) -> bytes:
+ """Check for valid bitstream file and load it as bytes."""
+ bitstream_bytes = b''
- # Check for valid bitstream file.
- if not bitstream_file.is_file():
- raise FileNotFoundError(f'\nUnable to load "{bitstream_file}"')
+ if str(bitstream_file) == 'DEFAULT':
+ if not _BUNDLED_FPGA_BINFILE:
+ raise FileNotFoundError('No default bitstream file is available.')
- bitstream_bytes = bitstream_file.read_bytes()
+ bitstream_path = importlib.resources.files('gonk_fpga').joinpath(
+ _BUNDLED_FPGA_BINFILE)
+ bitstream_bytes = bitstream_path.read_bytes()
+ else:
+ if not bitstream_file.is_file():
+ raise FileNotFoundError(f'\nUnable to load "{bitstream_file}"')
+ bitstream_bytes = bitstream_file.read_bytes()
+
if (len(bitstream_bytes) != 135100
or bitstream_bytes[0:8] != FILE_START_BYTES + SYNC_START_BYTES):
raise IncorrectBinaryFormat(
@@ -242,11 +269,41 @@
f' {FILE_LENGTH} bytes in length\n'
f' Start with "{FILE_START_STR} {SYNC_START_STR}"')
+ return bitstream_bytes
+
+
+def write_bitstream_file(bitstream_bytes: bytes,
+ serial_instance: Serial) -> None:
+ """Write a series of bytes to serial."""
+
+ # Write out the bitstream in batches.
+ print('Sending bitstream...')
+ written_bytes: int = 0
+ for byte_batch in batched(bitstream_bytes, 8):
+ result = serial_instance.write(byte_batch)
+ if result:
+ written_bytes += result
+ serial_instance.flush()
+ print(f'Done sending bitstream. Wrote {written_bytes}')
+
+
+def main(
+ baudrate: int,
+ bitstream_file: Optional[Path],
+ port: Optional[str] = None,
+ product: Optional[str] = None,
+ serial_number: Optional[str] = None,
+) -> int:
+ """Write a bitstream file over serial while monitoring output."""
# Init serial port.
if port is None:
port = get_serial_port(product=product, serial_number=serial_number)
- serial_instance = Serial(port=port, baudrate=baudrate, timeout=320)
+ if not port:
+ raise UnknownSerialDevice(
+ 'No --serial-number --product or --port path provided.')
+
+ serial_instance = Serial(port=port, baudrate=baudrate, timeout=0.1)
# Use pyserial miniterm to monitor recieved data.
miniterm = MinitermBinary(
@@ -260,24 +317,17 @@
miniterm.set_rx_encoding('utf-8', errors='backslashreplace')
miniterm.set_tx_encoding('utf-8')
+ miniterm.rx_transformations.append(HandleBinaryData())
+
# Start monitoring serial data.
miniterm.start()
- if not skip_config:
+ # Send the bitstream_file.
+ if bitstream_file:
+ bitstream_bytes = load_bitstream_file(bitstream_file)
# Wait a couple seconds to print early log messages from Gonk.
time.sleep(2)
-
- # Write out the bitstream in batches.
- print('Sending bitstream...')
- written_bytes: int = 0
- for byte_batch in batched(bitstream_bytes, 8):
- result = serial_instance.write(byte_batch)
- if result:
- written_bytes += result
- serial_instance.flush()
- print(f'Done sending bitstream. Wrote {written_bytes}')
-
- miniterm.rx_transformations.append(HandleBinaryData())
+ write_bitstream_file(bitstream_bytes, serial_instance)
# Wait for ctrl-c, then shutdown miniterm.
try: