|
| 1 | +########################################################################################## |
| 2 | +# Copyright (c) 2025 Nordic Semiconductor |
| 3 | +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause |
| 4 | +########################################################################################## |
| 5 | + |
| 6 | +import os |
| 7 | +import time |
| 8 | +import json |
| 9 | +import types |
| 10 | +import pytest |
| 11 | +import csv |
| 12 | +import pandas as pd |
| 13 | +import plotly.express as px |
| 14 | +from tests.conftest import get_uarts |
| 15 | +from ppk2_api.ppk2_api import PPK2_API |
| 16 | +from utils.uart import Uart |
| 17 | +from utils.flash_tools import flash_device, reset_device, recover_device |
| 18 | +import sys |
| 19 | +sys.path.append(os.getcwd()) |
| 20 | +from utils.logger import get_logger |
| 21 | + |
| 22 | +logger = get_logger() |
| 23 | + |
| 24 | +UART_TIMEOUT = 60 * 30 |
| 25 | +POWER_TIMEOUT = 60 * 15 |
| 26 | +MAX_CURRENT_PSM_UA = 10 |
| 27 | +SAMPLING_INTERVAL = 0.01 |
| 28 | +CSV_FILE = "power_measurements.csv" |
| 29 | +HMTL_PLOT_FILE = "power_measurements_plot.html" |
| 30 | +SEGGER = os.getenv('SEGGER') |
| 31 | + |
| 32 | + |
| 33 | +def save_badge_data(average): |
| 34 | + badge_filename = "power_badge.json" |
| 35 | + logger.info(f"Minimum average current measured: {average}uA") |
| 36 | + if average < 0: |
| 37 | + pytest.skip(f"current cant be negative, current average: {average}") |
| 38 | + elif average <= 10: |
| 39 | + color = "green" |
| 40 | + elif average <= 50: |
| 41 | + color = "yellow" |
| 42 | + else: |
| 43 | + color = "red" |
| 44 | + |
| 45 | + badge_data = { |
| 46 | + "label": "🔗 PSM current uA", |
| 47 | + "message": f"{average}", |
| 48 | + "schemaVersion": 1, |
| 49 | + "color": f"{color}" |
| 50 | + } |
| 51 | + |
| 52 | + # Save the JSON data to a file |
| 53 | + with open(badge_filename, 'w') as json_file: |
| 54 | + json.dump(badge_data, json_file) |
| 55 | + |
| 56 | + logger.info(f"Minimum average current saved to {badge_filename}") |
| 57 | + |
| 58 | + |
| 59 | +def save_measurement_data(samples): |
| 60 | + # Generate timestamps for each sample assuming uniform sampling interval |
| 61 | + timestamps = [round(i * SAMPLING_INTERVAL, 2) for i in range(len(samples))] |
| 62 | + |
| 63 | + with open(CSV_FILE, 'w', newline='') as csvfile: |
| 64 | + fieldnames = ['Time (s)', 'Current (uA)'] |
| 65 | + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) |
| 66 | + |
| 67 | + writer.writeheader() |
| 68 | + for t, current in zip(timestamps, samples): |
| 69 | + writer.writerow({'Time (s)': t, 'Current (uA)': current}) |
| 70 | + |
| 71 | + logger.info(f"Measurement data saved to {CSV_FILE}") |
| 72 | + |
| 73 | + |
| 74 | +def generate_time_series_html(csv_file, date_column, value_column, output_file="time_series_plot.html"): |
| 75 | + """ |
| 76 | + Generates an HTML file with an interactive time series plot from a CSV file. |
| 77 | +
|
| 78 | + Parameters: |
| 79 | + - csv_file (str): Path to the CSV file containing the time series data. |
| 80 | + - date_column (str): Name of the column containing date or time information. |
| 81 | + - value_column (str): Name of the column containing the values to plot. |
| 82 | + - output_file (str): Name of the output HTML file (default is "time_series_plot.html"). |
| 83 | +
|
| 84 | + Returns: |
| 85 | + - str: The path to the generated HTML file. |
| 86 | + """ |
| 87 | + # Load the CSV file |
| 88 | + df = pd.read_csv(csv_file, parse_dates=[date_column]) |
| 89 | + |
| 90 | + title = "Asset Tracket Template Current Consumption Plot\n\n" |
| 91 | + note_text = "Note: application is still in development, not reaching target psm current yet" |
| 92 | + title += f"<br><span style='font-size:12px;color:gray;'>{note_text}</span>" |
| 93 | + |
| 94 | + # Create an interactive Plotly line chart |
| 95 | + fig = px.line(df, x=date_column, y=value_column, title=title) |
| 96 | + |
| 97 | + # Save the plot to an HTML file |
| 98 | + fig.write_html(output_file) |
| 99 | + |
| 100 | + logger.info(f"HTML file generated: {output_file}") |
| 101 | + return output_file |
| 102 | + |
| 103 | + |
| 104 | +@pytest.fixture(scope="module") |
| 105 | +def thingy91x_ppk2(): |
| 106 | + ''' |
| 107 | + This fixture sets up ppk measurement tool. |
| 108 | + ''' |
| 109 | + ppk2s_connected = PPK2_API.list_devices() |
| 110 | + ppk2s_connected.sort() |
| 111 | + if len(ppk2s_connected) == 2: |
| 112 | + ppk2_port = ppk2s_connected[0] |
| 113 | + ppk2_serial = ppk2s_connected[1] |
| 114 | + logger.info(f"Found PPK2 at port: {ppk2_port}, serial: {ppk2_serial}") |
| 115 | + elif len(ppk2s_connected) == 0: |
| 116 | + pytest.skip("No ppk found") |
| 117 | + else: |
| 118 | + pytest.skip(f"PPK should list 2 ports, but found {ppk2s_connected}") |
| 119 | + |
| 120 | + ppk2_dev = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True) |
| 121 | + |
| 122 | + # get modifier might fail, retry 15 times |
| 123 | + for _ in range(15): |
| 124 | + try: |
| 125 | + ppk2_dev.get_modifiers() |
| 126 | + break |
| 127 | + except Exception as e: |
| 128 | + logger.error(f"Failed to get modifiers: {e}") |
| 129 | + time.sleep(5) |
| 130 | + else: |
| 131 | + pytest.skip("Failed to get ppk modifiers after 10 attempts") |
| 132 | + |
| 133 | + ppk2_dev.set_source_voltage(3300) |
| 134 | + ppk2_dev.use_ampere_meter() # set ampere meter mode |
| 135 | + ppk2_dev.toggle_DUT_power("ON") # enable DUT power |
| 136 | + |
| 137 | + time.sleep(10) |
| 138 | + for i in range(10): |
| 139 | + try: |
| 140 | + all_uarts = get_uarts() |
| 141 | + logger.warning(f"momo all uarts {all_uarts}") |
| 142 | + if not all_uarts: |
| 143 | + logger.error("No UARTs found") |
| 144 | + log_uart_string = all_uarts[0] |
| 145 | + break |
| 146 | + except Exception as e: |
| 147 | + logger.warning(f"Exception: {e}") |
| 148 | + ppk2_dev.toggle_DUT_power("OFF") # disable DUT power |
| 149 | + time.sleep(2) |
| 150 | + ppk2_dev.toggle_DUT_power("ON") # enable DUT power |
| 151 | + time.sleep(5) |
| 152 | + continue |
| 153 | + else: |
| 154 | + pytest.skip("NO uart after 10 attempts") |
| 155 | + |
| 156 | + t91x_uart = Uart(log_uart_string, timeout=UART_TIMEOUT) |
| 157 | + |
| 158 | + yield types.SimpleNamespace(ppk2_dev=ppk2_dev, t91x_uart=t91x_uart) |
| 159 | + |
| 160 | + t91x_uart.stop() |
| 161 | + recover_device(serial=SEGGER) |
| 162 | + ppk2_dev.stop_measuring() |
| 163 | + |
| 164 | +@pytest.mark.slow |
| 165 | +def test_power(thingy91x_ppk2, hex_file): |
| 166 | + ''' |
| 167 | + Test that the device can reach PSM and measure the current consumption |
| 168 | +
|
| 169 | + Current consumption is measured and report generated. |
| 170 | + ''' |
| 171 | + flash_device(os.path.abspath(hex_file), serial=SEGGER) |
| 172 | + reset_device(serial=SEGGER) |
| 173 | + try: |
| 174 | + thingy91x_ppk2.t91x_uart.wait_for_str("Connected to Cloud", timeout=120) |
| 175 | + except AssertionError: |
| 176 | + pytest.skip("Device unable to connect to cloud, skip ppk test") |
| 177 | + |
| 178 | + thingy91x_ppk2.ppk2_dev.start_measuring() |
| 179 | + |
| 180 | + start = time.time() |
| 181 | + min_rolling_average = float('inf') |
| 182 | + rolling_average = float('inf') |
| 183 | + samples_list = [] |
| 184 | + last_log_time = start |
| 185 | + psm_reached = False |
| 186 | + |
| 187 | + # Initialize an empty pandas Series to store samples over time |
| 188 | + samples_series = pd.Series(dtype='float64') |
| 189 | + while time.time() < start + POWER_TIMEOUT: |
| 190 | + try: |
| 191 | + read_data = thingy91x_ppk2.ppk2_dev.get_data() |
| 192 | + if read_data != b'': |
| 193 | + ppk_samples, _ = thingy91x_ppk2.ppk2_dev.get_samples(read_data) |
| 194 | + sample = sum(ppk_samples) / len(ppk_samples) |
| 195 | + sample = round(sample, 2) |
| 196 | + samples_list.append(sample) |
| 197 | + |
| 198 | + # Append the new sample to the Pandas Series |
| 199 | + samples_series = pd.concat([samples_series, pd.Series([sample])], ignore_index=True) |
| 200 | + |
| 201 | + # Log and store every 3 seconds |
| 202 | + current_time = time.time() |
| 203 | + if current_time - last_log_time >= 3: |
| 204 | + # Calculate rolling average over the last 3 seconds |
| 205 | + window_size = int(3 / SAMPLING_INTERVAL) |
| 206 | + rolling_average_series = samples_series.rolling(window=window_size).mean() |
| 207 | + rolling_average = rolling_average_series.iloc[-1] # Get the last rolling average value |
| 208 | + rolling_average = round(rolling_average, 2) if not pd.isna(rolling_average) else rolling_average |
| 209 | + logger.info(f"Average current over last 3 secs: {rolling_average} uA") |
| 210 | + |
| 211 | + if rolling_average < min_rolling_average: |
| 212 | + min_rolling_average = rolling_average |
| 213 | + |
| 214 | + last_log_time = current_time |
| 215 | + |
| 216 | + # Check if PSM target has been reached |
| 217 | + if rolling_average < MAX_CURRENT_PSM_UA and rolling_average > 0: |
| 218 | + psm_reached = True |
| 219 | + |
| 220 | + except Exception as e: |
| 221 | + logger.error(f"Catching exception: {e}") |
| 222 | + pytest.skip("Something went wrong, unable to perform power measurements") |
| 223 | + |
| 224 | + time.sleep(SAMPLING_INTERVAL) # lower time between sampling -> less samples read in one sampling period |
| 225 | + |
| 226 | + # Save measurement data and generate HTML report |
| 227 | + save_badge_data(min_rolling_average) |
| 228 | + save_measurement_data(samples_list) |
| 229 | + generate_time_series_html(CSV_FILE, 'Time (s)', 'Current (uA)', HMTL_PLOT_FILE) |
| 230 | + |
| 231 | + # Determine test result based on whether PSM was reached |
| 232 | + if not psm_reached: |
| 233 | + pytest.fail(f"PSM target not reached after {POWER_TIMEOUT / 60} minutes, only reached {min_rolling_average} uA") |
0 commit comments