Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doesn't work with example code when using NewPoolWithResults #3

Open
ethanquix opened this issue Aug 13, 2023 · 2 comments
Open

doesn't work with example code when using NewPoolWithResults #3

ethanquix opened this issue Aug 13, 2023 · 2 comments

Comments

@ethanquix
Copy link

If I take the example from the documentation but I add WithResult:

func main() {
	p, _ := workerpool.NewPoolWithResults(4, func(job workerpool.Job[float64], workerID int) (int, error) {
		result := math.Sqrt(job.Payload)
		fmt.Println("result:", result)
		return 0, nil
	})
	for i := 0; i < 100; i++ {
		p.Submit(float64(i))
	}
	p.StopAndWait()

	for result := range p.Results {
		fmt.Println(result)
	}
}

the code never stops and is stuck at iteration ~3

@cmitsakis
Copy link
Owner

Maybe I should improve the documentation to explain more how to use this library correctly.

Job submission and reading of results have to run concurrently so they have to run on different goroutines.

In your code you don't read from the p.Results channel (you think you do but the program never reaches the second loop), so after a couple of job submissions the p.Results channel is full and the workers of the pool cannot write their result to the p.Results channel so they all block (deadlock).

I fixed your code by putting the submission loop in a goroutine. And I also changed the handler function to return result, nil and used (float64, nil) as a return type.

Here is the fixed code:

package main

import (
	"fmt"
	"math"

	"go.mitsakis.org/workerpool"
)

func main() {
	p, _ := workerpool.NewPoolWithResults(4, func(job workerpool.Job[float64], workerID int) (float64, error) {
		result := math.Sqrt(job.Payload)
		fmt.Println("result:", result)
		return result, nil
	})
	go func() {
		for i := 0; i < 100; i++ {
			p.Submit(float64(i))
		}
		p.StopAndWait()
	}()

	for result := range p.Results {
		fmt.Println(result.Value)
	}
}

@ben-tbotlabs
Copy link

I ended up adding a helper for workerpool that made running over the inputs and collecting the outputs to be processed as a whole easier

package pool

import (
	"fmt"

	"go.mitsakis.org/workerpool"
)

type Result[O any] struct {
	Value O
	Error error
}

type IndexedInput[V any] struct {
	Index int
	Value V
}

func RunAndCollectResults[I, O any](numOfWorkers int, inputs []I, handler func(index int, input I) (O, error)) ([]Result[O], error) {
	outputs := make([]Result[O], len(inputs))

	p, err := workerpool.NewPoolWithResults(numOfWorkers, func(job workerpool.Job[IndexedInput[I]], workerID int) (O, error) {
		return handler(job.Payload.Index, job.Payload.Value)
	})
	if err != nil {
		return nil, err
	}

	go func() {
		for i, input := range inputs {
			indexed := IndexedInput[I]{Index: i, Value: input}
			p.Submit(indexed)
		}
		p.StopAndWait()
	}()

	// Apply the results to the outputs based on the index
	for r := range p.Results {
		outputs[r.Job.Payload.Index].Value = r.Value
		outputs[r.Job.Payload.Index].Error = r.Error
	}

	// If there is an error then return either the singlar error or something that indicates multiple errors
	count := 0
	for _, output := range outputs {
		if output.Error != nil {
			err = output.Error
			count++
		}
	}

	if count > 1 {
		err = fmt.Errorf("multiple errors: %d errors returned by handler", count)
	}

	return outputs, err
}

with an example in the test as

package pool_test

import (
	"errors"
	"math"
	"testing"

	"github.com/stretchr/testify/suite"

	"openmetagame.dev/pkg/pool"
)

type poolTestSuite struct {
	suite.Suite
}

func (s *poolTestSuite) TestStandard() {
	inputs := []float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30}
	outputs, err := pool.RunAndCollectResults(4, inputs, func(index int, input float64) (float64, error) {
		return math.Sqrt(input), nil
	})

	expected := make([]pool.Result[float64], 0, len(inputs))
	for _, input := range inputs {
		expected = append(expected, pool.Result[float64]{Value: math.Sqrt(input), Error: nil})
	}

	s.Assertions.NoError(err)
	s.Assertions.ElementsMatch(expected, outputs)
}

func (s *poolTestSuite) TestHandlerErrors() {
	inputs := []float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30}
	expectedErr := errors.New("foo")

	_, err := pool.RunAndCollectResults(4, inputs, func(index int, input float64) (float64, error) {
		return math.Sqrt(input), expectedErr
	})
	s.Assertions.Error(err)
	s.Assertions.Equal("multiple errors: 20 errors returned by handler", err.Error())
}

func TestSuite(t *testing.T) {
	suite.Run(t, new(poolTestSuite))
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants