Skip to content

Commit 38340f6

Browse files
authored
FIX deadlock in test_shutdown_with_sys_exit_at_pickle (#438)
1 parent 70b6f21 commit 38340f6

File tree

4 files changed

+24
-5
lines changed

4 files changed

+24
-5
lines changed

CHANGES.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
### 3.5.0 - in development
22

3+
- Fix a random deadlock caused by a race condition at executor shutdown that
4+
was observed on Linux and Windows. (#438)
35

46
- Fix detection of the number of physical cores in
57
`cpu_count(only_physical_cores=True)` on some Linux systems and recent

loky/process_executor.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -305,9 +305,11 @@ def __init__(
305305
pending_work_items=None,
306306
running_work_items=None,
307307
thread_wakeup=None,
308+
shutdown_lock=None,
308309
reducers=None,
309310
):
310311
self.thread_wakeup = thread_wakeup
312+
self.shutdown_lock = shutdown_lock
311313
self.pending_work_items = pending_work_items
312314
self.running_work_items = running_work_items
313315
super().__init__(max_size, reducers=reducers, ctx=ctx)
@@ -336,7 +338,8 @@ def _on_queue_feeder_error(self, e, obj):
336338
if work_item is not None:
337339
work_item.future.set_exception(raised_error)
338340
del work_item
339-
self.thread_wakeup.wakeup()
341+
with self.shutdown_lock:
342+
self.thread_wakeup.wakeup()
340343
else:
341344
super()._on_queue_feeder_error(e, obj)
342345

@@ -1139,6 +1142,7 @@ def _setup_queues(self, job_reducers, result_reducers, queue_size=None):
11391142
pending_work_items=self._pending_work_items,
11401143
running_work_items=self._running_work_items,
11411144
thread_wakeup=self._executor_manager_thread_wakeup,
1145+
shutdown_lock=self._shutdown_lock,
11421146
reducers=job_reducers,
11431147
ctx=self._context,
11441148
)

tests/_executor_mixin.py

+5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import pytest
55
import threading
66
from time import sleep
7+
import random
78

89
from loky import TimeoutError, get_reusable_executor
910
from loky.backend import get_context
@@ -29,6 +30,10 @@ def initializer_event(event):
2930
global _test_event
3031
_test_event = event
3132

33+
# Inject some randomness in the initialization to reveal race conditions.
34+
if random.random() < 0.2:
35+
sleep(random.random() * 0.1) # 0-100ms
36+
3237

3338
def _direct_children_with_cmdline(p):
3439
"""Helper to fetch cmdline from children process list"""

tests/_test_process_executor.py

+12-4
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,21 @@ def test_run_after_shutdown(self):
107107

108108
def test_shutdown_with_pickle_error(self):
109109
self.executor.shutdown()
110-
with self.executor_type(max_workers=4) as e:
111-
e.submit(id, ErrorAtPickle())
110+
# Iterate a few times to catch deadlocks/race conditions in the
111+
# executor shutdown.
112+
for _ in range(5):
113+
with self.executor_type(max_workers=4) as e:
114+
with pytest.raises(PicklingError):
115+
e.submit(id, ErrorAtPickle()).result()
112116

113117
def test_shutdown_with_sys_exit_at_pickle(self):
114118
self.executor.shutdown()
115-
with self.executor_type(max_workers=4) as e:
116-
e.submit(id, ExitAtPickle())
119+
# Iterate a few times to catch deadlocks/race conditions in the
120+
# executor shutdown.
121+
for _ in range(5):
122+
with self.executor_type(max_workers=4) as e:
123+
with pytest.raises(PicklingError):
124+
e.submit(id, ExitAtPickle()).result()
117125

118126
def test_interpreter_shutdown(self):
119127
# Free resources to avoid random timeout in CI

0 commit comments

Comments
 (0)