Skip to content

Commit

Permalink
Merge pull request #2 from txaty/generics
Browse files Browse the repository at this point in the history
Add async methods and invalid number handling
  • Loading branch information
txaty authored Sep 2, 2022
2 parents 39ad55d + cffe764 commit ca0de72
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 14 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# Gool

A generic goroutine pool just like Python ThreadPoolExecutor.

Gool provides the following methods:

- ```Submit```: Submit a task and return the result (if any).
- ```AsyncSubmit```: Submit a task and return a future of the result (if any), the future is actually the result
channel.
- ```Map```: Submit a bundle of tasks and return the results in order (if any).
- ```AsyncMap```: Submit a bundle of tasks and return the futures of the results (if any), the futures are the result
channels.
52 changes: 38 additions & 14 deletions pool.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,80 @@
package gool

import (
"runtime"
)

// Pool implements a simple goroutine pool
type Pool[A, R any] struct {
numWorkers int
jobChan chan Task[A, R]
taskChan chan Task[A, R]
}

// NewPool creates a new goroutine pool with the given number of workers and job queue capacity.
// If numWorkers is less than 1, it will be set to the number of CPUs.
// If cap (task queue capacity) is less than 1, it will be set to twice the number of workers.
func NewPool[A, R any](numWorkers, cap int) *Pool[A, R] {
if numWorkers <= 0 {
numWorkers = runtime.NumCPU()
}
if cap <= 0 {
cap = 2 * numWorkers
}
p := &Pool[A, R]{
numWorkers: numWorkers,
jobChan: make(chan Task[A, R], cap),
taskChan: make(chan Task[A, R], cap),
}
for i := 0; i < numWorkers; i++ {
newWorker(p.jobChan)
newWorker(p.taskChan)
}
return p
}

// Submit submits a task and waits for the result
func (p *Pool[A, R]) Submit(handler func(A) R, args A) R {
result := make(chan R)
p.jobChan <- Task[A, R]{
result := p.AsyncSubmit(handler, args)
return <-result
}

// AsyncSubmit submits a task and returns the channel to wait for the result
func (p *Pool[A, R]) AsyncSubmit(handler func(A) R, args A) chan R {
resChan := make(chan R)
p.taskChan <- Task[A, R]{
handler: handler,
args: args,
result: result,
result: resChan,
}
return <-result
return resChan
}

// Map submits a batch of tasks and waits for the results
func (p *Pool[A, R]) Map(handler func(A) R, args []A) []R {
resultChanList := p.AsyncMap(handler, args)
results := make([]R, len(args))
for i := 0; i < len(args); i++ {
results[i] = <-resultChanList[i]
}
return results
}

// AsyncMap submits a batch of tasks and returns the channel to wait for the results
func (p *Pool[A, R]) AsyncMap(handler func(A) R, args []A) []chan R {
resultChanList := make([]chan R, len(args))
for i := 0; i < len(args); i++ {
resultChanList[i] = make(chan R)
p.jobChan <- Task[A, R]{
p.taskChan <- Task[A, R]{
handler: handler,
args: args[i],
result: resultChanList[i],
}
}
results := make([]R, len(args))
for i := 0; i < len(args); i++ {
results[i] = <-resultChanList[i]
}
return results
return resultChanList
}

// Close closes the pool and waits for all the workers to stop
func (p *Pool[A, R]) Close() {
for i := 0; i < p.numWorkers; i++ {
p.jobChan <- Task[A, R]{
p.taskChan <- Task[A, R]{
stop: true,
}
}
Expand Down
28 changes: 28 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,34 @@ func TestPool_Submit(t *testing.T) {
}
}

func TestPool_AsyncSubmit(t *testing.T) {
type args struct {
handler func(interface{}) interface{}
args interface{}
}
tests := []struct {
name string
args args
}{
{
name: "test",
args: args{
handler: func(arg interface{}) interface{} {
for k := 0; k < 100; k++ {
_ = k
}
return nil
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewPool[interface{}, interface{}](10, 100)
p.AsyncSubmit(tt.args.handler, tt.args.args)
})
}
}
func TestPool_Map(t *testing.T) {
type args struct {
handler func(interface{}) interface{}
Expand Down

0 comments on commit ca0de72

Please sign in to comment.