From 345f13b0b8f426524ddf91b3ba0e723c5a88b0d9 Mon Sep 17 00:00:00 2001
From: Yadu Babuji
Date: Wed, 19 Feb 2025 18:51:56 -0600
Subject: [PATCH 1/2] Adding 3 tests for MPITaskScheduler

* test_larger_jobs_prioritized confirms the ordering of jobs in the
  backlog queue
* test_hashable_backlog_queue confirms that the PrioritizedTask dataclass
  keeps the priority queue from failing to compare tasks that share the
  same priority
* an extended test for the new MPITaskScheduler logic
---
 .../tests/test_mpi_apps/test_mpi_scheduler.py | 89 ++++++++++++++++++-
 1 file changed, 88 insertions(+), 1 deletion(-)

diff --git a/parsl/tests/test_mpi_apps/test_mpi_scheduler.py b/parsl/tests/test_mpi_apps/test_mpi_scheduler.py
index 92248cafe9..48df93af2b 100644
--- a/parsl/tests/test_mpi_apps/test_mpi_scheduler.py
+++ b/parsl/tests/test_mpi_apps/test_mpi_scheduler.py
@@ -1,6 +1,6 @@
-import logging
 import os
 import pickle
+import random
 from unittest import mock
 
 import pytest
@@ -161,3 +161,90 @@ def test_MPISched_contention():
     assert task_on_worker_side['task_id'] == 2
     _, _, _, resource_spec = unpack_res_spec_apply_message(task_on_worker_side['buffer'])
     assert len(resource_spec['MPI_NODELIST'].split(',')) == 8
+
+
+@pytest.mark.local
+def test_hashable_backlog_queue():
+    """Run multiple large tasks to force entry into the backlog_queue,
+    where queue.PriorityQueue expects entries to be comparable.
+    """
+
+    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
+    scheduler = MPITaskScheduler(task_q, result_q)
+
+    assert scheduler.available_nodes
+    assert len(scheduler.available_nodes) == 8
+
+    assert scheduler._free_node_counter.value == 8
+
+    for i in range(3):
+        mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
+                                                       resource_specification={
+                                                           "num_nodes": 8,
+                                                           "ranks_per_node": 2
+                                                       })
+        task_package = {"task_id": i, "buffer": mock_task_buffer}
+        scheduler.put_task(task_package)
+    assert scheduler._backlog_queue.qsize() == 2, "Expected 2 backlogged tasks"
+
+
+@pytest.mark.local
+def test_larger_jobs_prioritized():
+    """Larger jobs should be scheduled first"""
+
+    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
+    scheduler = MPITaskScheduler(task_q, result_q)
+
+    max_nodes = len(scheduler.available_nodes)
+
+    # The first task will get scheduled with all the nodes,
+    # and the remainder hits the backlog queue.
+    node_request_list = [max_nodes] + [random.randint(1, 4) for _i in range(8)]
+
+    for task_id, num_nodes in enumerate(node_request_list):
+        mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
+                                                       resource_specification={
+                                                           "num_nodes": num_nodes,
+                                                           "ranks_per_node": 2
+                                                       })
+        task_package = {"task_id": task_id, "buffer": mock_task_buffer}
+        scheduler.put_task(task_package)
+
+    # Confirm that the backlogged tasks come off the queue largest job first
+    prev_priority = -1 * max_nodes
+    for i in range(len(node_request_list) - 1):
+        p_task = scheduler._backlog_queue.get()
+        assert prev_priority <= p_task.priority < 0, "Expected larger jobs first"
+        prev_priority = p_task.priority
+
+
+@pytest.mark.local
+def test_tiny_large_loop():
+    """Run a set of tiny and large tasks in a loop"""
+
+    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
+    scheduler = MPITaskScheduler(task_q, result_q)
+
+    assert scheduler.available_nodes
+    assert len(scheduler.available_nodes) == 8
+
+    assert scheduler._free_node_counter.value == 8
+
+    for i in range(10):
+        num_nodes = 2 if i % 2 == 0 else 8
+        mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
+                                                       resource_specification={
+                                                           "num_nodes": num_nodes,
+                                                           "ranks_per_node": 2
+                                                       })
+        task_package = {"task_id": i, "buffer": mock_task_buffer}
+        scheduler.put_task(task_package)
+
+    for i in range(10):
+        task = task_q.get(timeout=30)
+        result_pkl = pickle.dumps(
+            {"task_id": task["task_id"], "type": "result", "buffer": "RESULT BUF"})
+        result_q.put(result_pkl)
+        got_result = scheduler.get_result(True, 1)
+
+        assert got_result == result_pkl

From 22a4ce0f442659d7637d425a31540e72597adde6 Mon Sep 17 00:00:00 2001
From: Yadu Babuji
Date: Mon, 24 Feb 2025 14:49:06 -0600
Subject: [PATCH 2/2] New dataclass for MPITaskScheduler and fixes to the backlog scheduling logic

* `_schedule_backlog_tasks` now fetches all tasks from the backlog_queue
  before attempting to schedule them, avoiding the infinite loop.
* A new `PrioritizedTask` dataclass is added that disables comparison on
  the `task: Dict` field.
* The priority is set to num_nodes * -1 so that larger jobs are
  prioritized.
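
As an illustrative sketch only (not part of the diff below; the task dicts
here are placeholder values), the ordering and tie-handling behaviour can be
seen directly with the dataclass from this patch:

    import queue
    from dataclasses import dataclass, field
    from typing import Dict

    @dataclass(order=True)
    class PrioritizedTask:
        priority: int
        task: Dict = field(compare=False)

    backlog: queue.PriorityQueue = queue.PriorityQueue()
    backlog.put(PrioritizedTask(priority=-2, task={"task_id": 1}))  # 2-node job
    backlog.put(PrioritizedTask(priority=-8, task={"task_id": 2}))  # 8-node job
    backlog.put(PrioritizedTask(priority=-2, task={"task_id": 3}))  # tie on priority

    # The most negative priority (largest job) comes out first, and the tie
    # between tasks 1 and 3 no longer triggers a dict comparison (TypeError),
    # because the task field is excluded from ordering.
    assert backlog.get().task["task_id"] == 2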
---
 .../mpi_resource_management.py | 32 +++++++++++++------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/parsl/executors/high_throughput/mpi_resource_management.py b/parsl/executors/high_throughput/mpi_resource_management.py
index ccbf52410b..7951da617e 100644
--- a/parsl/executors/high_throughput/mpi_resource_management.py
+++ b/parsl/executors/high_throughput/mpi_resource_management.py
@@ -4,6 +4,7 @@
 import pickle
 import queue
 import subprocess
+from dataclasses import dataclass, field
 from enum import Enum
 from typing import Dict, List, Optional
 
@@ -69,6 +70,14 @@ def __str__(self):
         return f"MPINodesUnavailable(requested={self.requested} available={self.available})"
 
 
+@dataclass(order=True)
+class PrioritizedTask:
+    # Comparing raw task dicts would fail since dicts do not support ordering.
+    # This dataclass limits comparison to the priority field.
+    priority: int
+    task: Dict = field(compare=False)
+
+
 class TaskScheduler:
     """Default TaskScheduler that does no taskscheduling
 
@@ -111,7 +120,7 @@ def __init__(
         super().__init__(pending_task_q, pending_result_q)
         self.scheduler = identify_scheduler()
         # PriorityQueue is threadsafe
-        self._backlog_queue: queue.PriorityQueue = queue.PriorityQueue()
+        self._backlog_queue: queue.PriorityQueue[PrioritizedTask] = queue.PriorityQueue()
         self._map_tasks_to_nodes: Dict[str, List[str]] = {}
         self.available_nodes = get_nodes_in_batchjob(self.scheduler)
         self._free_node_counter = SpawnContext.Value("i", len(self.available_nodes))
@@ -169,7 +178,8 @@ def put_task(self, task_package: dict):
                 allocated_nodes = self._get_nodes(nodes_needed)
             except MPINodesUnavailable:
                 logger.info(f"Not enough resources, placing task {tid} into backlog")
-                self._backlog_queue.put((nodes_needed, task_package))
+                # Negate the priority element so that larger tasks are prioritized
+                self._backlog_queue.put(PrioritizedTask(-1 * nodes_needed, task_package))
                 return
             else:
                 resource_spec["MPI_NODELIST"] = ",".join(allocated_nodes)
@@ -182,14 +192,16 @@ def _schedule_backlog_tasks(self):
         """Attempt to schedule backlogged tasks"""
-        try:
-            _nodes_requested, task_package = self._backlog_queue.get(block=False)
-            self.put_task(task_package)
-        except queue.Empty:
-            return
-        else:
-            # Keep attempting to schedule tasks till we are out of resources
-            self._schedule_backlog_tasks()
+
+        # Separate fetching tasks from the _backlog_queue from scheduling them,
+        # since tasks that fail to schedule are pushed back onto the _backlog_queue
+        backlogged_tasks = []
+        while not self._backlog_queue.empty():
+            prioritized_task = self._backlog_queue.get(block=False)
+            backlogged_tasks.append(prioritized_task.task)
+
+        for backlogged_task in backlogged_tasks:
+            self.put_task(backlogged_task)
 
     def get_result(self, block: bool = True, timeout: Optional[float] = None):
         """Return result and relinquish provisioned nodes"""
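
For readers who want to see the backlog-draining fix in isolation, a minimal
self-contained sketch of the same pattern follows; the names
schedule_backlog_once and try_put_task are made up for illustration and do not
appear in the patch:

    import queue
    from dataclasses import dataclass, field
    from typing import Callable, Dict, List

    @dataclass(order=True)
    class PrioritizedTask:
        priority: int
        task: Dict = field(compare=False)

    def schedule_backlog_once(backlog: queue.PriorityQueue,
                              try_put_task: Callable[[Dict], None]) -> None:
        # Drain the queue completely before scheduling anything: tasks that
        # still do not fit are re-queued by try_put_task, so each backlogged
        # task is handled exactly once per pass instead of looping forever.
        backlogged_tasks: List[Dict] = []
        while not backlog.empty():
            backlogged_tasks.append(backlog.get(block=False).task)
        for task in backlogged_tasks:
            try_put_task(task)

    # Usage: the most node-hungry task is attempted first.
    demo_backlog: queue.PriorityQueue = queue.PriorityQueue()
    demo_backlog.put(PrioritizedTask(-4, {"task_id": 0}))
    demo_backlog.put(PrioritizedTask(-1, {"task_id": 1}))
    scheduled: List[Dict] = []
    schedule_backlog_once(demo_backlog, scheduled.append)
    assert [t["task_id"] for t in scheduled] == [0, 1]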