-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathengine.go
47 lines (36 loc) · 1.08 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package subee
import (
"context"
"github.com/pkg/errors"
)
// Engine is the framework instance.
//
// It embeds *Config, so the configuration's fields (for example the Logger
// used by Start) are accessed directly on the Engine, and it keeps the
// Subscriber handed to the constructor.
type Engine struct {
	// Config is embedded; its fields and methods are promoted onto Engine.
	*Config
	// subscriber is the Subscriber passed to New or NewBatch.
	subscriber Subscriber
}
// New creates an Engine instance that is driven by a single-message Consumer.
// The batch consumer slot is left nil; opts are forwarded unchanged to the
// shared constructor.
func New(subscriber Subscriber, consumer Consumer, opts ...Option) *Engine {
	engine := newEngine(subscriber, nil, consumer, opts...)
	return engine
}
// NewBatch creates an Engine instance that is driven by a BatchConsumer.
// The single-message consumer slot is left nil; opts are forwarded unchanged
// to the shared constructor.
func NewBatch(subscriber Subscriber, consumer BatchConsumer, opts ...Option) *Engine {
	engine := newEngine(subscriber, consumer, nil, opts...)
	return engine
}
// newEngine is the constructor shared by New and NewBatch. It starts from
// newDefaultConfig, stores both consumers on the config, and only then applies
// opts, so the options run against a config whose consumers are already set.
func newEngine(subscriber Subscriber, bConsumer BatchConsumer, consumer Consumer, opts ...Option) *Engine {
	config := newDefaultConfig()
	config.Consumer = consumer
	config.BatchConsumer = bConsumer
	config.apply(opts)

	return &Engine{
		subscriber: subscriber,
		Config:     config,
	}
}
// Start logs a start message, attaches the Engine's Logger to ctx via
// setLogger, and runs a new process built from the Engine with that context.
// A finish message is logged when Start returns. The process's result is
// passed through errors.WithStack before being returned.
func (e *Engine) Start(ctx context.Context) error {
	e.Logger.Print("Start Pub/Sub worker")
	defer e.Logger.Print("Finish Pub/Sub worker")

	loggerCtx := setLogger(ctx, e.Logger)
	proc := newProcess(e)
	runErr := proc.Start(loggerCtx)
	return errors.WithStack(runErr)
}