Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Submit() API for submitting async tasks. There are good reasons for asynchronous APIs to come back. #46

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ if err == context.DeadlineExceeded {
}
```

## Submitting Async Tasks

`pool.Process()` is a synchronous API: it does not return until the task has finished. Sometimes we need to submit a task asynchronously. One simple way to do that is: (see issue #9)

```go
go func() {
    pool.Process(MyTask)
}()
```

However, the above approach cannot bound the number of goroutines it creates. If there are 4 workers but 10000 tasks, it will spawn 10000 goroutines, whereas we really want only the 4 worker goroutines to run these tasks. In this case, `pool.Submit()` is a better choice:
```go
pool.Submit(func() {
// put your task here
MyTask()
})
```

## Changing Pool Size

The size of a Tunny pool can be changed at any time with `SetSize(int)`:
Expand Down
26 changes: 24 additions & 2 deletions tunny.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,18 @@ type Worker interface {
// Terminate is called when a Worker is removed from the processing pool
// and is responsible for cleaning up any held resources.
Terminate()

	// BindPool is called when a pool creates a worker, binding the
	// worker to that pool. It returns the worker to allow chaining.
BindPool(p *Pool) Worker
}

//------------------------------------------------------------------------------

// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
	// pool is the Pool this worker was bound to via BindPool.
	pool *Pool
	// processor is the user-supplied function invoked for each job payload.
	processor func(interface{}) interface{}
}

Expand All @@ -77,13 +82,16 @@ func (w *closureWorker) Process(payload interface{}) interface{} {
// BlockUntilReady is a no-op: a closureWorker is always ready.
func (w *closureWorker) BlockUntilReady() {}

// Interrupt is a no-op: a wrapped closure cannot be cancelled mid-flight.
func (w *closureWorker) Interrupt() {}

// Terminate is a no-op: a closureWorker holds no resources.
func (w *closureWorker) Terminate() {}

// BindPool stores a reference to the owning pool and returns the worker so
// the call can be chained at construction time.
func (w *closureWorker) BindPool(p *Pool) Worker {
	w.pool = p
	return w
}

//------------------------------------------------------------------------------

// callbackWorker is a minimal Worker implementation that attempts to cast
// each job into func() and either calls it if successful or returns
// ErrJobNotFunc.
type callbackWorker struct{}
type callbackWorker struct {
	// pool is the Pool this worker was bound to via BindPool; the worker
	// loop uses it to decrement the pool's queued-job counter after an
	// async job completes.
	pool *Pool
}

func (w *callbackWorker) Process(payload interface{}) interface{} {
f, ok := payload.(func())
Expand All @@ -97,6 +105,7 @@ func (w *callbackWorker) Process(payload interface{}) interface{} {
// BlockUntilReady is a no-op: a callbackWorker is always ready.
func (w *callbackWorker) BlockUntilReady() {}

// Interrupt is a no-op: a running callback cannot be cancelled mid-flight.
func (w *callbackWorker) Interrupt() {}

// Terminate is a no-op: a callbackWorker holds no resources.
func (w *callbackWorker) Terminate() {}

// BindPool stores a reference to the owning pool and returns the worker so
// the call can be chained at construction time.
func (w *callbackWorker) BindPool(p *Pool) Worker {
	w.pool = p
	return w
}

//------------------------------------------------------------------------------

Expand Down Expand Up @@ -255,6 +264,19 @@ func (p *Pool) ProcessCtx(ctx context.Context, payload interface{}) (interface{}
return payload, nil
}

// Submit dispatches payload to a worker without waiting for the job's
// result. The call blocks until a worker accepts the job, which bounds the
// number of concurrently running tasks to the pool size. It returns true
// once the job has been handed off to a worker.
//
// Submit panics with ErrPoolNotRunning if the pool has been closed.
//
// NOTE(review): the worker loop that services asyncJobChan decrements
// queuedJobs through a *callbackWorker type assertion, so Submit appears
// safe only on pools built with NewCallback — confirm before using it with
// pools of custom workers.
func (p *Pool) Submit(payload interface{}) bool {
	atomic.AddInt64(&p.queuedJobs, 1)

	request, open := <-p.reqChan
	if !open {
		// Roll the queued-job count back so QueueLength stays accurate for
		// any caller recovering from this panic.
		atomic.AddInt64(&p.queuedJobs, -1)
		panic(ErrPoolNotRunning)
	}

	request.asyncJobChan <- payload
	return true
}

// QueueLength returns the current count of pending queued jobs.
func (p *Pool) QueueLength() int64 {
return atomic.LoadInt64(&p.queuedJobs)
Expand All @@ -274,7 +296,7 @@ func (p *Pool) SetSize(n int) {

// Add extra workers if N > len(workers)
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor().BindPool(p)))
}

// Asynchronously stop all workers > N
Expand Down
72 changes: 72 additions & 0 deletions tunny_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,74 @@ func TestParallelJobs(t *testing.T) {
}
}

// TestSubmitJob verifies that Submit dispatches callbacks asynchronously
// and that every submitted job runs exactly once.
func TestSubmitJob(t *testing.T) {
	pool := NewCallback(10)
	defer pool.Close()

	jobGroup := sync.WaitGroup{}
	jobGroup.Add(10)

	var counter int32
	for i := 0; i < 10; i++ {
		ok := pool.Submit(func() {
			time.Sleep(time.Millisecond)
			atomic.AddInt32(&counter, 1)
			jobGroup.Done()
		})
		if !ok {
			t.Error("Failed to submit callback")
		}
	}
	// Workers are still running here, so the counter is very likely below
	// 10. The load must be atomic: workers write concurrently, and a plain
	// read is a data race under -race.
	if exp, act := int32(10), atomic.LoadInt32(&counter); exp != act {
		t.Logf("Haven't finish, so result is wrong: %v != %v", act, exp)
	}
	jobGroup.Wait()
	// Wait() happens-after every Done(), so this read is synchronized; keep
	// the atomic load anyway for consistency.
	if exp, act := int32(10), atomic.LoadInt32(&counter); exp != act {
		t.Errorf("Wrong result: %v != %v", act, exp)
	}
}

// TestSubmitJobBlock verifies that Submit blocks once all workers are busy:
// with 10 saturated workers, an 11th submission must wait for a free worker.
func TestSubmitJobBlock(t *testing.T) {
	pool := NewCallback(10)
	defer pool.Close()

	jobGroup := sync.WaitGroup{}
	jobGroup.Add(11)

	var counter int32
	t1 := time.Now()
	// Saturate all 10 workers; these submissions should be accepted almost
	// immediately.
	for i := 0; i < 10; i++ {
		ok := pool.Submit(func() {
			time.Sleep(time.Millisecond)
			atomic.AddInt32(&counter, 1)
			jobGroup.Done()
		})
		if !ok {
			t.Error("Failed to submit callback")
		}
	}

	t2 := time.Now()

	// The 11th submission can only be accepted once a worker finishes its
	// ~1ms job, so this call should take noticeably longer than the batch.
	ok := pool.Submit(func() {
		time.Sleep(time.Millisecond)
		atomic.AddInt32(&counter, 1)
		jobGroup.Done()
	})
	if !ok {
		t.Error("Failed to submit callback")
	}
	t3 := time.Now()

	// NOTE(review): wall-clock thresholds like this are inherently flaky on
	// a loaded CI machine; a channel-based handshake would be more robust.
	if t3.Sub(t2).Microseconds()-t2.Sub(t1).Microseconds() < 800 {
		t.Error("Job submitting didn't cause block")
	}
	jobGroup.Wait()
	// Wait() synchronizes with every Done(); the atomic load is kept for
	// consistency with the workers' atomic writes.
	if exp, act := int32(11), atomic.LoadInt32(&counter); exp != act {
		t.Errorf("Wrong result: %v != %v", act, exp)
	}
}

//------------------------------------------------------------------------------

type mockWorker struct {
Expand Down Expand Up @@ -270,6 +338,10 @@ func (m *mockWorker) Terminate() {
m.terminated = true
}

// BindPool satisfies the Worker interface; the mock deliberately ignores
// the pool reference, since these tests never route async jobs through it.
func (m *mockWorker) BindPool(p *Pool) Worker {
	return m
}

func TestCustomWorker(t *testing.T) {
pool := New(1, func() Worker {
return &mockWorker{
Expand Down
11 changes: 10 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package tunny

import "sync/atomic"

//------------------------------------------------------------------------------

// workRequest is a struct containing context representing a workers intention
Expand All @@ -28,6 +30,9 @@ type workRequest struct {
// jobChan is used to send the payload to this worker.
jobChan chan<- interface{}

	// asyncJobChan is used to send async job payloads to this worker;
	// unlike jobChan, no result is read back.
	asyncJobChan chan<- interface{}

// retChan is used to read the result from this worker.
retChan <-chan interface{}

Expand Down Expand Up @@ -80,7 +85,7 @@ func (w *workerWrapper) interrupt() {
}

func (w *workerWrapper) run() {
jobChan, retChan := make(chan interface{}), make(chan interface{})
jobChan, retChan, asyncJobChan := make(chan interface{}), make(chan interface{}), make(chan interface{})
defer func() {
w.worker.Terminate()
close(retChan)
Expand All @@ -94,6 +99,7 @@ func (w *workerWrapper) run() {
case w.reqChan <- workRequest{
jobChan: jobChan,
retChan: retChan,
asyncJobChan: asyncJobChan,
interruptFunc: w.interrupt,
}:
select {
Expand All @@ -104,6 +110,9 @@ func (w *workerWrapper) run() {
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case payload := <-asyncJobChan:
_ = w.worker.Process(payload)
atomic.AddInt64(&w.worker.(*callbackWorker).pool.queuedJobs, -1)
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
Expand Down