blob: c5b95bd9864d0022ae097714df278009c2c7e4f7 [file] [log] [blame]
# Copyright: (c) 2025, Intel Corporation
# Author: Arkadiusz Cholewinski <arkadiuszx.cholewinski@intel.com>
import csv
import logging
import queue
import re
import threading
import time
import utils.UtilityFunctions as UtilityFunctions
from abstract.PowerMonitor import PowerMonitor
from stm32l562e_dk.PowerShieldConfig import PowerShieldConf
from stm32l562e_dk.PowerShieldData import PowerShieldData
from stm32l562e_dk.SerialHandler import SerialHandler
class PowerShield(PowerMonitor):
def __init__(self):
"""
Initializes the PowerShield.
"""
self.handler = None
self.dataQueue = queue.Queue()
self.acqComplete = False
self.acqStart = False
self.target_voltage = None
self.target_temperature = None
self.acqTimeoutThread = None
self.power_shield_conf = PowerShieldConf()
self.power_shield_data = PowerShieldData()
def init(self):
"""
Initializes the power monitor.
"""
self.__take_control()
self.__set_voltage(self.power_shield_conf.target_voltage)
self.__set_format(self.power_shield_conf.data_format)
self.__set_func_mode(self.power_shield_conf.function_mode)
def connect(self, power_device_path: str):
"""Opens the connection using the SerialHandler."""
self.handler = SerialHandler(power_device_path, 3686400)
self.handler.open()
def disconnect(self):
"""Closes the connection using the SerialHandler."""
self.handler.close()
def __send_command(self, command: str, expected_ack: str = None, ack: bool = False) -> str:
"""
Sends a command to the device, retrieves the response,
and optionally verifies the acknowledgment.
:param command: The command to send.
:param expected_ack: The expected acknowledgment response (e.g., "ack htc").
:return: The response received from the device.
"""
if not self.handler.is_open():
logging.info(f"Error: Connection is not open. Cannot send command: {command}")
return ""
logging.debug(f"Sending command: {command}")
self.handler.send_cmd(command)
if ack:
response = self.handler.receive_cmd()
logging.debug(f"Response: {response}")
# Check if the response contains the expected acknowledgment
if expected_ack and expected_ack not in response:
logging.error(f"Error: Expected acknowledgment '{expected_ack}' not found.")
return ""
return response
return 0
def __test_communication(self):
"""
Sends a version command to the device.
"""
if not self.handler.is_open():
logging.error("Error: Connection is not open. Cannot send version command.")
return ""
command = 'version'
logging.info(f"Sending command: {command}")
self.handler.send_cmd(command)
response = self.handler.receive_cmd()
logging.info(f"Response: {response}")
return response
def __reset(self):
"""
Sends the reset command ('PSRST') to the power monitor device,
closes the connection, waits for the reset process to complete,
and repeatedly attempts to reconnect until successful.
"""
command = "psrst"
if not self.handler.is_open():
logging.error("Error: Connection is not open. Cannot reset the device.")
return
logging.info(f"Sending reset command: {command}")
self.handler.send_cmd(command)
# Close the connection
self.handler.close()
self.handler.serial_connection = None
time.sleep(5)
# Attempt to reopen the connection
try:
self.handler.open()
logging.info("Connection reopened after reset.")
except Exception as e:
logging.error(f"Failed to reopen connection after reset: {e}")
def __get_voltage_level(self) -> float:
"""
Sends the 'volt get' command and returns the voltage value as a float.
:return: The voltage level as a float, in volts (V).
"""
command = 'volt get'
response = self.__send_command(command, expected_ack="ack volt get", ack=True)
# If response contains the expected acknowledgment, extract and return the voltage
if response:
parts = response.split()
try:
if len(parts) >= 5:
# Use regex to find a string that matches the pattern, e.g., "3292-03"
match = re.search(r'(\d+)-(\d+)', parts[5])
if match:
# Extract the base (3292) and exponent (03)
base = match.group(1)
exponent = match.group(2)
# Construct the scientific notation string (e.g., 3292e-03)
voltage_str = f"{base}e-{exponent}"
# Convert the string into a float
voltage = float(voltage_str)
# Return the voltage as a float
self.target_voltage = round(voltage, 3)
return self.target_voltage
except ValueError:
logging.error("Error: Could not convert temperature value.")
return float('nan')
else:
logging.error("Error: No response for voltage command.")
return float('nan')
def __get_temperature(self, unit: str = PowerShieldConf.TemperatureUnit.CELSIUS) -> float:
"""
Sends the temperature command and returns the temperature as a float.
:param unit: The unit to request the temperature in, either 'degc' or 'degf'.
:return: The temperature value as a float, in the specified unit (°C or °F).
"""
# Send the temp command with the unit
response = self.__send_command(f"temp {unit}", expected_ack=f"ack temp {unit}", ack=True)
# If response contains the expected acknowledgment, extract the temperature
if response:
try:
# Example response format: "PowerShield > ack temp degc 28.0"
parts = response.split()
if len(parts) >= 5 and parts[5].replace('.', '', 1).isdigit():
# Extract temperature and convert to float
self.target_temetarute = float(parts[5])
logging.info(f"Temperature: {self.target_temetarute} {unit}")
return self.target_temetarute
else:
print("Error: Temperature value not found in response.")
return None
except ValueError:
logging.error("Error: Could not convert temperature value.")
return None
else:
logging.error("Error: No response for temp command.")
return None
def __take_control(self) -> str:
"""
Sends the 'htc' command and verifies the acknowledgment.
:return: The acknowledgment response or error message.
"""
return self.__send_command("htc", expected_ack="ack htc", ack=True)
def __set_format(self, data_format: str = PowerShieldConf.DataFormat.ASCII_DEC):
"""
Sets the measurement data format.
The format can be either ASCII (decimal) or Binary (hexadecimal).
:param data_format: The data format to set.
Options are 'ascii_dec' or 'bin_hexa'.
:return: None
"""
# Validate the input format
if data_format not in vars(PowerShieldConf.DataFormat).values():
logging.error(
f"Error: Invalid format '{data_format}'. "
"Valid options are 'ascii_dec' or 'bin_hexa'."
)
return
command = f"format {data_format}"
response = self.__send_command(command, expected_ack=f"ack format {data_format}", ack=True)
# If response contains the expected acknowledgment, the format was set successfully
if response:
logging.info(f"Data format set to {data_format}.")
else:
logging.error(f"Error: Failed to set data format to {data_format}.")
def __set_frequency(self, frequency: enumerate):
"""
Sets the sampling frequency for the measurement.
The frequency can be any valid value from the list.
:param frequency: The sampling frequency to set.
Valid options include:
{100k, 50k, 20k, 10k, 5k, 2k, 1k, 500, 200, 100, 50, 20, 10, 5, 2, 1}.
:return: None
"""
# Validate the input frequency
if frequency not in vars(PowerShieldConf.SamplingFrequency).values():
logging.error(
f"Error: Invalid frequency '{frequency}'."
"Valid options are:"
"100k, 50k, 20k, 10k, 5k, 2k, 1k, 500, 200, 100, 50, 20, 10, 5, 2, 1."
)
return
command = f"freq {frequency}"
response = self.__send_command(command, expected_ack=f"ack freq {frequency}", ack=True)
if response:
logging.info(f"Sampling frequency set to {frequency}.")
else:
logging.error(f"Error: Failed to set sampling frequency to {frequency}.")
def __set_acquisition_time(self, acquisition_time: str = '0'):
command = f"acqtime {acquisition_time}"
response = self.__send_command(
command, expected_ack=f"ack acqtime {acquisition_time}", ack=True
)
if response:
logging.info(f"Acquisition time set to {acquisition_time}.")
else:
logging.error(f"Error: Failed to set acquisition time to {acquisition_time}.")
def __set_voltage(self, voltage: enumerate):
command = f"volt {voltage}"
response = self.__send_command(command, expected_ack=f"ack volt {voltage}", ack=True)
if response:
logging.info(f"Voltage set to {voltage}.")
else:
logging.error(f"Error: Failed to set voltage to {voltage}.")
def __set_func_mode(self, function_mode: str = PowerShieldConf.FunctionMode.HIGH):
"""
Sets the acquisition mode for current measurement.
The function_mode can be either 'optim' or 'high'.
- 'optim': Priority on current resolution (100 nA - 10 mA) with max freq at 100 kHz.
- 'high': High current (30 µA - 10 mA), high frequency (50-100 kHz), high resolution.
:param mode: The acquisition mode. Must be either 'optim' or 'high'.
:return: None
"""
# Validate the input format
if function_mode not in vars(PowerShieldConf.FunctionMode).values():
logging.error(
f"Error: Invalid format '{function_mode}'."
"Valid options are 'ascii_dec' or 'bin_hexa'."
)
return
command = f"funcmode {function_mode}"
response = self.__send_command(
command, expected_ack=f"ack funcmode {function_mode}", ack=True
)
if response:
logging.info(f"Data format set to {function_mode}.")
else:
logging.error(f"Error: Failed to set data format to {function_mode}.")
def __acq_data(self):
"""
Continuously reads data from the serial port and puts it
into a queue until acquisition is complete.
"""
logging.info("Started data acquisition...")
while True:
# Read the first byte
first_byte = self.handler.read_bytes(1)
if len(first_byte) < 1 or self.acqComplete: # Exit conditions
logging.info("Stopping data acquisition...")
return
# Check if it's metadata
if first_byte == b'\xf0': # Metadata marker
second_byte = self.handler.read_bytes(1)
# Handle metadata types
metadata_type = second_byte[0]
self.__handle_metadata(metadata_type)
else:
# Not metadata, treat as data
if self.acqStart:
second_byte = self.handler.read_bytes(1)
data = []
data.append(first_byte)
if len(second_byte) < 1 or self.acqComplete:
logging.info("Stopping data acquisition...")
return
data.append(second_byte)
amps = UtilityFunctions.convert_to_amps(
UtilityFunctions.bytes_to_twobyte_values(data)
)
self.dataQueue.put([amps])
def __handle_metadata(self, metadata_type):
if metadata_type == 0xF1:
logging.info("Received Metadata: ASCII error message.")
# self.handle_metadata_error()
elif metadata_type == 0xF2:
logging.info("Received Metadata: ASCII information message.")
# self.handle_metadata_info()
elif metadata_type == 0xF3:
logging.info("Received Metadata: Timestamp message.")
self.__handle_metadata_timestamp()
self.acqStart = True
elif metadata_type == 0xF4:
logging.info("Received Metadata: End of acquisition tag.")
self.__handle_metadata_end()
self.__handle_summary()
elif metadata_type == 0xF5:
logging.info("Received Metadata: Overcurrent detected.")
# self.handle_metadata_overcurrent()
else:
logging.error(f"Error: Unknown Metadata Type: {metadata_type:#04x}")
def __handle_summary(self):
s = ""
while True:
# Read the first byte
x = self.handler.read_bytes(1)
if len(x) < 1 or x == 0xF0:
self.acqComplete = True
return s.replace("\0", "").strip().replace("\r", "").replace("\n\n\n", "\n")
s += str(x, encoding='ascii', errors='ignore')
def __handle_metadata_end(self):
"""
Handle metadata end of acquisition message.
"""
# Read the next 2 bytes
metadata_bytes = self.handler.read_bytes(2)
if len(metadata_bytes) < 2:
logging.error("Error: Incomplete end of acquisition metadata reveived.")
return
# Check for end tags (last 2 bytes)
end_tag_1 = metadata_bytes[0]
end_tag_2 = metadata_bytes[1]
if end_tag_1 != 0xFF or end_tag_2 != 0xFF:
logging.error("Error: Invalid metadata end tags received.")
return
def __handle_metadata_timestamp(self):
"""
Handle metadata timestamp message. Parses and displays the timestamp and buffer load.
"""
# Read the next 7 bytes (timestamp + buffer load + end tags)
metadata_bytes = self.handler.read_bytes(7)
if len(metadata_bytes) < 7:
logging.error("Error: Incomplete timestamp metadata received.")
return
# Parse the timestamp (4 bytes, big-endian)
timestamp_ms = int.from_bytes(metadata_bytes[0:4], byteorder='big', signed=False)
# Parse the buffer Tx load value (1 byte)
buffer_load = metadata_bytes[4]
# Check for end tags (last 2 bytes)
end_tag_1 = metadata_bytes[5]
end_tag_2 = metadata_bytes[6]
if end_tag_1 != 0xFF or end_tag_2 != 0xFF:
logging.error("Error: Invalid metadata end tags received.")
return
# Display parsed values
logging.info(f"Metadata Timestamp: {timestamp_ms} ms")
logging.info(f"Buffer Tx Load: {buffer_load}%")
def __start_measurement(self):
"""
Starts the measurement by sending the 'start' command. Once the measurement starts,
data can be received continuously until the 'stop' command is sent.
:return: None
"""
command = "start"
self.acqComplete = False
self.__send_command(command)
raw_to_file_Thread = threading.Thread(
target=self.__raw_to_file, args=(self.power_shield_conf.output_file,)
)
raw_to_file_Thread.start()
logging.info("Measurement started. Receiving data...")
self.__acq_data()
raw_to_file_Thread.join()
def __raw_to_file(self, outputFilePath: str):
# Open a CSV file for writing
with open(outputFilePath, 'w', newline='') as outputFile:
writer = csv.writer(outputFile)
while True:
if self.dataQueue.empty() and bool(self.acqComplete):
outputFile.close()
break
if not self.dataQueue.empty():
data = self.dataQueue.get()
writer.writerow(data)
outputFile.flush()
else:
time.sleep(0.1)
def measure(self, time: int, freq: str = None, reset: bool = False):
self.power_shield_conf.acquisition_time = time
_time, self.power_shield_conf.acquisition_time_unit = UtilityFunctions.convert_acq_time(
time
)
if reset:
self.__reset()
self.__take_control()
self.__set_format(self.power_shield_conf.data_format)
if freq is not None:
self.__set_frequency(freq)
else:
self.__set_frequency(self.power_shield_conf.sampling_frequency)
self.__set_acquisition_time(
UtilityFunctions.convert_to_scientific_notation(
time=_time, unit=self.power_shield_conf.acquisition_time_unit
)
)
self.__start_measurement()
def get_data(self, unit: str = PowerShieldConf.MeasureUnit.RAW_DATA):
if self.acqComplete:
# Open the CSV file
with open(self.power_shield_conf.output_file) as file:
csv_reader = csv.reader(file)
for row in csv_reader:
self.power_shield_data.data.append(row[0])
if unit == PowerShieldConf.MeasureUnit.CURRENT_RMS:
self.power_shield_data.current_RMS = UtilityFunctions.calculate_rms(
self.power_shield_data.data
)
return self.power_shield_data.current_RMS
elif unit == PowerShieldConf.MeasureUnit.POWER:
_delta_time = self.power_shield_conf.acquisition_time
self.power_shield_data.power = 0
for data in self.power_shield_data.data:
self.power_shield_data.power += float(
float(data) * float(_delta_time) * float(self.target_voltage)
)
return self.power_shield_data.power
elif unit == PowerShieldConf.MeasureUnit.RAW_DATA:
return self.power_shield_data.data
else:
logging.error("Error: Unknown unit of requested data")
else:
logging.info("Acquisition not complete.")
return None