|
| 1 | +########################################################################################## |
| 2 | +# Copyright (c) 2025 Nordic Semiconductor |
| 3 | +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause |
| 4 | +########################################################################################## |
| 5 | + |
| 6 | +import pytest |
| 7 | +import time |
| 8 | +import os |
| 9 | +import functools |
| 10 | +from utils.flash_tools import flash_device, reset_device |
| 11 | +from utils.nrfcloud_fota import FWType, NRFCloudFOTAError |
| 12 | +import sys |
| 13 | +sys.path.append(os.getcwd()) |
| 14 | +from utils.logger import get_logger |
| 15 | + |
| 16 | +logger = get_logger() |
| 17 | + |
| 18 | +MFW_202_FILEPATH = "artifacts/mfw_nrf91x1_2.0.2.zip" |
| 19 | + |
| 20 | +# Stable version used for testing |
| 21 | +TEST_APP_VERSION = "foo" |
| 22 | +TEST_APP_BIN = "artifacts/stable_version_jan_2025-update-signed.bin" |
| 23 | + |
| 24 | +DELTA_MFW_BUNDLEID = "MODEM*3471f88e*mfw_nrf91x1_2.0.2-FOTA-TEST" |
| 25 | +FULL_MFW_BUNDLEID = "MDM_FULL*124c2b20*mfw_nrf91x1_full_2.0.2" |
| 26 | +NEW_MFW_DELTA_VERSION = "mfw_nrf91x1_2.0.2-FOTA-TEST" |
| 27 | +MFW_202_VERSION = "mfw_nrf91x1_2.0.2" |
| 28 | + |
| 29 | +DEVICE_MSG_TIMEOUT = 60 * 5 |
| 30 | +APP_FOTA_TIMEOUT = 60 * 10 |
| 31 | +FULL_MFW_FOTA_TIMEOUT = 60 * 30 |
| 32 | + |
| 33 | + |
| 34 | +def await_nrfcloud(func, expected, field, timeout): |
| 35 | + start = time.time() |
| 36 | + logger.info(f"Awaiting {field} == {expected} in nrfcloud shadow...") |
| 37 | + while True: |
| 38 | + time.sleep(5) |
| 39 | + if time.time() - start > timeout: |
| 40 | + raise RuntimeError(f"Timeout awaiting {field} update") |
| 41 | + try: |
| 42 | + data = func() |
| 43 | + except Exception as e: |
| 44 | + logger.warning(f"Exception {e} during waiting for {field}") |
| 45 | + continue |
| 46 | + logger.debug(f"Reported {field}: {data}") |
| 47 | + if expected in data: |
| 48 | + break |
| 49 | + |
| 50 | +def get_appversion(t91x_fota): |
| 51 | + shadow = t91x_fota.fota.get_device(t91x_fota.device_id) |
| 52 | + return shadow["state"]["reported"]["device"]["deviceInfo"]["appVersion"] |
| 53 | + |
| 54 | +def get_modemversion(t91x_fota): |
| 55 | + shadow = t91x_fota.fota.get_device(t91x_fota.device_id) |
| 56 | + return shadow["state"]["reported"]["device"]["deviceInfo"]["modemFirmware"] |
| 57 | + |
| 58 | +def run_fota_resumption(t91x_fota, fota_type): |
| 59 | + timeout_50_percent= APP_FOTA_TIMEOUT/2 |
| 60 | + t91x_fota.uart.wait_for_str("50%", timeout=timeout_50_percent) |
| 61 | + logger.debug(f"Testing fota resumption on disconnect for {fota_type} fota") |
| 62 | + |
| 63 | + patterns_lte_offline = ["network: Network connectivity lost"] |
| 64 | + patterns_lte_normal = ["network: Network connectivity established"] |
| 65 | + |
| 66 | + # LTE disconnect |
| 67 | + t91x_fota.uart.flush() |
| 68 | + t91x_fota.uart.write("lte offline\r\n") |
| 69 | + t91x_fota.uart.wait_for_str(patterns_lte_offline, timeout=20) |
| 70 | + |
| 71 | + # LTE reconnect |
| 72 | + t91x_fota.uart.flush() |
| 73 | + t91x_fota.uart.write("lte normal\r\n") |
| 74 | + t91x_fota.uart.wait_for_str(patterns_lte_normal, timeout=120) |
| 75 | + |
| 76 | + t91x_fota.uart.wait_for_str("fota_download: Refuse fragment, restart with offset") |
| 77 | + t91x_fota.uart.wait_for_str("fota_download: Downloading from offset:") |
| 78 | + |
| 79 | +@pytest.fixture |
| 80 | +def run_fota_fixture(t91x_fota, hex_file): |
| 81 | + def _run_fota(bundle_id="", fota_type="app", fotatimeout=APP_FOTA_TIMEOUT, new_version=TEST_APP_VERSION): |
| 82 | + flash_device(os.path.abspath(hex_file)) |
| 83 | + t91x_fota.uart.xfactoryreset() |
| 84 | + t91x_fota.uart.flush() |
| 85 | + reset_device() |
| 86 | + t91x_fota.uart.wait_for_str("Connected to Cloud") |
| 87 | + |
| 88 | + if fota_type == "app": |
| 89 | + bundle_id = t91x_fota.fota.upload_firmware( |
| 90 | + "nightly_test_app", |
| 91 | + TEST_APP_BIN, |
| 92 | + TEST_APP_VERSION, |
| 93 | + "Bundle used for nightly test", |
| 94 | + FWType.app, |
| 95 | + ) |
| 96 | + logger.info(f"Uploaded file {TEST_APP_BIN}: bundleId: {bundle_id}") |
| 97 | + |
| 98 | + try: |
| 99 | + t91x_fota.data['job_id'] = t91x_fota.fota.create_fota_job(t91x_fota.device_id, bundle_id) |
| 100 | + t91x_fota.data['bundle_id'] = bundle_id |
| 101 | + except NRFCloudFOTAError as e: |
| 102 | + pytest.skip(f"FOTA create_job REST API error: {e}") |
| 103 | + logger.info(f"Created FOTA Job (ID: {t91x_fota.data['job_id']})") |
| 104 | + |
| 105 | + # Sleep a bit and trigger fota poll |
| 106 | + for i in range(3): |
| 107 | + try: |
| 108 | + time.sleep(30) |
| 109 | + t91x_fota.uart.write("zbus button_press\r\n") |
| 110 | + t91x_fota.uart.wait_for_str("nrf_cloud_fota_poll: Starting FOTA download") |
| 111 | + break |
| 112 | + except AssertionError: |
| 113 | + continue |
| 114 | + else: |
| 115 | + raise AssertionError(f"Fota update not available after {i} attempts") |
| 116 | + |
| 117 | + |
| 118 | + if fota_type == "app": |
| 119 | + run_fota_resumption(t91x_fota, "app") |
| 120 | + |
| 121 | + await_nrfcloud( |
| 122 | + functools.partial(t91x_fota.fota.get_fota_status, t91x_fota.data['job_id']), |
| 123 | + "COMPLETED", |
| 124 | + "FOTA status", |
| 125 | + fotatimeout |
| 126 | + ) |
| 127 | + try: |
| 128 | + if fota_type == "app": |
| 129 | + await_nrfcloud( |
| 130 | + functools.partial(get_appversion, t91x_fota), |
| 131 | + new_version, |
| 132 | + "appVersion", |
| 133 | + DEVICE_MSG_TIMEOUT |
| 134 | + ) |
| 135 | + else: |
| 136 | + await_nrfcloud( |
| 137 | + functools.partial(get_modemversion, t91x_fota), |
| 138 | + new_version, |
| 139 | + "modemFirmware", |
| 140 | + DEVICE_MSG_TIMEOUT |
| 141 | + ) |
| 142 | + except RuntimeError: |
| 143 | + logger.error(f"Version is not {new_version} after {DEVICE_MSG_TIMEOUT}s") |
| 144 | + |
| 145 | + return _run_fota |
| 146 | + |
| 147 | + |
| 148 | +def test_app_fota(run_fota_fixture): |
| 149 | + ''' |
| 150 | + Test application FOTA from nightly version to stable version |
| 151 | + ''' |
| 152 | + run_fota_fixture() # Uses default parameters for app FOTA |
| 153 | + |
| 154 | +def test_delta_mfw_fota(run_fota_fixture): |
| 155 | + ''' |
| 156 | + Test delta modem FOTA on nrf9151 |
| 157 | + ''' |
| 158 | + try: |
| 159 | + run_fota_fixture( |
| 160 | + bundle_id=DELTA_MFW_BUNDLEID, |
| 161 | + fota_type="delta", |
| 162 | + new_version=NEW_MFW_DELTA_VERSION |
| 163 | + ) |
| 164 | + finally: |
| 165 | + # Restore mfw202, no matter if test pass/fails |
| 166 | + flash_device(os.path.abspath(MFW_202_FILEPATH)) |
| 167 | + |
| 168 | +@pytest.mark.slow |
| 169 | +def test_full_mfw_fota(run_fota_fixture): |
| 170 | + ''' |
| 171 | + Test full modem FOTA on nrf9151 |
| 172 | + ''' |
| 173 | + run_fota_fixture( |
| 174 | + bundle_id=FULL_MFW_BUNDLEID, |
| 175 | + fota_type="full", |
| 176 | + new_version=MFW_202_VERSION, |
| 177 | + fotatimeout=FULL_MFW_FOTA_TIMEOUT |
| 178 | + ) |
0 commit comments