Skip to content

Commit 98fe5af

Browse files
tests: on_target: add shell tests
Button and cloud shell tests are added. Renamed test_uart_output.py to test_shell.py. Refactored nrfcloud utils. Signed-off-by: Giacomo Dematteis <giacomo.dematteis@nordicsemi.no>
1 parent 04e3058 commit 98fe5af

File tree

6 files changed

+164
-67
lines changed

6 files changed

+164
-67
lines changed

tests/on_target/tests/conftest.py

+20-4
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@
1212
import sys
1313
sys.path.append(os.getcwd())
1414
from utils.logger import get_logger
15-
from utils.nrfcloud_fota import NRFCloudFOTA
15+
from utils.nrfcloud import NRFCloud, NRFCloudFOTA
1616

1717
logger = get_logger()
1818

1919
UART_TIMEOUT = 60 * 30
2020

2121
SEGGER = os.getenv('SEGGER')
2222
UART_ID = os.getenv('UART_ID', SEGGER)
23-
FOTADEVICE_UUID = os.getenv('UUID')
23+
DEVICE_UUID = os.getenv('UUID')
2424
NRFCLOUD_API_KEY = os.getenv('NRFCLOUD_API_KEY')
2525
DUT_DEVICE_TYPE = os.getenv('DUT_DEVICE_TYPE')
2626

@@ -73,15 +73,31 @@ def dut_board():
7373

7474
scan_log_for_assertions(uart_log)
7575

76+
@pytest.fixture(scope="function")
77+
def dut_cloud(dut_board):
78+
if not NRFCLOUD_API_KEY:
79+
pytest.skip("NRFCLOUD_API_KEY environment variable not set")
80+
if not DEVICE_UUID:
81+
pytest.skip("UUID environment variable not set")
82+
83+
cloud = NRFCloud(api_key=NRFCLOUD_API_KEY)
84+
device_id = DEVICE_UUID
85+
86+
yield types.SimpleNamespace(
87+
**dut_board.__dict__,
88+
cloud=cloud,
89+
device_id=device_id,
90+
)
91+
7692
@pytest.fixture(scope="function")
7793
def dut_fota(dut_board):
7894
if not NRFCLOUD_API_KEY:
7995
pytest.skip("NRFCLOUD_API_KEY environment variable not set")
80-
if not FOTADEVICE_UUID:
96+
if not DEVICE_UUID:
8197
pytest.skip("UUID environment variable not set")
8298

8399
fota = NRFCloudFOTA(api_key=NRFCLOUD_API_KEY)
84-
device_id = FOTADEVICE_UUID
100+
device_id = DEVICE_UUID
85101
data = {
86102
'job_id': '',
87103
}

tests/on_target/tests/test_functional/test_fota.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import os
99
import functools
1010
from utils.flash_tools import flash_device, reset_device
11-
from utils.nrfcloud_fota import NRFCloudFOTAError
11+
from utils.nrfcloud import NRFCloudFOTAError
1212
import sys
1313
sys.path.append(os.getcwd())
1414
from utils.logger import get_logger

tests/on_target/tests/test_functional/test_sampling.py

+1-4
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717
def test_sampling(dut_board, hex_file):
1818
flash_device(os.path.abspath(hex_file))
1919
dut_board.uart.xfactoryreset()
20-
patterns_cloud_connection = [
21-
"Network connectivity established",
22-
"Connected to Cloud"
23-
]
2420

2521
# Log patterns
2622
pattern_location = "location_event_handler: Got location: lat:"
@@ -42,6 +38,7 @@ def test_sampling(dut_board, hex_file):
4238
# Cloud connection
4339
dut_board.uart.flush()
4440
reset_device()
41+
dut_board.uart.wait_for_str("Connected to Cloud", timeout=120)
4542

4643
# Sampling
4744
dut_board.uart.wait_for_str(pattern_list, timeout=120)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
##########################################################################################
2+
# Copyright (c) 2025 Nordic Semiconductor
3+
# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
4+
##########################################################################################
5+
6+
import os
7+
import time
8+
from utils.flash_tools import flash_device, reset_device
9+
import sys
10+
sys.path.append(os.getcwd())
11+
from utils.logger import get_logger
12+
13+
logger = get_logger()
14+
15+
CLOUD_TIMEOUT = 60
16+
17+
def test_shell(dut_cloud, hex_file):
18+
'''
19+
Test that the device is operating normally by checking UART output
20+
'''
21+
flash_device(os.path.abspath(hex_file))
22+
dut_cloud.uart.xfactoryreset()
23+
24+
patterns_boot = [
25+
"Connected to Cloud",
26+
"main: requesting_sensors_and_polling_entry: Next trigger in"
27+
]
28+
patterns_button_press = [
29+
"main: requesting_location_entry: requesting_location_entry",
30+
]
31+
patterns_cloud_publish = [
32+
'Sending on payload channel: {"messageType":"DATA","appId":"donald","data":"duck"',
33+
]
34+
patterns_network_disconnected = [
35+
"network: Network connectivity lost",
36+
]
37+
patterns_network_connected = [
38+
"network: Network connectivity established",
39+
]
40+
41+
# Boot
42+
dut_cloud.uart.flush()
43+
reset_device()
44+
dut_cloud.uart.wait_for_str(patterns_boot, timeout=120)
45+
46+
# Button press
47+
dut_cloud.uart.flush()
48+
dut_cloud.uart.write("att_button_press 1\r\n")
49+
dut_cloud.uart.wait_for_str(patterns_button_press, timeout=20)
50+
51+
# Cloud publish
52+
dut_cloud.uart.flush()
53+
dut_cloud.uart.write("att_cloud_publish donald duck\r\n")
54+
dut_cloud.uart.wait_for_str(patterns_cloud_publish, timeout=20)
55+
56+
messages = dut_cloud.cloud.get_messages(dut_cloud.device_id, appname="donald", max_records=20, max_age_hrs=0.25)
57+
58+
# Wait for message to be reported to cloud
59+
start = time.time()
60+
while time.time() - start < CLOUD_TIMEOUT:
61+
time.sleep(5)
62+
messages = dut_cloud.cloud.get_messages(dut_cloud.device_id, appname="donald", max_records=20, max_age_hrs=0.25)
63+
logger.debug(f"Found messages: {messages}")
64+
65+
latest_message = messages[0] if messages else None
66+
if latest_message:
67+
check_message_age = dut_cloud.cloud.check_message_age(message=latest_message, seconds=30)
68+
if check_message_age:
69+
break
70+
else:
71+
logger.debug("No message with recent timestamp, retrying...")
72+
continue
73+
else:
74+
raise RuntimeError("No new message to cloud observed")
75+
76+
# LTE disconnect
77+
dut_cloud.uart.flush()
78+
dut_cloud.uart.write("att_network disconnect\r\n")
79+
dut_cloud.uart.wait_for_str(patterns_network_disconnected, timeout=20)
80+
81+
# LTE reconnect
82+
dut_cloud.uart.flush()
83+
dut_cloud.uart.write("att_network connect\r\n")
84+
dut_cloud.uart.wait_for_str(patterns_network_connected, timeout=120)

tests/on_target/tests/test_functional/test_uart_output.py

-44
This file was deleted.

tests/on_target/utils/nrfcloud_fota.py tests/on_target/utils/nrfcloud.py

+58-14
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import requests
1212
from enum import Enum
1313
from typing import Union
14+
from datetime import datetime, timedelta, timezone
1415
from utils.logger import get_logger
1516
from requests.exceptions import HTTPError
1617

@@ -24,7 +25,7 @@ class FWType(Enum):
2425
class NRFCloudFOTAError(Exception):
2526
pass
2627

27-
class NRFCloudFOTA():
28+
class NRFCloud():
2829
def __init__(self, api_key: str, url: str="https://api.nrfcloud.com/v1", timeout: int=10) -> None:
2930
""" Initalizes the class """
3031
self.url = url
@@ -64,6 +65,62 @@ def _patch(self, path: str, **kwargs):
6465
r.raise_for_status()
6566
return r
6667

68+
def get_devices(self, path: str="", params=None) -> dict:
69+
return self._get(path=f"/devices{path}", params=params)
70+
71+
def get_device(self, device_id: str, params=None) -> dict:
72+
"""
73+
Get all information about particular device on nrfcloud.com
74+
75+
:param device_id: Device ID
76+
:return: Json structure of result from nrfcloud.com
77+
"""
78+
return self.get_devices(path=f"/{device_id}", params=params)
79+
80+
def get_messages(self, device: str=None, appname: str="donald", max_records: int=50, max_age_hrs: int=24) -> list:
81+
"""
82+
Get messages sent from asset_tracker to nrfcloud.com
83+
84+
:param device_id: Limit result to messages from particular device
85+
:param max_records: Limit number of messages to fetch
86+
:param max_age_hrs: Limit fetching messages by timestamp
87+
:return: List of (timestamp, message)
88+
"""
89+
end = datetime.now(timezone.utc).strftime(self.time_fmt)
90+
start = (datetime.now(timezone.utc) - timedelta(
91+
hours=max_age_hrs)).strftime(self.time_fmt)
92+
params = {
93+
'start': start,
94+
'end': end,
95+
'pageSort': 'desc',
96+
'pageLimit': max_records
97+
}
98+
99+
if device:
100+
params['deviceId'] = device
101+
if appname:
102+
params['appId'] = appname
103+
104+
timestamp = lambda x: datetime.strptime(x['receivedAt'], self.time_fmt)
105+
messages = self._get(path="/messages", params=params)
106+
107+
return [(timestamp(x), x['message'])
108+
for x in messages['items']]
109+
110+
def check_message_age(self, message: dict, hours: int=0, minutes: int=0, seconds: int=0) -> bool:
111+
"""
112+
Check age of message, return False if message older than parameters
113+
114+
:param messages: Single message
115+
:param hours: Max message age hours
116+
:param minutes: Max message age minutes
117+
:param seconds: Max message age seconds
118+
:return: bool True/False
119+
"""
120+
diff = timedelta(hours=hours, minutes=minutes, seconds=seconds)
121+
return datetime.now(timezone.utc) - message[0].replace(tzinfo=timezone.utc) < diff
122+
123+
class NRFCloudFOTA(NRFCloud):
67124
def upload_firmware(
68125
self, name: str, bin_file: str, version: str, description: str, fw_type: FWType, bin_file_2=None
69126
) -> str:
@@ -126,7 +183,6 @@ def upload_firmware(
126183
return m.group(3)
127184
return m.group(2)
128185

129-
130186
def upload_zephyr_zip(self, zip_path: str, version: str, name: str=""):
131187
"""
132188
Upload zip image built by zephyr
@@ -250,18 +306,6 @@ def post_fota_job(self, uuid: str, fw_id: str) -> Union[str, None]:
250306
self.delete_fota_job(job_id)
251307
return None
252308

253-
def get_devices(self, path: str="", params=None) -> dict:
254-
return self._get(path=f"/devices{path}", params=params)
255-
256-
def get_device(self, device_id: str, params=None) -> dict:
257-
"""
258-
Get all information about particular device on nrfcloud.com
259-
260-
:param device_id: Device ID
261-
:return: Json structure of result from nrfcloud.com
262-
"""
263-
return self.get_devices(path=f"/{device_id}", params=params)
264-
265309
def cancel_incomplete_jobs(self, uuid):
266310
fota_jobs = self.list_fota_jobs(pageLimit=100)
267311
items = fota_jobs['items']

0 commit comments

Comments
 (0)