| # Copyright 2024 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Plot ADC updates as an SVG from a device logfile.""" |
| |
| import argparse |
| from datetime import datetime |
| from datetime import timedelta |
| import logging |
| from pathlib import Path |
| import sys |
| from typing import Optional |
| |
| import matplotlib.pyplot as plt |
| import numpy as np |
| |
_LOG = logging.getLogger(__package__)

# Bitmask of the sampled ADC channels: bit N selects entry N of the channel
# table below. It should be set to the same value used by
# SelectContinuousReadAdcs in the MCU.
CHANNEL_MASK = 0b011100111001
# Number of enabled channels, i.e. the number of set bits in CHANNEL_MASK.
ADC_COUNT = bin(CHANNEL_MASK).count('1')

# One (channel name, shunt resistance in ohms) pair per ADC channel, listed
# in channel-bit order (entry 0 corresponds to bit 0 of a channel mask).
_CHANNEL_TABLE = (
    ("VDD_EE", 0.03),
    ("VCC18", 0.50),
    ("VDDIOAO18", 0.50),
    ("VDDCPU_B", 0.10),
    ("DCIN", 0.05),
    ("VDDQ(DDR)", 0.08),
    ("VCC33", 0.13),
    ("VCC5V", 0.05),
    ("VDDCPU_A", 0.027),
    ("VSYS3V3", 0.03),
    ("EMMC18", 0.5),
)
# Parallel lists derived from the table; index N in both refers to channel N.
channel_names_all = [name for name, _ohms in _CHANNEL_TABLE]
shunt_resistances = [ohms for _name, ohms in _CHANNEL_TABLE]

# Scale factors from the INA229 datasheet: volts represented by one ADC count
# of the bus-voltage and shunt-voltage readings respectively.
VBUS_VOLTS_PER_COUNT = 195.3125e-6
VSHUNT_VOLTS_PER_COUNT = 312.5e-9
| |
| |
def get_channel_names(channel_mask):
    """Return the names of the channels enabled in channel_mask.

    Bit N of channel_mask selects entry N of channel_names_all. The result
    keeps the order of that list; mask bits beyond its end are ignored.
    """
    return [
        name
        for bit, name in enumerate(channel_names_all)
        if (channel_mask >> bit) & 0x01
    ]
| |
| |
def get_resistances(channel_mask):
    """Return the shunt resistances of the channels enabled in channel_mask.

    Bit N of channel_mask selects entry N of shunt_resistances. The result
    keeps the order of that list; mask bits beyond its end are ignored.
    """
    return [
        ohms
        for bit, ohms in enumerate(shunt_resistances)
        if (channel_mask >> bit) & 0x01
    ]
| |
| |
def get_shunt_currents(channel_mask, adc_measurements):
    """Convert raw shunt ADC counts into one current per enabled channel.

    Each count is scaled to volts with VSHUNT_VOLTS_PER_COUNT and divided by
    the shunt resistance of the corresponding enabled channel.

    adc_measurements is indexed by position among the enabled channels, so it
    needs at least one entry per enabled channel (IndexError otherwise); any
    surplus entries are ignored.
    """
    enabled_ohms = get_resistances(channel_mask)
    return [
        VSHUNT_VOLTS_PER_COUNT * adc_measurements[position] / ohms
        for position, ohms in enumerate(enabled_ohms)
    ]
| |
| |
def get_bus_voltages(adc_measurements):
    """Scale raw bus ADC counts to volts using VBUS_VOLTS_PER_COUNT.

    Returns a new list with one voltage per entry of adc_measurements, in the
    same order.
    """
    return [VBUS_VOLTS_PER_COUNT * count for count in adc_measurements]
| |
| |
def calc_power(voltages, currents):
    """Return the element-wise product of voltages and currents.

    The result has one entry per voltage. currents is indexed by position, so
    it needs at least as many entries as voltages (IndexError otherwise); any
    surplus currents are ignored.
    """
    return [volts * currents[position] for position, volts in enumerate(voltages)]
| |
| |
def _parse_args():
    """Parse the command line and return the resulting argparse namespace.

    The namespace carries three Path attributes: input_text, output_svg and
    output_csv. output_csv defaults to Path(''), which main() treats as
    "do not write a csv file".
    """
    parser = argparse.ArgumentParser(description=__doc__)

    # (short flag, long flag, default path, help text) for every option; all
    # of them are converted to Path.
    path_options = (
        (
            '-i',
            '--input-text',
            Path('gonk-device-logs.txt'),
            'Input log text file.',
        ),
        ('-o', '--output-svg', Path('plot.svg'), 'Output svg file.'),
        ('-c', '--output-csv', Path(''), 'Output csv file.'),
    )
    for short_flag, long_flag, default_path, help_text in path_options:
        parser.add_argument(
            short_flag,
            long_flag,
            type=Path,
            default=default_path,
            help=help_text,
        )

    return parser.parse_args()
| |
| |
def _read_adc_log(
    input_text: Path,
) -> tuple[
    list[float], list[list[float]], list[list[float]], list[list[float]]
]:
    """Parse the ADC update lines of input_text into per-channel series.

    Only lines containing 'delta_microseconds' are used. Each such line is
    split on whitespace; fields 5 and 6 hold the host date and time, field 10
    the microseconds since the previous update, and fields 12 and 14 the
    comma-separated raw ADC counts.

    Returns:
      (time_values, vbus_values, current_values, power_values). time_values
      holds the seconds elapsed since the first update; each of the other
      three holds ADC_COUNT lists with one sample per update.
    """
    time_values: list[float] = []
    vbus_values: list[list[float]] = [[] for _ in range(ADC_COUNT)]
    current_values: list[list[float]] = [[] for _ in range(ADC_COUNT)]
    power_values: list[list[float]] = [[] for _ in range(ADC_COUNT)]

    start_time: Optional[datetime] = None
    current_time: Optional[datetime] = None

    with input_text.open() as log_file:
        # Iterate the file lazily so a large log is never held in memory
        # as a full list of lines.
        for line in log_file:
            # Skip lines that are not ADC updates.
            if 'delta_microseconds' not in line:
                continue

            parts = line.split()

            # Extract the host timestamp and the device-reported delta.
            host_time = datetime.strptime(
                parts[5] + ' ' + parts[6], '%Y%m%d %H:%M:%S.%f'
            )
            delta_micros = int(parts[10])

            if start_time is None or current_time is None:
                # The first update defines time zero; its delta is not
                # counted.
                start_time = current_time = host_time
            else:
                # Later updates advance by the device-reported delta rather
                # than by the host timestamp.
                current_time += timedelta(microseconds=delta_micros)

            time_values.append((current_time - start_time).total_seconds())

            # vshunt and vbus fields appear to be flipped in the log stream
            # coming from the gonk. These two fields (12,14) are flipped here
            # to compensate. b/349079209
            vshunt = [int(count) for count in parts[12].split(',')]
            vbus = [int(count) for count in parts[14].split(',')]

            ishunt: list[float] = get_shunt_currents(CHANNEL_MASK, vshunt)
            bus_voltages = get_bus_voltages(vbus)
            power: list[float] = calc_power(bus_voltages, ishunt)

            for channel, volts in enumerate(bus_voltages):
                vbus_values[channel].append(volts)
                current_values[channel].append(ishunt[channel])
                power_values[channel].append(power[channel])

    return time_values, vbus_values, current_values, power_values


def _plot_series(axis, ylabel, times, series, labels, linewidth=0.7):
    """Plot one solid line per channel on axis and add labels/legend/grid."""
    axis.set_xlabel('Time (s)')
    axis.set_ylabel(ylabel)
    for channel, samples in enumerate(series):
        axis.plot(
            times,
            np.asarray(samples),
            # Indexed (not zipped) so a missing channel name fails loudly.
            label=labels[channel],
            linestyle='solid',
            linewidth=linewidth,
        )
    axis.legend()
    axis.grid(True)


def _write_csv(
    output_csv, channel_names, time_values, vbus_values, current_values,
    power_values,
):
    """Write one csv row per update: ts, then volts/current/power per channel."""
    header = "ts" + "".join(
        f", {name}_Volts, {name}_Current, {name}_Power"
        for name in channel_names
    )
    with output_csv.open("w") as csv_file:
        csv_file.write(header + "\n")
        for row, timestamp in enumerate(time_values):
            fields = [f"{timestamp:f}"]
            for volts, amps, watts in zip(
                vbus_values, current_values, power_values
            ):
                fields.append(
                    f", {volts[row]:f}, {amps[row]:f}, {watts[row]:f}"
                )
            # Emit each row with a single write instead of many tiny ones.
            csv_file.write("".join(fields) + "\n")


def main(
    input_text: Path,
    output_svg: Path,
    output_csv: Path,
) -> int:
    """Plot ADC values.

    Reads the ADC updates from input_text, plots bus voltage, shunt current
    and power for every channel enabled in CHANNEL_MASK into output_svg, and
    optionally dumps the same data as csv.

    Args:
      input_text: Device log file to read.
      output_svg: Destination passed to matplotlib's savefig.
      output_csv: Destination csv file; Path("") disables csv output.

    Returns:
      0 always; parsing or I/O problems propagate as exceptions.
    """
    print(f"Input {input_text}, Output {output_svg}")
    csv_requested = output_csv != Path("")
    if csv_requested:
        print(f"Output csv file: {output_csv}")

    channel_names = get_channel_names(CHANNEL_MASK)
    time_values, vbus_values, current_values, power_values = _read_adc_log(
        input_text
    )

    # Plot vbus, shunt current and power on three stacked axes.
    fig, axes = plt.subplots(
        3, 1, layout='constrained', figsize=[11.67, 8.27]
    )
    try:
        times = np.asarray(time_values)
        panels = (
            ('vbus', vbus_values),
            ('Ishunt', current_values),
            ('Power', power_values),
        )
        for axis, (ylabel, series) in zip(axes, panels):
            _plot_series(axis, ylabel, times, series, channel_names)
        plt.savefig(output_svg)
    finally:
        # Release the figure so repeated calls do not accumulate open
        # figures inside pyplot.
        plt.close(fig)

    if csv_requested:
        _write_csv(
            output_csv,
            channel_names,
            time_values,
            vbus_values,
            current_values,
            power_values,
        )

    return 0
| |
| |
if __name__ == '__main__':
    # Parse the command line once and reuse the result for both the
    # diagnostic print and the call into main().
    _args = _parse_args()
    print(_args)
    sys.exit(main(**vars(_args)))