Skip to content

Commit

Permalink
Merge pull request #86 from alitto/fix/AD/dispatcher-lock
Browse files Browse the repository at this point in the history
fix(dispatcher): ensure workers exit reliably when worker count is low
  • Loading branch information
alitto authored Nov 15, 2024
2 parents 9f8d16a + 1d93c12 commit d1e5a78
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
strategy:
matrix:
go-version: [1.22.x, 1.21.x, 1.20.x]
os: [ubuntu-latest, macos-latest]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout code
Expand Down
21 changes: 11 additions & 10 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"github.com/alitto/pond/v2/internal/future"
)

var MAX_TASKS_CHAN_LENGTH = runtime.NumCPU() * 128
var NUM_CPU = runtime.NumCPU()

var PERSISTENT_WORKER_COUNT = int64(runtime.NumCPU())
var MAX_TASKS_CHAN_LENGTH = NUM_CPU * 128

var ErrPoolStopped = errors.New("pool stopped")

Expand Down Expand Up @@ -241,23 +241,24 @@ func (p *pool) startWorker(limit int) {
return
}
p.workerWaitGroup.Add(1)
workerNumber := p.workerCount.Add(1)
// Guarantee at least PERSISTENT_WORKER_COUNT workers are always running during dispatch to prevent deadlocks
canExitDuringDispatch := workerNumber > PERSISTENT_WORKER_COUNT
go p.worker(canExitDuringDispatch)
p.workerCount.Add(1)
go p.worker()
}

func (p *pool) workerCanExit(canExitDuringDispatch bool) bool {
if canExitDuringDispatch {
func (p *pool) workerCanExit() bool {
if int(p.workerCount.Load()) > NUM_CPU {
// If there are more workers than CPUs, then we can trust workerCount to be at least 1
p.workerCount.Add(-1)
return true
}

// Check if the dispatcher is running
if !p.dispatcherRunning.TryLock() {
// Dispatcher is running, cannot exit yet
runtime.Gosched()
return false
}

if len(p.tasks) > 0 {
// There are tasks in the queue, cannot exit yet
p.dispatcherRunning.Unlock()
Expand All @@ -269,7 +270,7 @@ func (p *pool) workerCanExit(canExitDuringDispatch bool) bool {
return true
}

func (p *pool) worker(canExitDuringDispatch bool) {
func (p *pool) worker() {
defer func() {
p.workerWaitGroup.Done()
}()
Expand Down Expand Up @@ -306,7 +307,7 @@ func (p *pool) worker(canExitDuringDispatch bool) {
// No tasks left

// Check if the worker can exit
if p.workerCanExit(canExitDuringDispatch) {
if p.workerCanExit() {
return
}
continue
Expand Down

0 comments on commit d1e5a78

Please sign in to comment.