blob: 1306e54ada43062c1f88d418a7d1c682b3502a7c [file] [log] [blame]
# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Plot ADC updates as an SVG from a device logfile."""
import argparse
from datetime import datetime
from datetime import timedelta
import logging
from pathlib import Path
import sys
from typing import Optional
import matplotlib.pyplot as plt
import numpy as np
_LOG = logging.getLogger(__package__)
# CHANNEL_MASK should be set to same value used by
# SelectContinuousReadAdcs in the MCU
CHANNEL_MASK = 0b011100111001
ADC_COUNT = CHANNEL_MASK.bit_count()
channel_names_all = [
"VDD_EE",
"VCC18",
"VDDIOAO18",
"VDDCPU_B",
"DCIN",
"VDDQ(DDR)",
"VCC33",
"VCC5V",
"VDDCPU_A",
"VSYS3V3",
"EMMC18",
]
shunt_resistances = [
0.03,
0.50,
0.50,
0.10,
0.05,
0.08,
0.13,
0.05,
0.027,
0.03,
0.5,
]
# From INA229 Datasheet
VBUS_VOLTS_PER_COUNT = 195.3125e-6
VSHUNT_VOLTS_PER_COUNT = 312.5e-9
def get_channel_names(channel_mask):
"""returns list of channel names based on channel_mask."""
retval = list()
for name in channel_names_all:
if channel_mask & 0x01:
retval.append(name)
channel_mask = channel_mask >> 1
return retval
def get_resistances(channel_mask):
retval = list()
for ohms in shunt_resistances:
if channel_mask & 0x01:
retval.append(ohms)
channel_mask = channel_mask >> 1
return retval
def get_shunt_currents(channel_mask, adc_measurements):
retvals = list()
resistances = get_resistances(channel_mask)
for i, resistance in enumerate(resistances):
retvals.append(
VSHUNT_VOLTS_PER_COUNT * adc_measurements[i] / resistance
)
return retvals
def get_bus_voltages(adc_measurements):
retvals = list()
for measurement in adc_measurements:
retvals.append(VBUS_VOLTS_PER_COUNT * measurement)
return retvals
def calc_power(voltages, currents):
retvals = list()
for i, volts in enumerate(voltages):
retvals.append(volts * currents[i])
return retvals
def _parse_args():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'-i',
'--input-text',
type=Path,
default=Path('gonk-device-logs.txt'),
help='Input log text file.',
)
parser.add_argument(
'-o',
'--output-svg',
type=Path,
default=Path('plot.svg'),
help='Output svg file.',
)
parser.add_argument(
'-c',
'--output-csv',
type=Path,
default=Path(''),
help='Output csv file.',
)
return parser.parse_args()
def main(
input_text: Path,
output_svg: Path,
output_csv: Path,
) -> int:
"""Plot ADC values."""
# pylint: disable=too-many-locals
print("Input %s, Output %s" % (input_text, output_svg))
if output_csv != Path(""):
print("Output csv file: %s" % output_csv)
start_time: Optional[datetime] = None
time_values = []
vbus_values: list[list[float]] = []
vshunt_values: list[list[float]] = []
power_values: list[list[float]] = []
channel_names = get_channel_names(CHANNEL_MASK)
for i in range(ADC_COUNT):
vbus_values.append([])
vshunt_values.append([])
power_values.append([])
with input_text.open() as f:
current_time = datetime.now()
for line in f.readlines():
# Skip lines that are not ADC updates.
if 'delta_microseconds' not in line:
continue
parts = line.split()
# Extract the host timestamp
dtstr = parts[5] + ' ' + parts[6]
dt = datetime.strptime(dtstr, '%Y%m%d %H:%M:%S.%f')
# Extract delta_micros
delta_micros = int(parts[10])
# Set start timestamp if not found already.
if not start_time:
start_time = dt
current_time = start_time - timedelta(microseconds=delta_micros)
# Increment delta_micros
current_time += timedelta(microseconds=delta_micros)
# Save the timestamp and vbus vshunt values for plotting.
time_values.append((current_time - start_time).total_seconds())
# vshunt and vbus fields appear to be flipped in the log stream
# coming from the gonk. These two fields (12,14) are flipped here
# to compensate. b/349079209
vshunt = list(int(i) for i in parts[12].split(','))
vbus = list(int(i) for i in parts[14].split(','))
ishunt: list[float] = get_shunt_currents(CHANNEL_MASK, vshunt)
bus_voltages = get_bus_voltages(vbus)
power: list[float] = calc_power(bus_voltages, ishunt)
for i, voltages in enumerate(bus_voltages):
vbus_values[i].append(voltages)
vshunt_values[i].append(ishunt[i])
power_values[i].append(power[i])
# Plot vbus and vshunt values.
_fig, (ax1, ax2, ax3) = plt.subplots(
3, 1, layout='constrained', figsize=[11.67, 8.27]
)
times = np.asarray(time_values)
ax1.set_xlabel('Time (s)')
ax2.set_xlabel('Time (s)')
ax3.set_xlabel('Time (s)')
linewidth = 0.7
ax1.set_ylabel('vbus')
for i in range(ADC_COUNT):
ax1.plot(
times,
np.asarray(vbus_values[i]),
label=channel_names[i],
linestyle='solid',
linewidth=linewidth,
)
ax2.set_ylabel('Ishunt')
for i in range(ADC_COUNT):
ax2.plot(
times,
np.asarray(vshunt_values[i]),
label=channel_names[i],
linestyle='solid',
linewidth=linewidth,
)
ax3.set_ylabel('Power')
for i in range(ADC_COUNT):
ax3.plot(
times,
np.asarray(power_values[i]),
label=channel_names[i],
linestyle='solid',
linewidth=linewidth,
)
ax1.legend()
ax1.grid(True)
ax2.legend()
ax2.grid(True)
ax3.legend()
ax3.grid(True)
plt.savefig(output_svg)
if output_csv != Path(""):
with output_csv.open("w+") as fd:
fd.write("ts")
for i in range(0, ADC_COUNT):
fd.write(
", %s_Volts, %s_Current, %s_Power"
% (channel_names[i], channel_names[i], channel_names[i])
)
fd.write("\n")
for i, t in enumerate(time_values):
fd.write("%f" % t)
for j in range(0, ADC_COUNT):
fd.write(
", %f, %f, %f"
% (
vbus_values[j][i],
vshunt_values[j][i],
power_values[j][i],
)
)
fd.write("\n")
return 0
if __name__ == '__main__':
print(_parse_args())
sys.exit(main(**vars(_parse_args())))