Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: on_target: add shell tests #125

Merged
merged 1 commit into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions tests/on_target/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
import sys
sys.path.append(os.getcwd())
from utils.logger import get_logger
from utils.nrfcloud_fota import NRFCloudFOTA
from utils.nrfcloud import NRFCloud, NRFCloudFOTA

logger = get_logger()

UART_TIMEOUT = 60 * 30

SEGGER = os.getenv('SEGGER')
UART_ID = os.getenv('UART_ID', SEGGER)
FOTADEVICE_UUID = os.getenv('UUID')
DEVICE_UUID = os.getenv('UUID')
NRFCLOUD_API_KEY = os.getenv('NRFCLOUD_API_KEY')
DUT_DEVICE_TYPE = os.getenv('DUT_DEVICE_TYPE')

Expand Down Expand Up @@ -73,15 +73,31 @@ def dut_board():

scan_log_for_assertions(uart_log)

@pytest.fixture(scope="function")
def dut_cloud(dut_board):
if not NRFCLOUD_API_KEY:
pytest.skip("NRFCLOUD_API_KEY environment variable not set")
if not DEVICE_UUID:
pytest.skip("UUID environment variable not set")

cloud = NRFCloud(api_key=NRFCLOUD_API_KEY)
device_id = DEVICE_UUID

yield types.SimpleNamespace(
**dut_board.__dict__,
cloud=cloud,
device_id=device_id,
)

@pytest.fixture(scope="function")
def dut_fota(dut_board):
if not NRFCLOUD_API_KEY:
pytest.skip("NRFCLOUD_API_KEY environment variable not set")
if not FOTADEVICE_UUID:
if not DEVICE_UUID:
pytest.skip("UUID environment variable not set")

fota = NRFCloudFOTA(api_key=NRFCLOUD_API_KEY)
device_id = FOTADEVICE_UUID
device_id = DEVICE_UUID
data = {
'job_id': '',
}
Expand Down
2 changes: 1 addition & 1 deletion tests/on_target/tests/test_functional/test_fota.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import functools
from utils.flash_tools import flash_device, reset_device
from utils.nrfcloud_fota import NRFCloudFOTAError
from utils.nrfcloud import NRFCloudFOTAError
import sys
sys.path.append(os.getcwd())
from utils.logger import get_logger
Expand Down
5 changes: 1 addition & 4 deletions tests/on_target/tests/test_functional/test_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
def test_sampling(dut_board, hex_file):
flash_device(os.path.abspath(hex_file))
dut_board.uart.xfactoryreset()
patterns_cloud_connection = [
"Network connectivity established",
"Connected to Cloud"
]

# Log patterns
pattern_location = "location_event_handler: Got location: lat:"
Expand All @@ -42,6 +38,7 @@ def test_sampling(dut_board, hex_file):
# Cloud connection
dut_board.uart.flush()
reset_device()
dut_board.uart.wait_for_str("Connected to Cloud", timeout=120)

# Sampling
dut_board.uart.wait_for_str(pattern_list, timeout=120)
Expand Down
84 changes: 84 additions & 0 deletions tests/on_target/tests/test_functional/test_shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
##########################################################################################
# Copyright (c) 2025 Nordic Semiconductor
# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
##########################################################################################

import os
import time
from utils.flash_tools import flash_device, reset_device
import sys
sys.path.append(os.getcwd())
from utils.logger import get_logger

logger = get_logger()

CLOUD_TIMEOUT = 60

def test_shell(dut_cloud, hex_file):
'''
Test that the device is operating normally by checking UART output
'''
flash_device(os.path.abspath(hex_file))
dut_cloud.uart.xfactoryreset()

patterns_boot = [
"Connected to Cloud",
"main: requesting_sensors_and_polling_entry: Next trigger in"
]
patterns_button_press = [
"main: requesting_location_entry: requesting_location_entry",
]
patterns_cloud_publish = [
'Sending on payload channel: {"messageType":"DATA","appId":"donald","data":"duck"',
]
patterns_network_disconnected = [
"network: Network connectivity lost",
]
patterns_network_connected = [
"network: Network connectivity established",
]

# Boot
dut_cloud.uart.flush()
reset_device()
dut_cloud.uart.wait_for_str(patterns_boot, timeout=120)

# Button press
dut_cloud.uart.flush()
dut_cloud.uart.write("att_button_press 1\r\n")
dut_cloud.uart.wait_for_str(patterns_button_press, timeout=20)

# Cloud publish
dut_cloud.uart.flush()
dut_cloud.uart.write("att_cloud_publish donald duck\r\n")
dut_cloud.uart.wait_for_str(patterns_cloud_publish, timeout=20)

messages = dut_cloud.cloud.get_messages(dut_cloud.device_id, appname="donald", max_records=20, max_age_hrs=0.25)

# Wait for message to be reported to cloud
start = time.time()
while time.time() - start < CLOUD_TIMEOUT:
time.sleep(5)
messages = dut_cloud.cloud.get_messages(dut_cloud.device_id, appname="donald", max_records=20, max_age_hrs=0.25)
logger.debug(f"Found messages: {messages}")

latest_message = messages[0] if messages else None
if latest_message:
check_message_age = dut_cloud.cloud.check_message_age(message=latest_message, seconds=30)
if check_message_age:
break
else:
logger.debug("No message with recent timestamp, retrying...")
continue
else:
raise RuntimeError("No new message to cloud observed")

# LTE disconnect
dut_cloud.uart.flush()
dut_cloud.uart.write("att_network disconnect\r\n")
dut_cloud.uart.wait_for_str(patterns_network_disconnected, timeout=20)

# LTE reconnect
dut_cloud.uart.flush()
dut_cloud.uart.write("att_network connect\r\n")
dut_cloud.uart.wait_for_str(patterns_network_connected, timeout=120)
44 changes: 0 additions & 44 deletions tests/on_target/tests/test_functional/test_uart_output.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import requests
from enum import Enum
from typing import Union
from datetime import datetime, timedelta, timezone
from utils.logger import get_logger
from requests.exceptions import HTTPError

Expand All @@ -24,7 +25,7 @@ class FWType(Enum):
class NRFCloudFOTAError(Exception):
pass

class NRFCloudFOTA():
class NRFCloud():
def __init__(self, api_key: str, url: str="https://api.nrfcloud.com/v1", timeout: int=10) -> None:
""" Initalizes the class """
self.url = url
Expand Down Expand Up @@ -64,6 +65,62 @@ def _patch(self, path: str, **kwargs):
r.raise_for_status()
return r

def get_devices(self, path: str="", params=None) -> dict:
return self._get(path=f"/devices{path}", params=params)

def get_device(self, device_id: str, params=None) -> dict:
"""
Get all information about particular device on nrfcloud.com

:param device_id: Device ID
:return: Json structure of result from nrfcloud.com
"""
return self.get_devices(path=f"/{device_id}", params=params)

def get_messages(self, device: str=None, appname: str="donald", max_records: int=50, max_age_hrs: int=24) -> list:
"""
Get messages sent from asset_tracker to nrfcloud.com

:param device_id: Limit result to messages from particular device
:param max_records: Limit number of messages to fetch
:param max_age_hrs: Limit fetching messages by timestamp
:return: List of (timestamp, message)
"""
end = datetime.now(timezone.utc).strftime(self.time_fmt)
start = (datetime.now(timezone.utc) - timedelta(
hours=max_age_hrs)).strftime(self.time_fmt)
params = {
'start': start,
'end': end,
'pageSort': 'desc',
'pageLimit': max_records
}

if device:
params['deviceId'] = device
if appname:
params['appId'] = appname

timestamp = lambda x: datetime.strptime(x['receivedAt'], self.time_fmt)
messages = self._get(path="/messages", params=params)

return [(timestamp(x), x['message'])
for x in messages['items']]

def check_message_age(self, message: dict, hours: int=0, minutes: int=0, seconds: int=0) -> bool:
"""
Check age of message, return False if message older than parameters

:param messages: Single message
:param hours: Max message age hours
:param minutes: Max message age minutes
:param seconds: Max message age seconds
:return: bool True/False
"""
diff = timedelta(hours=hours, minutes=minutes, seconds=seconds)
return datetime.now(timezone.utc) - message[0].replace(tzinfo=timezone.utc) < diff

class NRFCloudFOTA(NRFCloud):
def upload_firmware(
self, name: str, bin_file: str, version: str, description: str, fw_type: FWType, bin_file_2=None
) -> str:
Expand Down Expand Up @@ -126,7 +183,6 @@ def upload_firmware(
return m.group(3)
return m.group(2)


def upload_zephyr_zip(self, zip_path: str, version: str, name: str=""):
"""
Upload zip image built by zephyr
Expand Down Expand Up @@ -250,18 +306,6 @@ def post_fota_job(self, uuid: str, fw_id: str) -> Union[str, None]:
self.delete_fota_job(job_id)
return None

def get_devices(self, path: str="", params=None) -> dict:
return self._get(path=f"/devices{path}", params=params)

def get_device(self, device_id: str, params=None) -> dict:
"""
Get all information about particular device on nrfcloud.com

:param device_id: Device ID
:return: Json structure of result from nrfcloud.com
"""
return self.get_devices(path=f"/{device_id}", params=params)

def cancel_incomplete_jobs(self, uuid):
fota_jobs = self.list_fota_jobs(pageLimit=100)
items = fota_jobs['items']
Expand Down
Loading