Fixes TypeError and infinite looping in MPITaskScheduler #3783
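For context, a minimal standalone sketch of the failure mode named in the title, alongside the dataclass approach taken in this PR. The queue contents and task values below are made up purely for illustration:

```python
import queue
from dataclasses import dataclass, field
from typing import Dict

# Plain (priority, dict) tuples break as soon as two priorities tie:
# heapq then falls back to comparing the dicts themselves.
pq: "queue.PriorityQueue" = queue.PriorityQueue()
pq.put((2, {"task_id": 0}))
try:
    pq.put((2, {"task_id": 1}))
except TypeError as exc:
    print(exc)  # '<' not supported between instances of 'dict' and 'dict'


@dataclass(order=True)
class PrioritizedTask:
    priority: int
    task: Dict = field(compare=False)  # excluded from ordering comparisons


# With the dict excluded from comparison, equal priorities are fine.
pq2: "queue.PriorityQueue[PrioritizedTask]" = queue.PriorityQueue()
pq2.put(PrioritizedTask(2, {"task_id": 0}))
pq2.put(PrioritizedTask(2, {"task_id": 1}))
```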

Draft
wants to merge 2 commits into base: master
32 changes: 22 additions & 10 deletions parsl/executors/high_throughput/mpi_resource_management.py
@@ -4,6 +4,7 @@
import pickle
import queue
import subprocess
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional

@@ -69,6 +70,14 @@ def __str__(self):
return f"MPINodesUnavailable(requested={self.requested} available={self.available})"


@dataclass(order=True)
class PrioritizedTask:
# Comparing dicts fails with a TypeError since they do not support ordering
# comparisons. This dataclass limits comparison to the priority field.
priority: int
task: Dict = field(compare=False)


class TaskScheduler:
"""Default TaskScheduler that does no taskscheduling

@@ -111,7 +120,7 @@ def __init__(
super().__init__(pending_task_q, pending_result_q)
self.scheduler = identify_scheduler()
# PriorityQueue is threadsafe
self._backlog_queue: queue.PriorityQueue = queue.PriorityQueue()
self._backlog_queue: queue.PriorityQueue[PrioritizedTask] = queue.PriorityQueue()
self._map_tasks_to_nodes: Dict[str, List[str]] = {}
self.available_nodes = get_nodes_in_batchjob(self.scheduler)
self._free_node_counter = SpawnContext.Value("i", len(self.available_nodes))
@@ -169,7 +178,8 @@ def put_task(self, task_package: dict):
allocated_nodes = self._get_nodes(nodes_needed)
except MPINodesUnavailable:
logger.info(f"Not enough resources, placing task {tid} into backlog")
self._backlog_queue.put((nodes_needed, task_package))
# Negate the priority element so that larger tasks are prioritized
self._backlog_queue.put(PrioritizedTask(-1 * nodes_needed, task_package))
return
else:
resource_spec["MPI_NODELIST"] = ",".join(allocated_nodes)
@@ -182,14 +192,16 @@

def _schedule_backlog_tasks(self):
"""Attempt to schedule backlogged tasks"""
try:
_nodes_requested, task_package = self._backlog_queue.get(block=False)
self.put_task(task_package)
except queue.Empty:
return
else:
# Keep attempting to schedule tasks till we are out of resources
self._schedule_backlog_tasks()

# Separate fetching tasks from the _backlog_queue from scheduling them,
# since tasks that fail to schedule will be pushed back onto the _backlog_queue
backlogged_tasks = []
while not self._backlog_queue.empty():
prioritized_task = self._backlog_queue.get(block=False)
backlogged_tasks.append(prioritized_task.task)

for backlogged_task in backlogged_tasks:
self.put_task(backlogged_task)

Comment on lines +196 to 205

Collaborator (khk-globus):

From a static analysis, this looks better to me. No more infinite loop potential, but I do observe that this could mean a lot of unpacking and then repacking. "It works," so I'm not going to fuss about it, but a different data structure might help with that.

More actionably, however, this looks like it would lose tasks? What happens when .get(block=False) raises queue.Empty?
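One way the drain could be made robust to that (a sketch only, with a hypothetical helper name, not something in this PR's commits, and assuming the queue holds the PrioritizedTask entries introduced in this diff) is to catch queue.Empty instead of trusting empty():

```python
import queue
from typing import Dict, List


def drain_backlog(backlog_queue: "queue.PriorityQueue") -> List[Dict]:
    """Pop every currently backlogged task without trusting empty().

    If get(block=False) raises queue.Empty despite an earlier empty() check,
    the loop stops cleanly instead of propagating the exception and dropping
    any tasks collected so far.
    """
    drained: List[Dict] = []
    while True:
        try:
            prioritized_task = backlog_queue.get(block=False)
        except queue.Empty:
            break
        drained.append(prioritized_task.task)
    return drained
```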

Contributor (WardLT):

Good catch, Kevin! These few lines do make me worry about race conditions.

Additionally, will the very aggressive scheduling here (always attempting to schedule everything) still result in large tasks being continually delayed? If there are small tasks, they'll still get scheduled before the big one.

That might be ok with some users, but what about a simple "run in the order of execution" strategy as our baseline?
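For concreteness, one possible shape of such a submission-order baseline (a sketch under the assumption that the backlog keeps using PrioritizedTask; none of this is in the PR) is to use a monotonically increasing sequence number as the priority instead of the negated node count:

```python
import itertools
import queue
from dataclasses import dataclass, field
from typing import Dict


@dataclass(order=True)
class PrioritizedTask:
    priority: int
    task: Dict = field(compare=False)


# Hypothetical FIFO baseline: the priority is a submission sequence number,
# so backlogged tasks are retried strictly in arrival order regardless of size.
submit_counter = itertools.count()
backlog: "queue.PriorityQueue[PrioritizedTask]" = queue.PriorityQueue()

for task_id, num_nodes in [(0, 8), (1, 2), (2, 4)]:
    backlog.put(PrioritizedTask(next(submit_counter),
                                {"task_id": task_id, "num_nodes": num_nodes}))

assert [backlog.get().task["task_id"] for _ in range(3)] == [0, 1, 2]
```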

Collaborator (benclifford):

@WardLT it's issue #3323 but one layer deeper into the dispatch logic!

Collaborator (benclifford):

(we shouldn't overtrivialise this, or assume there's a universal solution, or try to make a comprehensive "here are the options" set that will satisfy everyone)

Member Author:

@khk-globus Thanks for the review, this is a good catch!

  • Unpacking/repacking: Yep, we shouldn't have to do this if we store the resource_spec alongside the task (see the sketch below).
  • queue.Empty: I was working on the assumption that since only this function pops items from the queue, checking empty() first is enough to guarantee that get() will not raise queue.Empty. I can rework this so it doesn't rely on that.
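
A sketch of that first point (hypothetical field and name, not part of these commits): carry the already-parsed resource specification in the backlog entry so a retried task is not unpacked a second time.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass(order=True)
class PrioritizedTask:
    priority: int
    task: Dict = field(compare=False)
    # Hypothetical extra field: the resource spec parsed on the first
    # put_task() attempt, reused when the task is retried from the backlog.
    resource_spec: Dict = field(compare=False, default_factory=dict)
```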

@WardLT I share your concern. There's no notion of fairness here, and as @benclifford pointed out, coming up with scheduling logic that will work for everyone is hard. Right now, I expect larger tasks to end up getting delayed. As @benclifford mentioned, we could move this logic to the interchange (#3323), but we would still need to implement these alternative scheduling algorithms, and I'm hesitant to do that without user feedback.

def get_result(self, block: bool = True, timeout: Optional[float] = None):
"""Return result and relinquish provisioned nodes"""
89 changes: 88 additions & 1 deletion parsl/tests/test_mpi_apps/test_mpi_scheduler.py
@@ -1,6 +1,6 @@
import logging
import os
import pickle
import random
from unittest import mock

import pytest
@@ -161,3 +161,90 @@ def test_MPISched_contention():
assert task_on_worker_side['task_id'] == 2
_, _, _, resource_spec = unpack_res_spec_apply_message(task_on_worker_side['buffer'])
assert len(resource_spec['MPI_NODELIST'].split(',')) == 8


@pytest.mark.local
def test_hashable_backlog_queue():
"""Run multiple large tasks that to force entry into backlog_queue
where queue.PriorityQueue expects hashability/comparability
"""

task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
scheduler = MPITaskScheduler(task_q, result_q)

assert scheduler.available_nodes
assert len(scheduler.available_nodes) == 8

assert scheduler._free_node_counter.value == 8

for i in range(3):
mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
resource_specification={
"num_nodes": 8,
"ranks_per_node": 2
})
task_package = {"task_id": i, "buffer": mock_task_buffer}
scheduler.put_task(task_package)
assert scheduler._backlog_queue.qsize() == 2, "Expected 2 backlogged tasks"


@pytest.mark.local
def test_larger_jobs_prioritized():
"""Larger jobs should be scheduled first"""

task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
scheduler = MPITaskScheduler(task_q, result_q)

max_nodes = len(scheduler.available_nodes)

# The first task will get scheduled with all the nodes,
# and the remainder hits the backlog queue.
node_request_list = [max_nodes] + [random.randint(1, 4) for _i in range(8)]

for task_id, num_nodes in enumerate(node_request_list):
mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
resource_specification={
"num_nodes": num_nodes,
"ranks_per_node": 2
})
task_package = {"task_id": task_id, "buffer": mock_task_buffer}
scheduler.put_task(task_package)

# Confirm that the backlogged tasks are popped largest-first: priorities are
# negated node counts, so they should come out in non-decreasing order
prev_priority = -max_nodes
for _i in range(len(node_request_list) - 1):
p_task = scheduler._backlog_queue.get()
assert p_task.priority < 0
assert p_task.priority >= prev_priority
prev_priority = p_task.priority


@pytest.mark.local
def test_tiny_large_loop():
"""Run a set of tiny and large tasks in a loop"""

task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
scheduler = MPITaskScheduler(task_q, result_q)

assert scheduler.available_nodes
assert len(scheduler.available_nodes) == 8

assert scheduler._free_node_counter.value == 8

for i in range(10):
num_nodes = 2 if i % 2 == 0 else 8
mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
resource_specification={
"num_nodes": num_nodes,
"ranks_per_node": 2
})
task_package = {"task_id": i, "buffer": mock_task_buffer}
scheduler.put_task(task_package)

for i in range(10):
task = task_q.get(timeout=30)
result_pkl = pickle.dumps(
{"task_id": task["task_id"], "type": "result", "buffer": "RESULT BUF"})
result_q.put(result_pkl)
got_result = scheduler.get_result(True, 1)

assert got_result == result_pkl