| # Copyright 2024 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Plot ADC updates as an SVG from a device logfile.""" |
| |
| import argparse |
| from datetime import datetime |
| from datetime import timedelta |
| import logging |
| from pathlib import Path |
| import sys |
| |
| import pw_cli.log |
| |
| from gonk_tools.gonk import DEFAULT_CSV_LOGFILE |
| from gonk_tools.adc import ( |
| get_channel_names, |
| ADC_COUNT, |
| ) |
| |
| |
| import matplotlib.pyplot as plt |
| import numpy as np |
| |
| _LOG = logging.getLogger(__package__) |
| |
| |
| def _parse_args(): |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument( |
| '-i', |
| '--input-csv', |
| type=Path, |
| default=Path(DEFAULT_CSV_LOGFILE), |
| help=f'Input CSV text file. Default: {DEFAULT_CSV_LOGFILE}', |
| ) |
| parser.add_argument( |
| '-o', |
| '--output-svg', |
| type=Path, |
| default=None, |
| help='Output svg file.', |
| ) |
| return parser.parse_args() |
| |
| |
| def plot( |
| input_csv: Path, |
| output_svg: Path, |
| ) -> int: |
| """Plot ADC values.""" |
| # pylint: disable=too-many-locals |
| pw_cli.log.install() |
| |
| print(f'Input CSV: {input_csv}') |
| if output_svg: |
| print(f'Output svg file: {output_svg}') |
| |
| interactive_plotting = not (output_svg) |
| if interactive_plotting: |
| print('No outputs specified; plotting interactively') |
| |
| start_time: datetime | None = None |
| time_values = [] |
| vbus_values: list[list[float]] = [] |
| vshunt_values: list[list[float]] = [] |
| power_values: list[list[float]] = [] |
| channel_names = get_channel_names() |
| |
| for i in range(ADC_COUNT): |
| vbus_values.append([]) |
| vshunt_values.append([]) |
| power_values.append([]) |
| |
| with input_csv.open() as f: |
| current_time = datetime.now() |
| |
| for line in f.readlines(): |
| parts = [line.strip() for line in line.split(',')] |
| if len(parts) != ADC_COUNT * 3 + 2: |
| _LOG.error('Unexpected number of CSV fields: %i', len(parts)) |
| _LOG.error('Fields: %s', parts) |
| _LOG.error('Line: "%s"', line) |
| |
| # Extract the host timestamp |
| dtstr = parts[0] |
| dt = datetime.strptime(dtstr, '%Y%m%d %H:%M:%S.%f') |
| # Extract delta_micros |
| delta_micros = int(parts[1]) |
| |
| # Set start timestamp if not found already. |
| if not start_time: |
| start_time = dt |
| current_time = start_time - timedelta(microseconds=delta_micros) |
| |
| # Increment delta_micros |
| current_time += timedelta(microseconds=delta_micros) |
| |
| # Save the timestamp |
| time_values.append((current_time - start_time).total_seconds()) |
| |
| index = 2 |
| voltages = list(float(i) for i in parts[index : index + ADC_COUNT]) |
| index += ADC_COUNT |
| current = list(float(i) for i in parts[index : index + ADC_COUNT]) |
| index += ADC_COUNT |
| power = list(float(i) for i in parts[index : index + ADC_COUNT]) |
| |
| for i, voltage in enumerate(voltages): |
| vbus_values[i].append(voltage) |
| vshunt_values[i].append(current[i]) |
| power_values[i].append(power[i]) |
| |
| # Plot vbus and vshunt values. |
| _fig, (ax1, ax2, ax3) = plt.subplots( # type: ignore |
| 3, 1, layout='constrained', figsize=[11.67, 8.27] |
| ) |
| |
| times = np.asarray(time_values) |
| ax1.set_xlabel('Time (s)') |
| ax2.set_xlabel('Time (s)') |
| ax3.set_xlabel('Time (s)') |
| |
| linewidth = 0.7 |
| ax1.set_ylabel('vbus') |
| for i in range(ADC_COUNT): |
| ax1.plot( |
| times, |
| np.asarray(vbus_values[i]), |
| label=channel_names[i], |
| linestyle='solid', |
| linewidth=linewidth, |
| ) |
| |
| ax2.set_ylabel('Ishunt') |
| for i in range(ADC_COUNT): |
| ax2.plot( |
| times, |
| np.asarray(vshunt_values[i]), |
| label=channel_names[i], |
| linestyle='solid', |
| linewidth=linewidth, |
| ) |
| ax3.set_ylabel('Power') |
| for i in range(ADC_COUNT): |
| ax3.plot( |
| times, |
| np.asarray(power_values[i]), |
| label=channel_names[i], |
| linestyle='solid', |
| linewidth=linewidth, |
| ) |
| |
| ax1.legend() |
| ax1.grid(True) |
| ax2.legend() |
| ax2.grid(True) |
| ax3.legend() |
| ax3.grid(True) |
| |
| if output_svg: |
| plt.savefig(output_svg) |
| |
| if interactive_plotting: |
| plt.show() |
| |
| return 0 |
| |
| |
| def main() -> None: |
| sys.exit(plot(**vars(_parse_args()))) |
| |
| |
| if __name__ == '__main__': |
| main() |