Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go1.18 feature type parameter #39

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

[![godoc for Jeffail/tunny][1]][2]
[![goreportcard for Jeffail/tunny][3]][4]
![Go v1.18][5]

> Notice: the worker handler signature changed from `func(interface{}) interface{}` to the generic `func(T) (U, error)`.

Tunny is a Golang library for spawning and managing a goroutine pool, allowing
you to limit work coming from any number of goroutines with a synchronous API.
Expand Down Expand Up @@ -43,12 +46,16 @@ import (
func main() {
numCPUs := runtime.NumCPU()

pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
// support generics go1.18
// pool := tunny.NewFunc(numCPUs, func(payload int) (string, error) {
// return "", nil })

pool := tunny.NewFunc(numCPUs, func(payload interface{}) (interface{}, error) {
var result []byte

// TODO: Something CPU heavy with payload

return result
return result, nil
})
defer pool.Close()

Expand All @@ -61,7 +68,10 @@ func main() {

// Funnel this work into our pool. This call is synchronous and will
// block until the job is completed.
result := pool.Process(input)
result, err := pool.Process(input)
if err != nil {
_ = err // do something to error
}

w.Write(result.([]byte))
})
Expand Down Expand Up @@ -131,4 +141,5 @@ should not be relied upon.
[2]: http://godoc.org/github.com/Jeffail/tunny
[3]: https://goreportcard.com/badge/github.com/Jeffail/tunny
[4]: https://goreportcard.com/report/Jeffail/tunny
[5]: https://img.shields.io/badge/Go-v1.18-007d9c
[tunny-worker]: https://godoc.org/github.com/Jeffail/tunny#Worker
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/Jeffail/tunny

go 1.13
go 1.18
151 changes: 91 additions & 60 deletions tunny.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ var (
//
// Each of these duties are implemented as a single method and can be averted
// when not needed by simply implementing an empty func.
type Worker interface {
type Worker[T, U any] interface {
// Process will synchronously perform a job and return the result.
Process(interface{}) interface{}
Process(T) (U, error)

// BlockUntilReady is called before each job is processed and must block the
// calling goroutine until the Worker is ready to process the next job.
Expand All @@ -66,49 +66,50 @@ type Worker interface {

// closureWorker is a minimal Worker implementation that simply wraps a
// func(T) (U, error).
type closureWorker[T, U any] struct {
	processor func(T) (U, error)
}

// Process runs the wrapped func on the payload and returns its result.
func (w *closureWorker[T, U]) Process(payload T) (U, error) {
	return w.processor(payload)
}

// BlockUntilReady, Interrupt and Terminate are no-ops for closureWorker.
func (w *closureWorker[T, U]) BlockUntilReady() {}
func (w *closureWorker[T, U]) Interrupt()       {}
func (w *closureWorker[T, U]) Terminate()       {}

//------------------------------------------------------------------------------

// callbackWorker is a minimal Worker implementation that attempts to cast
// each job into func() and either calls it if successful or returns
// ErrJobNotFunc.
type callbackWorker struct{}
type callbackWorker[T, U any] struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
f, ok := payload.(func())
func (w *callbackWorker[T, U]) Process(payload T) (ret U, err error) {
f, ok := (interface{})(payload).(func())
if !ok {
return ErrJobNotFunc
err = ErrJobNotFunc
return
}
f()
return nil
return
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt() {}
func (w *callbackWorker) Terminate() {}
func (w *callbackWorker[T, U]) BlockUntilReady() {}
func (w *callbackWorker[T, U]) Interrupt() {}
func (w *callbackWorker[T, U]) Terminate() {}

//------------------------------------------------------------------------------

// Pool is a struct that manages a collection of workers, each with their own
// goroutine. The Pool can initialize, expand, compress and close the workers,
// as well as processing jobs with the workers synchronously.
type Pool struct {
type Pool[T, U any] struct {
queuedJobs int64

ctor func() Worker
workers []*workerWrapper
reqChan chan workRequest
ctor func() Worker[T, U]
workers []*workerWrapper[T, U]
reqChan chan workRequest[T, U]

workerMut sync.Mutex
}
Expand All @@ -117,10 +118,10 @@ type Pool struct {
// provide a constructor function that creates new Worker types and when you
// change the size of the pool the constructor will be called to create each new
// Worker.
func New(n int, ctor func() Worker) *Pool {
p := &Pool{
func New[T, U any](n int, ctor func() Worker[T, U]) *Pool[T, U] {
p := &Pool[T, U]{
ctor: ctor,
reqChan: make(chan workRequest),
reqChan: make(chan workRequest[T, U]),
}
p.SetSize(n)

Expand All @@ -129,19 +130,19 @@ func New(n int, ctor func() Worker) *Pool {

// NewFunc creates a new Pool of workers where each worker will process using
// the provided func.
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
return New(n, func() Worker {
return &closureWorker{
func NewFunc[T, U any](n int, f func(T) (U, error)) *Pool[T, U] {
return New(n, func() Worker[T, U] {
return &closureWorker[T, U]{
processor: f,
}
})
}

// NewCallback creates a new Pool of workers where workers cast the job payload
// into a func() and runs it, or returns ErrNotFunc if the cast failed.
func NewCallback(n int) *Pool {
return New(n, func() Worker {
return &callbackWorker{}
func NewCallback[T, U any](n int) *Pool[T, U] {
return New(n, func() Worker[T, U] {
return &callbackWorker[T, U]{}
})
}

Expand All @@ -150,120 +151,150 @@ func NewCallback(n int) *Pool {
// Process will use the Pool to process a payload and synchronously return the
// result. Process can be called safely by any goroutines, but will panic if the
// Pool has been stopped.
func (p *Pool) Process(payload interface{}) interface{} {
func (p *Pool[T, U]) Process(payload T) (U, error) {
atomic.AddInt64(&p.queuedJobs, 1)

request, open := <-p.reqChan
if !open {
panic(ErrPoolNotRunning)
}

request.jobChan <- payload
request.jobChan <- struct {
data T
err error
}{payload, nil}

payload, open = <-request.retChan
var payload2 struct {
data U
err error
}
payload2, open = <-request.retChan
if !open {
panic(ErrWorkerClosed)
}

atomic.AddInt64(&p.queuedJobs, -1)
return payload
return payload2.data, payload2.err
}

// ProcessTimed will use the Pool to process a payload and synchronously return
// the result. If the timeout occurs before the job has finished the worker will
// be interrupted and ErrJobTimedOut will be returned. ProcessTimed can be
// called safely by any goroutines.
func (p *Pool) ProcessTimed(
payload interface{},
func (p *Pool[T, U]) ProcessTimed(
payload T,
timeout time.Duration,
) (interface{}, error) {
) (ret U, err error) {
atomic.AddInt64(&p.queuedJobs, 1)
defer atomic.AddInt64(&p.queuedJobs, -1)

tout := time.NewTimer(timeout)

var request workRequest
var request workRequest[T, U]
var open bool

select {
case request, open = <-p.reqChan:
if !open {
return nil, ErrPoolNotRunning
err = ErrPoolNotRunning
return
}
case <-tout.C:
return nil, ErrJobTimedOut
err = ErrJobTimedOut
return
}

select {
case request.jobChan <- payload:
case request.jobChan <- struct {
data T
err error
}{payload, nil}:
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
err = ErrJobTimedOut
return
}

var payload2 struct {
data U
err error
}
select {
case payload, open = <-request.retChan:
case payload2, open = <-request.retChan:
if !open {
return nil, ErrWorkerClosed
err = ErrWorkerClosed
return
}
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
err = ErrJobTimedOut
return
}

tout.Stop()
return payload, nil
return payload2.data, payload2.err
}

// ProcessCtx will use the Pool to process a payload and synchronously return
// the result. If the context cancels before the job has finished the worker will
// be interrupted and ErrJobTimedOut will be returned. ProcessCtx can be
// called safely by any goroutines.
func (p *Pool) ProcessCtx(ctx context.Context, payload interface{}) (interface{}, error) {
func (p *Pool[T, U]) ProcessCtx(ctx context.Context, payload T) (ret U, err error) {
atomic.AddInt64(&p.queuedJobs, 1)
defer atomic.AddInt64(&p.queuedJobs, -1)

var request workRequest
var request workRequest[T, U]
var open bool

select {
case request, open = <-p.reqChan:
if !open {
return nil, ErrPoolNotRunning
err = ErrPoolNotRunning
return
}
case <-ctx.Done():
return nil, ctx.Err()
err = ctx.Err()
return
}

select {
case request.jobChan <- payload:
case request.jobChan <- struct {
data T
err error
}{payload, nil}:
case <-ctx.Done():
request.interruptFunc()
return nil, ctx.Err()
err = ctx.Err()
return
}

var payload2 struct {
data U
err error
}
select {
case payload, open = <-request.retChan:
case payload2, open = <-request.retChan:
if !open {
return nil, ErrWorkerClosed
err = ErrWorkerClosed
return
}
case <-ctx.Done():
request.interruptFunc()
return nil, ctx.Err()
err = ctx.Err()
return
}

return payload, nil
return payload2.data, payload2.err
}

// QueueLength returns the current count of pending queued jobs.
func (p *Pool) QueueLength() int64 {
func (p *Pool[T, U]) QueueLength() int64 {
return atomic.LoadInt64(&p.queuedJobs)
}

// SetSize changes the total number of workers in the Pool. This can be called
// by any goroutine at any time unless the Pool has been stopped, in which case
// a panic will occur.
func (p *Pool) SetSize(n int) {
func (p *Pool[T, U]) SetSize(n int) {
p.workerMut.Lock()
defer p.workerMut.Unlock()

Expand Down Expand Up @@ -293,15 +324,15 @@ func (p *Pool) SetSize(n int) {
}

// GetSize returns the current size of the pool.
func (p *Pool) GetSize() int {
func (p *Pool[T, U]) GetSize() int {
p.workerMut.Lock()
defer p.workerMut.Unlock()

return len(p.workers)
}

// Close will terminate all workers and close the job channel of this Pool.
func (p *Pool) Close() {
func (p *Pool[T, U]) Close() {
p.SetSize(0)
close(p.reqChan)
}
Expand Down
Loading