Skip to content

Commit

Permalink
feat: manage and use vcans for usage in native_sim
Browse files Browse the repository at this point in the history
This change adds capabilities to search and use virtual CAN devices in
different native_sim test executions. Hereby, each execution gets its
own virtual can device so no interference happens. If no vcan is found,
it uses the one from device tree, but prints a warning before.

This is not the cleanest solution, but works in our tests and fixes
scaling problem for protocol tests without hardware

Signed-off-by: Stefan Kraus <stefan.kraus@ul.com>
  • Loading branch information
MP-StefanKraus committed Aug 6, 2024
1 parent 3a53673 commit bc90a7d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,22 @@ class NativeSimulatorAdapter(BinaryAdapterBase):

def generate_command(self) -> None:
"""Set command to run."""
self.command = [str(self.device_config.build_dir / 'zephyr' / 'zephyr.exe')]
base_command = [str(self.device_config.build_dir / 'zephyr' / 'zephyr.exe')]
cli_flags = self._get_execution_cli_flags()
self.command = base_command + cli_flags

def _get_execution_cli_flags(self) -> list[str]:
"""Add additional flags for execution"""
return [f'--can-if={self._extract_can()}']

def _extract_can(self) -> str:
device_properties = self.device_config.properties
for property in device_properties:
key, value = property.split(":")
if key == 'host_can':
return value

return 'vcan0'


class UnitSimulatorAdapter(BinaryAdapterBase):
Expand Down
27 changes: 27 additions & 0 deletions scripts/pylib/twister/twisterlib/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import threading
import time
import shutil
import multiprocessing as mp
import itertools

from twisterlib.error import ConfigurationError
from twisterlib.environment import ZEPHYR_BASE, PYTEST_PLUGIN_INSTALLED
Expand All @@ -27,6 +29,21 @@

_WINDOWS = platform.system() == 'Windows'

# this is done globally, so each process later has access to the
# same instance. This can and should be refactored for 'upstream' (if wanted)
vcan_queue = mp.Queue()
for i in itertools.count():
if not os.path.exists(f"/sys/class/net/vcan{i}"):
break
vcan_queue.put(f"vcan{i}")

initial_number_of_vcans = vcan_queue.qsize()
if initial_number_of_vcans == 0:
logger.warning("No virtual CAN in form `vcan$i` found! "
"If you execute tests with 'CAN' on native_sim, "
"this may lead to interferring test executions!"

Check warning on line 44 in scripts/pylib/twister/twisterlib/harness.py

View workflow job for this annotation

GitHub Actions / Run compliance checks on patch series (PR)

TYPO_SPELLING

scripts/pylib/twister/twisterlib/harness.py:44 'interferring' may be misspelled - perhaps 'interfering'?
)


result_re = re.compile(r".*(PASS|FAIL|SKIP) - (test_)?(\S*) in (\d*[.,]?\d*) seconds")
class Harness:
Expand Down Expand Up @@ -297,6 +314,7 @@ def _configure(self, instance: TestInstance, tool_name: str, report_filename: st
self.running_dir = instance.build_dir
self.source_dir = instance.testsuite.source_dir
self.reserved_serial = None
self.reserved_vcan = None

self.tool_name = tool_name

Expand All @@ -323,6 +341,13 @@ def generate_command(self) -> list[str]:
)
elif handler.type_str in SUPPORTED_SIMS_IN_PYTEST:
command.append(f'--device-type={handler.type_str}')
# if no vcans are created, do not pass anything and use defaults
# better way would be to filter based on test tags, but this information
# is not existing at this point anymore
if initial_number_of_vcans:
self.reserved_vcan = vcan_queue.get()
command.extend(['--device-properties', f'host_can:{self.reserved_vcan}'])

elif handler.type_str == 'build':
command.append('--device-type=custom')
else:
Expand All @@ -341,6 +366,8 @@ def _run(self, timeout):
finally:
if self.reserved_serial:
self.instance.handler.make_device_available(self.reserved_serial)
if self.reserved_vcan:
vcan_queue.put(self.reserved_vcan)
self._update_test_status()

def _generate_parameters_for_hardware(self, handler: Handler):
Expand Down

0 comments on commit bc90a7d

Please sign in to comment.