-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathconsumer_with_idle_trigger.go
129 lines (117 loc) · 3.52 KB
/
consumer_with_idle_trigger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package sqsclient
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"go.uber.org/zap"
)
type ConsumerWithIdleTrigger struct {
sqs *sqs.Client
handler HandlerWithIdleTrigger
wg *sync.WaitGroup
cfg Config
idleDurationTimeout time.Duration
sqsReceiveWaitTimeSeconds int32
}
func NewConsumerWithIdleTrigger(awsCfg aws.Config, cfg Config, handler HandlerWithIdleTrigger, idleDurationTimeout time.Duration, sqsReceiveWaitTimeSeconds int32) (*ConsumerWithIdleTrigger, error) {
if cfg.VisibilityTimeoutSeconds < 30 {
return nil, errors.New("VisibilityTimeoutSeconds must be greater or equal to 30")
}
return &ConsumerWithIdleTrigger{
sqs: sqs.NewFromConfig(awsCfg),
handler: handler,
wg: &sync.WaitGroup{},
cfg: cfg,
idleDurationTimeout: idleDurationTimeout,
sqsReceiveWaitTimeSeconds: sqsReceiveWaitTimeSeconds,
}, nil
}
func (c *ConsumerWithIdleTrigger) Consume(ctx context.Context) {
jobs := make(chan *Message)
for w := 1; w <= c.cfg.WorkersNum; w++ {
go c.worker(ctx, jobs)
c.wg.Add(1)
}
timeout := time.NewTimer(c.idleDurationTimeout)
defer timeout.Stop()
loop:
for {
select {
case <-ctx.Done():
zap.S().Info("closing jobs channel")
c.handler.Shutdown()
close(jobs)
break loop
case <-timeout.C:
zap.S().Info("No new messages for timeout duration, triggering timeout logic")
// a nil message should trigger a timeout
jobs <- newMessage(nil)
// Reset the timeout
timeout.Reset(c.idleDurationTimeout)
default:
output, err := c.sqs.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: &c.cfg.QueueURL,
MaxNumberOfMessages: c.cfg.BatchSize,
WaitTimeSeconds: c.sqsReceiveWaitTimeSeconds,
MessageAttributeNames: []string{"TraceID", "SpanID"},
VisibilityTimeout: c.cfg.VisibilityTimeoutSeconds,
})
if err != nil {
zap.S().With(zap.Error(err)).Error("could not receive messages from SQS")
continue
}
if len(output.Messages) > 0 {
// Reset the timeout since we received messages
if !timeout.Stop() {
select {
case <-timeout.C:
default:
}
}
timeout.Reset(c.idleDurationTimeout)
}
for _, m := range output.Messages {
jobs <- newMessage(&m)
}
}
}
c.wg.Wait()
}
func (c *ConsumerWithIdleTrigger) worker(ctx context.Context, messages <-chan *Message) {
for m := range messages {
if err := c.handleMsg(ctx, m); err != nil {
zap.S().With(zap.Error(err)).Error("error running handlers")
}
}
zap.S().Info("worker exiting")
c.wg.Done()
}
func (c *ConsumerWithIdleTrigger) handleMsg(ctx context.Context, m *Message) error {
if c.handler != nil {
if m.Message == nil {
if err := c.handler.IdleTimeout(ctx); err != nil {
return m.ErrorResponse(err)
}
} else {
if err := c.handler.Run(ctx, m); err != nil {
return m.ErrorResponse(err)
}
return c.delete(ctx, m) // Message consumed
}
}
m.Success()
return nil
}
func (c *ConsumerWithIdleTrigger) delete(ctx context.Context, m *Message) error {
_, err := c.sqs.DeleteMessage(ctx, &sqs.DeleteMessageInput{QueueUrl: &c.cfg.QueueURL, ReceiptHandle: m.ReceiptHandle})
if err != nil {
zap.S().With(zap.Error(err)).Error("error removing message")
return fmt.Errorf("unable to delete message from the queue: %w", err)
}
zap.S().Debug("message deleted")
return nil
}