Skip to content

Commit 345f13b

Browse files
committed
Adding 3 tests for MPITaskScheduler
* test_larger_jobs_prioritized checks to confirm the ordering of jobs in the backlog queue. * test_hashable_backlog_queue confirms that the PrioritizedTask dataclass prevents the priority queue from failing to hash tasks that share the same priority. * an extended test for the new MPITaskScheduler logic.
1 parent 125493f commit 345f13b

File tree

1 file changed

+88
-1
lines changed

1 file changed

+88
-1
lines changed

parsl/tests/test_mpi_apps/test_mpi_scheduler.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import logging
21
import os
32
import pickle
3+
import random
44
from unittest import mock
55

66
import pytest
@@ -161,3 +161,90 @@ def test_MPISched_contention():
161161
assert task_on_worker_side['task_id'] == 2
162162
_, _, _, resource_spec = unpack_res_spec_apply_message(task_on_worker_side['buffer'])
163163
assert len(resource_spec['MPI_NODELIST'].split(',')) == 8
164+
165+
166+
@pytest.mark.local
def test_hashable_backlog_queue():
    """Force several node-hungry tasks into the backlog queue.

    The backlog is a queue.PriorityQueue, which requires its entries to be
    comparable (and hashable) even when two tasks carry the same priority;
    this exercises that path via the PrioritizedTask wrapper.
    """
    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)

    # Sanity-check the scheduler's initial pool: 8 nodes, all free.
    assert scheduler.available_nodes
    assert len(scheduler.available_nodes) == 8
    assert scheduler._free_node_counter.value == 8

    # Each task requests all 8 nodes, so only the first can be scheduled.
    for task_id in range(3):
        buffer = pack_res_spec_apply_message(
            "func", "args", "kwargs",
            resource_specification={"num_nodes": 8,
                                    "ranks_per_node": 2})
        scheduler.put_task({"task_id": task_id, "buffer": buffer})

    # Task 0 consumed the whole pool; tasks 1 and 2 must be backlogged.
    assert scheduler._backlog_queue.qsize() == 2, "Expected 2 backlogged tasks"
189+
190+
191+
@pytest.mark.local
def test_larger_jobs_prioritized():
    """Larger jobs should be scheduled first.

    The first task consumes every available node, so every subsequent task
    lands in the backlog queue. Draining that queue must then yield tasks in
    order of decreasing node request, i.e. the (negated-node-count) priorities
    come out in non-decreasing order.
    """
    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)

    max_nodes = len(scheduler.available_nodes)

    # The first task will get scheduled with all the nodes,
    # and the remainder hits the backlog queue.
    node_request_list = [max_nodes] + [random.randint(1, 4) for _i in range(8)]

    for task_id, num_nodes in enumerate(node_request_list):
        mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
                                                       resource_specification={
                                                           "num_nodes": num_nodes,
                                                           "ranks_per_node": 2
                                                       })
        task_package = {"task_id": task_id, "buffer": mock_task_buffer}
        scheduler.put_task(task_package)

    # Confirm that the backlogged tasks drain in decreasing size order.
    # BUG FIX: the previous version never updated prev_priority inside the
    # loop (and compared in the wrong direction), so every negative priority
    # trivially satisfied `<= 0` and the ordering was not actually checked.
    prev_priority = None
    for _ in range(len(node_request_list) - 1):
        p_task = scheduler._backlog_queue.get()
        # Priorities are negated node counts, hence strictly negative.
        assert p_task.priority < 0
        if prev_priority is not None:
            # Non-decreasing priority == non-increasing node request.
            assert p_task.priority >= prev_priority
        prev_priority = p_task.priority
219+
220+
221+
@pytest.mark.local
def test_tiny_large_loop():
    """Run a set of tiny and large tasks in a loop"""
    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)

    # Sanity-check the scheduler's initial pool: 8 nodes, all free.
    assert scheduler.available_nodes
    assert len(scheduler.available_nodes) == 8
    assert scheduler._free_node_counter.value == 8

    # Submit an alternating sequence of tiny (2-node) and large (8-node) jobs.
    for task_id in range(10):
        nodes_wanted = 8 if task_id % 2 else 2
        buffer = pack_res_spec_apply_message(
            "func", "args", "kwargs",
            resource_specification={"num_nodes": nodes_wanted,
                                    "ranks_per_node": 2})
        scheduler.put_task({"task_id": task_id, "buffer": buffer})

    # Drain: acknowledge each dispatched task with a fake result, which
    # releases its nodes and allows the next backlogged task to schedule.
    for _ in range(10):
        dispatched = task_q.get(timeout=30)
        fake_result = pickle.dumps(
            {"task_id": dispatched["task_id"], "type": "result", "buffer": "RESULT BUF"})
        result_q.put(fake_result)
        got_result = scheduler.get_result(True, 1)

        assert got_result == fake_result

0 commit comments

Comments
 (0)