-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfanout.go
42 lines (38 loc) · 1.43 KB
/
fanout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package pipeline
import (
"context"
"sync"
)
/*
NewFanOutStep creates a pipeline step that runs nested pipelines in their own Go routines.
The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever.
The step waits until all pipelines are finished.
If the given ParallelResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
If the context is canceled, no new pipelines will be retrieved from the channel and the Supplier is expected to stop supplying new instances.
Also, once canceled, the step waits for the remaining children pipelines and collects their result via given ParallelResultHandler.
However, the error returned from ParallelResultHandler is wrapped in context.Canceled.
*/
func NewFanOutStep[T context.Context](name string, pipelineSupplier Supplier[T], handler ParallelResultHandler[T]) Step[T] {
step := Step[T]{Name: name}
step.Action = func(ctx T) error {
pipelineChan := make(chan *Pipeline[T])
m := sync.Map{}
var wg sync.WaitGroup
i := uint64(0)
go pipelineSupplier(ctx, pipelineChan)
for pipe := range pipelineChan {
p := pipe
wg.Add(1)
n := i
i++
go func() {
defer wg.Done()
m.Store(n, p.RunWithContext(ctx))
}()
}
wg.Wait()
res := collectResults(ctx, handler, &m)
return setResultErrorFromContext(ctx, name, res)
}
return step
}