Skip to content

Commit cf7bac7

Browse files
chore(checkoutservice): add producer interceptor for tracing (#1400)
* chore(checkoutservice): add producer interceptor for tracing * chore(checkoutservice): update changelog --------- Co-authored-by: Pierre Tessier <pierre@pierretessier.com>
1 parent ef31bfd commit cf7bac7

File tree

5 files changed

+66
-33
lines changed

5 files changed

+66
-33
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ the release.
77

88
## Unreleased
99

10+
* [checkoutservice] add producer interceptor for tracing
11+
([#1400](https://github.com/open-telemetry/opentelemetry-demo/pull/1400))
1012
* [chore] increase memory for Collector and Jaeger
1113
([#1396](https://github.com/open-telemetry/opentelemetry-demo/pull/1396))
1214
* [chore] fix Make targets for restart and redeploy

src/accountingservice/kafka/trace_interceptor.go

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func NewOTelInterceptor(groupID string) *OTelInterceptor {
2828

2929
oi.fixedAttrs = []attribute.KeyValue{
3030
semconv.MessagingSystemKafka,
31+
semconv.MessagingOperationReceive,
3132
semconv.MessagingKafkaConsumerGroup(groupID),
3233
semconv.NetworkTransportTCP,
3334
}

src/checkoutservice/kafka/producer.go

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProd
1717
saramaConfig.Version = ProtocolVersion
1818
// So we can know the partition and offset of messages.
1919
saramaConfig.Producer.Return.Successes = true
20+
saramaConfig.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor()}
2021

2122
producer, err := sarama.NewAsyncProducer(brokers, saramaConfig)
2223
if err != nil {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
package kafka
4+
5+
import (
6+
"context"
7+
"fmt"
8+
9+
"go.opentelemetry.io/otel"
10+
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/propagation"
12+
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
13+
"go.opentelemetry.io/otel/trace"
14+
15+
"github.com/IBM/sarama"
16+
)
17+
18+
type OTelInterceptor struct {
19+
tracer trace.Tracer
20+
fixedAttrs []attribute.KeyValue
21+
}
22+
23+
// NewOTelInterceptor processes span for intercepted messages and add some
24+
// headers with the span data.
25+
func NewOTelInterceptor() *OTelInterceptor {
26+
oi := OTelInterceptor{}
27+
oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/checkoutservice/sarama")
28+
29+
oi.fixedAttrs = []attribute.KeyValue{
30+
semconv.MessagingSystemKafka,
31+
semconv.MessagingOperationPublish,
32+
semconv.NetworkTransportTCP,
33+
}
34+
return &oi
35+
}
36+
37+
func (oi *OTelInterceptor) OnSend(msg *sarama.ProducerMessage) {
38+
spanContext, span := oi.tracer.Start(
39+
context.Background(),
40+
fmt.Sprintf("%s publish", msg.Topic),
41+
trace.WithSpanKind(trace.SpanKindProducer),
42+
trace.WithAttributes(
43+
semconv.PeerService("kafka"),
44+
semconv.NetworkTransportTCP,
45+
semconv.MessagingSystemKafka,
46+
semconv.MessagingDestinationName(msg.Topic),
47+
semconv.MessagingOperationPublish,
48+
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
49+
),
50+
)
51+
defer span.End()
52+
53+
carrier := propagation.MapCarrier{}
54+
propagator := otel.GetTextMapPropagator()
55+
propagator.Inject(spanContext, carrier)
56+
57+
for key, value := range carrier {
58+
msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)})
59+
}
60+
}

src/checkoutservice/main.go

+2-33
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"context"
88
"encoding/json"
99
"fmt"
10-
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
1110
"net"
1211
"net/http"
1312
"os"
@@ -311,7 +310,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq
311310

312311
// send to kafka only if kafka broker address is set
313312
if cs.kafkaBrokerSvcAddr != "" {
314-
cs.sendToPostProcessor(ctx, orderResult)
313+
cs.sendToPostProcessor(orderResult)
315314
}
316315

317316
resp := &pb.PlaceOrderResponse{Order: orderResult}
@@ -474,7 +473,7 @@ func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, i
474473
return resp.GetTrackingId(), nil
475474
}
476475

477-
func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.OrderResult) {
476+
func (cs *checkoutService) sendToPostProcessor(result *pb.OrderResult) {
478477
message, err := proto.Marshal(result)
479478
if err != nil {
480479
log.Errorf("Failed to marshal message to protobuf: %+v", err)
@@ -486,37 +485,7 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
486485
Value: sarama.ByteEncoder(message),
487486
}
488487

489-
// Inject tracing info into message
490-
span := createProducerSpan(ctx, &msg)
491-
defer span.End()
492-
493488
cs.KafkaProducerClient.Input() <- &msg
494489
successMsg := <-cs.KafkaProducerClient.Successes()
495490
log.Infof("Successful to write message. offset: %v", successMsg.Offset)
496491
}
497-
498-
func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
499-
spanContext, span := tracer.Start(
500-
ctx,
501-
fmt.Sprintf("%s publish", msg.Topic),
502-
trace.WithSpanKind(trace.SpanKindProducer),
503-
trace.WithAttributes(
504-
semconv.PeerService("kafka"),
505-
semconv.NetworkTransportTCP,
506-
semconv.MessagingSystemKafka,
507-
semconv.MessagingDestinationName(msg.Topic),
508-
semconv.MessagingOperationPublish,
509-
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
510-
),
511-
)
512-
513-
carrier := propagation.MapCarrier{}
514-
propagator := otel.GetTextMapPropagator()
515-
propagator.Inject(spanContext, carrier)
516-
517-
for key, value := range carrier {
518-
msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)})
519-
}
520-
521-
return span
522-
}

0 commit comments

Comments
 (0)