Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pool): add option to bound task queue #95

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,41 @@ jobs:
name: Run tests
strategy:
matrix:
go-version: [1.23.x, 1.22.x, 1.21.x, 1.20.x]
go-version: [1.23.x, 1.22.x, 1.21.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Checkout code
uses: actions/checkout@v4

- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
cache: false
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
cache: false

- name: Test
run: make test-ci
- name: Test
run: make test-ci
codecov:
name: Coverage report
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Checkout code
uses: actions/checkout@v4

- name: Install Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: false
- name: Install Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: false

- name: Test
run: make coverage
- name: Test
run: make coverage

- uses: codecov/codecov-action@v5
with:
files: coverage.out
fail_ci_if_error: true
verbose: true
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
- uses: codecov/codecov-action@v5
with:
files: coverage.out
fail_ci_if_error: true
verbose: true
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ Some common use cases include:
- Complete pool metrics such as number of running workers, tasks waiting in the queue [and more](#metrics--monitoring)
- Configurable parent context to stop all workers when it is cancelled
- **New features in v2**:
- Unbounded task queues
- Bounded or Unbounded task queues
- Submission of tasks that return results
- Awaitable task completion
- Type safe APIs for tasks that return errors or results
- Panics recovery (panics are captured and returned as errors)
- Subpools with a fraction of the parent pool's maximum number of workers
- Blocking and non-blocking submission of tasks when the queue is full
- [API reference](https://pkg.go.dev/github.com/alitto/pond/v2)

## Installation
Expand Down Expand Up @@ -386,6 +388,24 @@ if err != nil {
}
```

### Bounded task queues (v2)

By default, task queues are unbounded, meaning that tasks are queued indefinitely until the pool is stopped (or the process runs out of memory). You can limit the number of tasks that can be queued by setting a queue size when creating a pool (`WithQueueSize` option).

``` go
// Create a pool with a maximum of 10 tasks in the queue
pool := pond.NewPool(1, pond.WithQueueSize(10))
```

**Blocking vs non-blocking task submission**

When a pool defines a queue size (bounded), you can also specify how to handle tasks submitted when the queue is full. By default, task submission blocks until there is space in the queue (blocking mode), but you can change this behavior to non-blocking by setting the `WithNonBlocking` option to `true` when creating a pool. If the queue is full and non-blocking task submission is enabled, the task is dropped and an error is returned (`ErrQueueFull`).

``` go
// Create a pool with a maximum of 10 tasks in the queue and non-blocking task submission
pool := pond.NewPool(1, pond.WithQueueSize(10), pond.WithNonBlocking(true))
```

### Metrics & monitoring

Each worker pool instance exposes useful metrics that can be queried through the following methods:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/alitto/pond/v2

go 1.20
go 1.21
16 changes: 10 additions & 6 deletions group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,23 @@ func TestTaskGroupWithContextCanceled(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

go func() {
time.Sleep(10 * time.Millisecond)
cancel()
}()
taskStarted := make(chan struct{})

task := group.SubmitErr(func() error {
taskStarted <- struct{}{}

err := group.SubmitErr(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
return nil
}
}).Wait()
})

<-taskStarted
cancel()

err := task.Wait()

assert.Equal(t, context.Canceled, err)
}
Expand Down
4 changes: 3 additions & 1 deletion internal/linkedbuffer/linkedbuffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package linkedbuffer

import (
"math"
"sync"
"sync/atomic"
)
Expand Down Expand Up @@ -125,7 +126,8 @@
readCount := b.readCount.Load()

if writeCount < readCount {
return 0 // Make sure we don't return a negative value
// The writeCount counter wrapped around
return math.MaxUint64 - readCount + writeCount

Check warning on line 130 in internal/linkedbuffer/linkedbuffer.go

View check run for this annotation

Codecov / codecov/patch

internal/linkedbuffer/linkedbuffer.go#L129-L130

Added lines #L129 - L130 were not covered by tests
}

return writeCount - readCount
Expand Down
7 changes: 5 additions & 2 deletions internal/linkedbuffer/linkedbuffer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package linkedbuffer

import (
"math"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -88,8 +89,10 @@ func TestLinkedBufferLen(t *testing.T) {

assert.Equal(t, uint64(0), buf.Len())

buf.readCount.Add(1)
assert.Equal(t, uint64(0), buf.Len())
// Test wrap around
buf.writeCount.Add(math.MaxUint64)
buf.readCount.Add(math.MaxUint64 - 3)
assert.Equal(t, uint64(3), buf.Len())
}

func TestLinkedBufferWithReusedBuffer(t *testing.T) {
Expand Down
142 changes: 142 additions & 0 deletions internal/semaphore/semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package semaphore

import (
"context"
"fmt"
"sync"
)

type Weighted struct {
ctx context.Context
cond *sync.Cond
size int
n int
waiting int
}

func NewWeighted(ctx context.Context, size int) *Weighted {
sem := &Weighted{
ctx: ctx,
cond: sync.NewCond(&sync.Mutex{}),
size: size,
n: size,
}

// Notify all waiters when the context is done
context.AfterFunc(ctx, func() {
sem.cond.Broadcast()
})

return sem
}

func (w *Weighted) Acquire(weight int) error {
if weight <= 0 {
return fmt.Errorf("semaphore: weight %d cannot be negative or zero", weight)
}
if weight > w.size {
return fmt.Errorf("semaphore: weight %d is greater than semaphore size %d", weight, w.size)
}

w.cond.L.Lock()
defer w.cond.L.Unlock()

done := w.ctx.Done()

select {
case <-done:
return w.ctx.Err()
default:
}

for weight > w.n {
// Check if the context is done
select {
case <-done:
return w.ctx.Err()
default:
}

w.waiting++
w.cond.Wait()
w.waiting--
}

w.n -= weight

return nil
}

func (w *Weighted) TryAcquire(weight int) bool {
if weight <= 0 {
return false
}
if weight > w.size {
return false
}

w.cond.L.Lock()
defer w.cond.L.Unlock()

// Check if the context is done
select {
case <-w.ctx.Done():
return false
default:
}

if weight > w.n {
// Not enough room in the semaphore
return false
}

w.n -= weight

return true
}

func (w *Weighted) Release(weight int) error {
if weight <= 0 {
return fmt.Errorf("semaphore: weight %d cannot be negative or zero", weight)
}
if weight > w.size {
return fmt.Errorf("semaphore: weight %d is greater than semaphore size %d", weight, w.size)
}

w.cond.L.Lock()
defer w.cond.L.Unlock()

if weight > w.size-w.n {
return fmt.Errorf("semaphore: trying to release more than acquired: %d > %d", weight, w.size-w.n)
}

w.n += weight
w.cond.Broadcast()

return nil
}

func (w *Weighted) Size() int {
return w.size
}

func (w *Weighted) Acquired() int {
w.cond.L.Lock()
defer w.cond.L.Unlock()

return w.size - w.n
}

func (w *Weighted) Available() int {
w.cond.L.Lock()
defer w.cond.L.Unlock()

return w.n
}

func (w *Weighted) Waiting() int {
w.cond.L.Lock()
defer w.cond.L.Unlock()

return w.waiting
}
Loading
Loading