Skip to content

Commit

Permalink
feat(pool): add option to bound task queue
Browse files Browse the repository at this point in the history
  • Loading branch information
alitto committed Dec 24, 2024
1 parent ac7c6a3 commit 43ce73c
Show file tree
Hide file tree
Showing 12 changed files with 687 additions and 58 deletions.
4 changes: 3 additions & 1 deletion internal/linkedbuffer/linkedbuffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package linkedbuffer

import (
"math"
"sync"
"sync/atomic"
)
Expand Down Expand Up @@ -125,7 +126,8 @@ func (b *LinkedBuffer[T]) Len() uint64 {
readCount := b.readCount.Load()

if writeCount < readCount {
return 0 // Make sure we don't return a negative value
// The writeCount counter wrapped around
return math.MaxUint64 - readCount + writeCount
}

return writeCount - readCount
Expand Down
7 changes: 5 additions & 2 deletions internal/linkedbuffer/linkedbuffer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package linkedbuffer

import (
"math"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -88,8 +89,10 @@ func TestLinkedBufferLen(t *testing.T) {

assert.Equal(t, uint64(0), buf.Len())

buf.readCount.Add(1)
assert.Equal(t, uint64(0), buf.Len())
// Test wrap around
buf.writeCount.Add(math.MaxUint64)
buf.readCount.Add(math.MaxUint64 - 3)
assert.Equal(t, uint64(3), buf.Len())
}

func TestLinkedBufferWithReusedBuffer(t *testing.T) {
Expand Down
142 changes: 142 additions & 0 deletions internal/semaphore/semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package semaphore

import (
"context"
"fmt"
"sync"
)

type Weighted struct {
ctx context.Context
cond *sync.Cond
size int
n int
waiting int
}

func NewWeighted(ctx context.Context, size int) *Weighted {
sem := &Weighted{
ctx: ctx,
cond: sync.NewCond(&sync.Mutex{}),
size: size,
n: size,
}

// Notify all waiters when the context is done
context.AfterFunc(ctx, func() {

Check failure on line 26 in internal/semaphore/semaphore.go

View workflow job for this annotation

GitHub Actions / Coverage report

undefined: context.AfterFunc

Check failure on line 26 in internal/semaphore/semaphore.go

View workflow job for this annotation

GitHub Actions / Run tests (1.20.x, ubuntu-latest)

undefined: context.AfterFunc
sem.cond.Broadcast()
})

return sem
}

func (w *Weighted) Acquire(weight int) error {
if weight <= 0 {
return fmt.Errorf("semaphore: weight %d cannot be negative or zero", weight)
}
if weight > w.size {
return fmt.Errorf("semaphore: weight %d is greater than semaphore size %d", weight, w.size)
}

w.cond.L.Lock()
defer w.cond.L.Unlock()

done := w.ctx.Done()

select {
case <-done:
return w.ctx.Err()
default:
}

for weight > w.n {
// Check if the context is done
select {
case <-done:
return w.ctx.Err()
default:
}

w.waiting++
w.cond.Wait()
w.waiting--
}

w.n -= weight

return nil
}

func (w *Weighted) TryAcquire(weight int) bool {
if weight <= 0 {
return false
}
if weight > w.size {
return false
}

w.cond.L.Lock()
defer w.cond.L.Unlock()

// Check if the context is done
select {
case <-w.ctx.Done():
return false
default:
}

if weight > w.n {
// Not enough room in the semaphore
return false
}

w.n -= weight

return true
}

func (w *Weighted) Release(weight int) error {
if weight <= 0 {
return fmt.Errorf("semaphore: weight %d cannot be negative or zero", weight)
}
if weight > w.size {
return fmt.Errorf("semaphore: weight %d is greater than semaphore size %d", weight, w.size)
}

w.cond.L.Lock()
defer w.cond.L.Unlock()

if weight > w.size-w.n {
return fmt.Errorf("semaphore: trying to release more than acquired: %d > %d", weight, w.size-w.n)
}

w.n += weight
w.cond.Broadcast()

return nil
}

func (w *Weighted) Size() int {
return w.size
}

func (w *Weighted) Acquired() int {
w.cond.L.Lock()
defer w.cond.L.Unlock()

return w.size - w.n
}

func (w *Weighted) Available() int {
w.cond.L.Lock()
defer w.cond.L.Unlock()

return w.n
}

func (w *Weighted) Waiting() int {
w.cond.L.Lock()
defer w.cond.L.Unlock()

return w.waiting
}
192 changes: 192 additions & 0 deletions internal/semaphore/semaphore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package semaphore

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/alitto/pond/v2/internal/assert"
)

func TestWeighted(t *testing.T) {
sem := NewWeighted(context.Background(), 10)

// Acquire 5
err := sem.Acquire(5)
assert.Equal(t, nil, err)

// Acquire 4
err = sem.Acquire(4)
assert.Equal(t, nil, err)

// Try to acquire 2
assert.Equal(t, false, sem.TryAcquire(2))

// Try to acquire 1
assert.Equal(t, true, sem.TryAcquire(1))

// Release 7
sem.Release(7)

// Try to acquire 7
assert.Equal(t, true, sem.TryAcquire(7))
}

func TestWeightedWithMoreAcquirersThanReleasers(t *testing.T) {
sem := NewWeighted(context.Background(), 6)

goroutines := 12
acquire := 2
release := 5
wg := sync.WaitGroup{}
acquireSuccessCount := atomic.Uint64{}
acquireFailCount := atomic.Uint64{}

wg.Add(goroutines)

// Launch goroutines that try to acquire the semaphore
for i := 0; i < goroutines; i++ {
go func() {
defer wg.Done()

if err := sem.Acquire(acquire); err != nil {
acquireFailCount.Add(1)
} else {
acquireSuccessCount.Add(1)
}

if sem.Acquired() >= release {
sem.Release(release)
}
}()
}

// Wait for goroutines to finish
wg.Wait()

assert.Equal(t, uint64(12), acquireSuccessCount.Load())
assert.Equal(t, uint64(0), acquireFailCount.Load())
assert.Equal(t, 4, sem.Acquired())
}

func TestWeightedAcquireWithInvalidWeights(t *testing.T) {
sem := NewWeighted(context.Background(), 10)

// Acquire 0
err := sem.Acquire(0)
assert.Equal(t, "semaphore: weight 0 cannot be negative or zero", err.Error())

// Try to acquire 0
res := sem.TryAcquire(0)
assert.Equal(t, false, res)

// Acquire -1
err = sem.Acquire(-1)
assert.Equal(t, "semaphore: weight -1 cannot be negative or zero", err.Error())

// Try to acquire -1
res = sem.TryAcquire(-1)
assert.Equal(t, false, res)

// Acquire 11
err = sem.Acquire(11)
assert.Equal(t, "semaphore: weight 11 is greater than semaphore size 10", err.Error())

// Try to acquire 11
res = sem.TryAcquire(11)
assert.Equal(t, false, res)
}

func TestWeightedReleaseWithInvalidWeights(t *testing.T) {
sem := NewWeighted(context.Background(), 10)

// Release 0
err := sem.Release(0)
assert.Equal(t, "semaphore: weight 0 cannot be negative or zero", err.Error())

// Release -1
err = sem.Release(-1)
assert.Equal(t, "semaphore: weight -1 cannot be negative or zero", err.Error())

// Release 11
err = sem.Release(11)
assert.Equal(t, "semaphore: weight 11 is greater than semaphore size 10", err.Error())

// Release 1
err = sem.Release(1)
assert.Equal(t, "semaphore: trying to release more than acquired: 1 > 0", err.Error())
}

func TestWeightedWithContextCanceled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

sem := NewWeighted(ctx, 10)

// Acquire the semaphore
err := sem.Acquire(5)
assert.Equal(t, nil, err)

// Cancel the context
cancel()

// Attempt to acquire the semaphore
err = sem.Acquire(5)
assert.Equal(t, context.Canceled, err)

// Try to acquire the semaphore
assert.Equal(t, false, sem.TryAcquire(5))
}

func TestWeightedWithContextCanceledWhileWaiting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

sem := NewWeighted(ctx, 10)

writers := 30
wg := sync.WaitGroup{}
wg.Add(writers)

assert.Equal(t, 10, sem.Size())
assert.Equal(t, 0, sem.Acquired())
assert.Equal(t, 10, sem.Available())
assert.Equal(t, 0, sem.Waiting())

// Acquire the semaphore more than the semaphore size
for i := 0; i < writers; i++ {
go func() {
defer wg.Done()
sem.Acquire(1)
}()
}

// Wait until 10 goroutines are blocked
for sem.Acquired() < 10 {
time.Sleep(1 * time.Millisecond)
}

assert.Equal(t, 10, sem.Acquired())
assert.Equal(t, 0, sem.Available())

// Release 10 goroutines
err := sem.Release(10)
assert.Equal(t, nil, err)

// Wait until 10 goroutines are blocked
for sem.Acquired() < 10 {
time.Sleep(1 * time.Millisecond)
}

// Cancel the context
cancel()

// Wait for goroutines to finish
wg.Wait()

assert.Equal(t, 10, sem.Acquired())
assert.Equal(t, 0, sem.Available())
assert.Equal(t, 0, sem.Waiting())
assert.Equal(t, context.Canceled, sem.Acquire(1))
assert.Equal(t, false, sem.TryAcquire(1))
}
Loading

0 comments on commit 43ce73c

Please sign in to comment.