package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"

import (
+	"context"
	"math"

+	"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
+	"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
@@ -76,6 +79,7 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P
			next := tracer.StartSpan(cfg.consumerSpanName, opts...)
			// reinject the span context so consumers can pick it up
			tracer.Inject(next.Context(), carrier)
+			setConsumeCheckpoint(cfg.dataStreamsEnabled, cfg.groupID, msg)

			wrapped.messages <- msg

@@ -127,8 +131,12 @@ type syncProducer struct {
// SendMessage calls sarama.SyncProducer.SendMessage and traces the request.
func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
	span := startProducerSpan(p.cfg, p.version, msg)
+	setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version)
	partition, offset, err = p.SyncProducer.SendMessage(msg)
	finishProducerSpan(span, partition, offset, err)
+	if err == nil && p.cfg.dataStreamsEnabled {
+		tracer.TrackKafkaProduceOffset(msg.Topic, partition, offset)
+	}
	return partition, offset, err
}

@@ -138,12 +146,19 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
	// treated individually, so we create a span for each one
	spans := make([]ddtrace.Span, len(msgs))
	for i, msg := range msgs {
+		setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version)
		spans[i] = startProducerSpan(p.cfg, p.version, msg)
	}
	err := p.SyncProducer.SendMessages(msgs)
	for i, span := range spans {
		finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err)
	}
+	if err == nil && p.cfg.dataStreamsEnabled {
+		// we only track Kafka lag if the messages were sent successfully; otherwise we have no way to know which partition the data was sent to.
+		for _, msg := range msgs {
+			tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset)
+		}
+	}
	return err
}

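For reviewers who want to try the sync-producer changes end to end, here is a minimal sketch. It assumes the package is imported as saramatrace and that data streams monitoring is enabled through an option along the lines of WithDataStreams() (the exact option name should be checked against this package's options); everything else is the standard Shopify/sarama and dd-trace-go API surface.

package main

import (
	"log"

	"github.com/Shopify/sarama"
	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0       // message headers (and thus pathway context injection) need >= 0.11
	cfg.Producer.Return.Successes = true // required by sarama.SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// WithDataStreams() is assumed here to be the option that sets cfg.dataStreamsEnabled.
	producer = saramatrace.WrapSyncProducer(cfg, producer, saramatrace.WithDataStreams())

	// SendMessage now starts a producer span, sets a produce checkpoint on the
	// message headers, and reports the produced offset once the send succeeds.
	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "my-topic",
		Value: sarama.StringEncoder("hello"),
	}); err != nil {
		log.Fatal(err)
	}
}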
@@ -221,6 +236,7 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts
			select {
			case msg := <-wrapped.input:
				span := startProducerSpan(cfg, saramaConfig.Version, msg)
+				setProduceCheckpoint(cfg.dataStreamsEnabled, msg, saramaConfig.Version)
				p.Input() <- msg
				if saramaConfig.Producer.Return.Successes {
					spanID := span.Context().SpanID()
@@ -236,6 +252,10 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts
					// producer was closed, so exit
					return
				}
+				if cfg.dataStreamsEnabled {
+					// we only track Kafka lag if returning successes is enabled; otherwise we have no way to know which partition the data was sent to.
+					tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset)
+				}
				if spanctx, spanFound := getSpanContext(msg); spanFound {
					spanID := spanctx.SpanID()
					if span, ok := spans[spanID]; ok {
@@ -303,3 +323,57 @@ func getSpanContext(msg *sarama.ProducerMessage) (ddtrace.SpanContext, bool) {

	return spanctx, true
}
+
+func setProduceCheckpoint(enabled bool, msg *sarama.ProducerMessage, version sarama.KafkaVersion) {
+	if !enabled || msg == nil {
+		return
+	}
+	edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"}
+	carrier := NewProducerMessageCarrier(msg)
+	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, edges...)
+	if !ok || !version.IsAtLeast(sarama.V0_11_0_0) {
+		return
+	}
+	datastreams.InjectToBase64Carrier(ctx, carrier)
+}
+
+func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) {
+	if !enabled || msg == nil {
+		return
+	}
+	edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"}
+	if groupID != "" {
+		edges = append(edges, "group:"+groupID)
+	}
+	carrier := NewConsumerMessageCarrier(msg)
+	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...)
+	if !ok {
+		return
+	}
+	datastreams.InjectToBase64Carrier(ctx, carrier)
+	if groupID != "" {
+		// only track Kafka lag if a consumer group is set.
+		// since there is no ack mechanism, we consider that messages read are committed right away.
+		tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset)
+	}
+}
+
+func getProducerMsgSize(msg *sarama.ProducerMessage) (size int64) {
+	for _, header := range msg.Headers {
+		size += int64(len(header.Key) + len(header.Value))
+	}
+	if msg.Value != nil {
+		size += int64(msg.Value.Length())
+	}
+	if msg.Key != nil {
+		size += int64(msg.Key.Length())
+	}
+	return size
+}
+
+func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) {
+	for _, header := range msg.Headers {
+		size += int64(len(header.Key) + len(header.Value))
+	}
+	return size + int64(len(msg.Value)+len(msg.Key))
+}
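A companion sketch for the consumer path, showing where the new consume checkpoint and commit-offset tracking come into play. WithDataStreams() and WithGroupID(...) are assumed option names for setting cfg.dataStreamsEnabled and cfg.groupID (check this package's options before copying); without a group ID the checkpoint is still set, but TrackKafkaCommitOffset is skipped, mirroring setConsumeCheckpoint above.

package main

import (
	"log"

	"github.com/Shopify/sarama"
	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // headers (and thus the propagated pathway context) need >= 0.11

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}

	// WithDataStreams() and WithGroupID(...) are assumed option names; without a
	// group ID the consume checkpoint is still set, but no commit offset is tracked.
	pc = saramatrace.WrapPartitionConsumer(pc, saramatrace.WithDataStreams(), saramatrace.WithGroupID("my-group"))
	defer pc.Close()

	for msg := range pc.Messages() {
		// each message arrives with the consumer span context re-injected and the
		// data streams checkpoint already recorded
		log.Printf("consumed offset %d from %s", msg.Offset, msg.Topic)
	}
}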