diff --git a/tests/on_target/tests/conftest.py b/tests/on_target/tests/conftest.py index bb98e19..2a2fff2 100644 --- a/tests/on_target/tests/conftest.py +++ b/tests/on_target/tests/conftest.py @@ -12,7 +12,7 @@ import sys sys.path.append(os.getcwd()) from utils.logger import get_logger -from utils.nrfcloud_fota import NRFCloudFOTA +from utils.nrfcloud import NRFCloud, NRFCloudFOTA logger = get_logger() @@ -20,7 +20,7 @@ SEGGER = os.getenv('SEGGER') UART_ID = os.getenv('UART_ID', SEGGER) -FOTADEVICE_UUID = os.getenv('UUID') +DEVICE_UUID = os.getenv('UUID') NRFCLOUD_API_KEY = os.getenv('NRFCLOUD_API_KEY') DUT_DEVICE_TYPE = os.getenv('DUT_DEVICE_TYPE') @@ -73,15 +73,31 @@ def dut_board(): scan_log_for_assertions(uart_log) +@pytest.fixture(scope="function") +def dut_cloud(dut_board): + if not NRFCLOUD_API_KEY: + pytest.skip("NRFCLOUD_API_KEY environment variable not set") + if not DEVICE_UUID: + pytest.skip("UUID environment variable not set") + + cloud = NRFCloud(api_key=NRFCLOUD_API_KEY) + device_id = DEVICE_UUID + + yield types.SimpleNamespace( + **dut_board.__dict__, + cloud=cloud, + device_id=device_id, + ) + @pytest.fixture(scope="function") def dut_fota(dut_board): if not NRFCLOUD_API_KEY: pytest.skip("NRFCLOUD_API_KEY environment variable not set") - if not FOTADEVICE_UUID: + if not DEVICE_UUID: pytest.skip("UUID environment variable not set") fota = NRFCloudFOTA(api_key=NRFCLOUD_API_KEY) - device_id = FOTADEVICE_UUID + device_id = DEVICE_UUID data = { 'job_id': '', } diff --git a/tests/on_target/tests/test_functional/test_fota.py b/tests/on_target/tests/test_functional/test_fota.py index f46b564..068b8b4 100644 --- a/tests/on_target/tests/test_functional/test_fota.py +++ b/tests/on_target/tests/test_functional/test_fota.py @@ -8,7 +8,7 @@ import os import functools from utils.flash_tools import flash_device, reset_device -from utils.nrfcloud_fota import NRFCloudFOTAError +from utils.nrfcloud import NRFCloudFOTAError import sys sys.path.append(os.getcwd()) from utils.logger import get_logger diff --git a/tests/on_target/tests/test_functional/test_sampling.py b/tests/on_target/tests/test_functional/test_sampling.py index bdffab1..ed527a9 100644 --- a/tests/on_target/tests/test_functional/test_sampling.py +++ b/tests/on_target/tests/test_functional/test_sampling.py @@ -17,10 +17,6 @@ def test_sampling(dut_board, hex_file): flash_device(os.path.abspath(hex_file)) dut_board.uart.xfactoryreset() - patterns_cloud_connection = [ - "Network connectivity established", - "Connected to Cloud" - ] # Log patterns pattern_location = "location_event_handler: Got location: lat:" @@ -42,6 +38,7 @@ def test_sampling(dut_board, hex_file): # Cloud connection dut_board.uart.flush() reset_device() + dut_board.uart.wait_for_str("Connected to Cloud", timeout=120) # Sampling dut_board.uart.wait_for_str(pattern_list, timeout=120) diff --git a/tests/on_target/tests/test_functional/test_shell.py b/tests/on_target/tests/test_functional/test_shell.py new file mode 100644 index 0000000..5c264d1 --- /dev/null +++ b/tests/on_target/tests/test_functional/test_shell.py @@ -0,0 +1,84 @@ +########################################################################################## +# Copyright (c) 2025 Nordic Semiconductor +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause +########################################################################################## + +import os +import time +from utils.flash_tools import flash_device, reset_device +import sys +sys.path.append(os.getcwd()) +from utils.logger import get_logger + +logger = get_logger() + +CLOUD_TIMEOUT = 60 + +def test_shell(dut_cloud, hex_file): + ''' + Test that the device is operating normally by checking UART output + ''' + flash_device(os.path.abspath(hex_file)) + dut_cloud.uart.xfactoryreset() + + patterns_boot = [ + "Connected to Cloud", + "main: requesting_sensors_and_polling_entry: Next trigger in" + ] + patterns_button_press = [ + "main: requesting_location_entry: requesting_location_entry", + ] + patterns_cloud_publish = [ + 'Sending on payload channel: {"messageType":"DATA","appId":"donald","data":"duck"', + ] + patterns_network_disconnected = [ + "network: Network connectivity lost", + ] + patterns_network_connected = [ + "network: Network connectivity established", + ] + + # Boot + dut_cloud.uart.flush() + reset_device() + dut_cloud.uart.wait_for_str(patterns_boot, timeout=120) + + # Button press + dut_cloud.uart.flush() + dut_cloud.uart.write("att_button_press 1\r\n") + dut_cloud.uart.wait_for_str(patterns_button_press, timeout=20) + + # Cloud publish + dut_cloud.uart.flush() + dut_cloud.uart.write("att_cloud_publish donald duck\r\n") + dut_cloud.uart.wait_for_str(patterns_cloud_publish, timeout=20) + + messages = dut_cloud.cloud.get_messages(dut_cloud.device_id, appname="donald", max_records=20, max_age_hrs=0.25) + + # Wait for message to be reported to cloud + start = time.time() + while time.time() - start < CLOUD_TIMEOUT: + time.sleep(5) + messages = dut_cloud.cloud.get_messages(dut_cloud.device_id, appname="donald", max_records=20, max_age_hrs=0.25) + logger.debug(f"Found messages: {messages}") + + latest_message = messages[0] if messages else None + if latest_message: + check_message_age = dut_cloud.cloud.check_message_age(message=latest_message, seconds=30) + if check_message_age: + break + else: + logger.debug("No message with recent timestamp, retrying...") + continue + else: + raise RuntimeError("No new message to cloud observed") + + # LTE disconnect + dut_cloud.uart.flush() + dut_cloud.uart.write("att_network disconnect\r\n") + dut_cloud.uart.wait_for_str(patterns_network_disconnected, timeout=20) + + # LTE reconnect + dut_cloud.uart.flush() + dut_cloud.uart.write("att_network connect\r\n") + dut_cloud.uart.wait_for_str(patterns_network_connected, timeout=120) diff --git a/tests/on_target/tests/test_functional/test_uart_output.py b/tests/on_target/tests/test_functional/test_uart_output.py deleted file mode 100644 index 8360542..0000000 --- a/tests/on_target/tests/test_functional/test_uart_output.py +++ /dev/null @@ -1,44 +0,0 @@ -########################################################################################## -# Copyright (c) 2024 Nordic Semiconductor -# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause -########################################################################################## - -import os -from utils.flash_tools import flash_device, reset_device -import sys -sys.path.append(os.getcwd()) -from utils.logger import get_logger - -logger = get_logger() - -def test_network_reconnect(dut_board, hex_file): - ''' - Test that the device is operating normally by checking UART output - ''' - flash_device(os.path.abspath(hex_file)) - dut_board.uart.xfactoryreset() - patterns_boot = [ - "Network connectivity established", - "Connected to Cloud", - ] - patterns_network_disconnected = [ - "network: Network connectivity lost", - ] - patterns_network_connected = [ - "network: Network connectivity established", - ] - - # Boot - dut_board.uart.flush() - reset_device() - dut_board.uart.wait_for_str(patterns_boot, timeout=120) - - # LTE disconnect - dut_board.uart.flush() - dut_board.uart.write("att_network disconnect\r\n") - dut_board.uart.wait_for_str(patterns_network_disconnected, timeout=20) - - # LTE reconnect - dut_board.uart.flush() - dut_board.uart.write("att_network connect\r\n") - dut_board.uart.wait_for_str(patterns_network_connected, timeout=120) diff --git a/tests/on_target/utils/nrfcloud_fota.py b/tests/on_target/utils/nrfcloud.py similarity index 86% rename from tests/on_target/utils/nrfcloud_fota.py rename to tests/on_target/utils/nrfcloud.py index 46af6e3..668263b 100644 --- a/tests/on_target/utils/nrfcloud_fota.py +++ b/tests/on_target/utils/nrfcloud.py @@ -11,6 +11,7 @@ import requests from enum import Enum from typing import Union +from datetime import datetime, timedelta, timezone from utils.logger import get_logger from requests.exceptions import HTTPError @@ -24,7 +25,7 @@ class FWType(Enum): class NRFCloudFOTAError(Exception): pass -class NRFCloudFOTA(): +class NRFCloud(): def __init__(self, api_key: str, url: str="https://api.nrfcloud.com/v1", timeout: int=10) -> None: """ Initalizes the class """ self.url = url @@ -64,6 +65,62 @@ def _patch(self, path: str, **kwargs): r.raise_for_status() return r + def get_devices(self, path: str="", params=None) -> dict: + return self._get(path=f"/devices{path}", params=params) + + def get_device(self, device_id: str, params=None) -> dict: + """ + Get all information about particular device on nrfcloud.com + + :param device_id: Device ID + :return: Json structure of result from nrfcloud.com + """ + return self.get_devices(path=f"/{device_id}", params=params) + + def get_messages(self, device: str=None, appname: str="donald", max_records: int=50, max_age_hrs: int=24) -> list: + """ + Get messages sent from asset_tracker to nrfcloud.com + + :param device_id: Limit result to messages from particular device + :param max_records: Limit number of messages to fetch + :param max_age_hrs: Limit fetching messages by timestamp + :return: List of (timestamp, message) + """ + end = datetime.now(timezone.utc).strftime(self.time_fmt) + start = (datetime.now(timezone.utc) - timedelta( + hours=max_age_hrs)).strftime(self.time_fmt) + params = { + 'start': start, + 'end': end, + 'pageSort': 'desc', + 'pageLimit': max_records + } + + if device: + params['deviceId'] = device + if appname: + params['appId'] = appname + + timestamp = lambda x: datetime.strptime(x['receivedAt'], self.time_fmt) + messages = self._get(path="/messages", params=params) + + return [(timestamp(x), x['message']) + for x in messages['items']] + + def check_message_age(self, message: dict, hours: int=0, minutes: int=0, seconds: int=0) -> bool: + """ + Check age of message, return False if message older than parameters + + :param messages: Single message + :param hours: Max message age hours + :param minutes: Max message age minutes + :param seconds: Max message age seconds + :return: bool True/False + """ + diff = timedelta(hours=hours, minutes=minutes, seconds=seconds) + return datetime.now(timezone.utc) - message[0].replace(tzinfo=timezone.utc) < diff + +class NRFCloudFOTA(NRFCloud): def upload_firmware( self, name: str, bin_file: str, version: str, description: str, fw_type: FWType, bin_file_2=None ) -> str: @@ -126,7 +183,6 @@ def upload_firmware( return m.group(3) return m.group(2) - def upload_zephyr_zip(self, zip_path: str, version: str, name: str=""): """ Upload zip image built by zephyr @@ -250,18 +306,6 @@ def post_fota_job(self, uuid: str, fw_id: str) -> Union[str, None]: self.delete_fota_job(job_id) return None - def get_devices(self, path: str="", params=None) -> dict: - return self._get(path=f"/devices{path}", params=params) - - def get_device(self, device_id: str, params=None) -> dict: - """ - Get all information about particular device on nrfcloud.com - - :param device_id: Device ID - :return: Json structure of result from nrfcloud.com - """ - return self.get_devices(path=f"/{device_id}", params=params) - def cancel_incomplete_jobs(self, uuid): fota_jobs = self.list_fota_jobs(pageLimit=100) items = fota_jobs['items']