Skip to content

Commit cc29172

Browse files
[checkout] fix kafka restart (#1590)
* fix kafka acks * Update src/checkoutservice/main.go Co-authored-by: Juliano Costa <julianocosta89@outlook.com> * align attribute names with semantic conventions * checkout: recover from kafka restart --------- Co-authored-by: Juliano Costa <julianocosta89@outlook.com>
1 parent d75c9d8 commit cc29172

File tree

3 files changed

+83
-36
lines changed

3 files changed

+83
-36
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ the release.
4747
([#1592](https://github.com/open-telemetry/opentelemetry-demo/pull/1592))
4848
* chore: Add service version to OTEL_RESOURCE_ATTRIBUTES
4949
([#1594](https://github.com/open-telemetry/opentelemetry-demo/pull/1594))
50+
* [checkout] increase Kafka resiliency and observability
51+
([#1590](https://github.com/open-telemetry/opentelemetry-demo/pull/1590))
5052

5153
## 1.9.0
5254

src/checkoutservice/kafka/producer.go

+10
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,18 @@ var (
1313
)
1414

1515
func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProducer, error) {
16+
sarama.Logger = log
17+
1618
saramaConfig := sarama.NewConfig()
19+
saramaConfig.Producer.Return.Successes = true
20+
saramaConfig.Producer.Return.Errors = true
21+
22+
// Sarama has an issue in a single broker kafka if the kafka broker is restarted.
23+
// This setting is to prevent that issue from manifesting itself, but may swallow failed messages.
24+
saramaConfig.Producer.RequiredAcks = sarama.NoResponse
25+
1726
saramaConfig.Version = ProtocolVersion
27+
1828
// So we can know the partition and offset of messages.
1929
saramaConfig.Producer.Return.Successes = true
2030

src/checkoutservice/main.go

+71-36
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@ import (
2020

2121
"github.com/IBM/sarama"
2222
"github.com/google/uuid"
23+
otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg"
24+
flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"
25+
"github.com/open-feature/go-sdk/openfeature"
2326
"github.com/sirupsen/logrus"
2427
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
2528
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
2629
"go.opentelemetry.io/contrib/instrumentation/runtime"
2730
"go.opentelemetry.io/otel"
31+
otelcodes "go.opentelemetry.io/otel/codes"
2832
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
2933
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
3034
"go.opentelemetry.io/otel/propagation"
@@ -34,9 +38,6 @@ import (
3438
"google.golang.org/grpc"
3539
"google.golang.org/grpc/codes"
3640
"google.golang.org/grpc/credentials/insecure"
37-
flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"
38-
"github.com/open-feature/go-sdk/openfeature"
39-
otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg"
4041
healthpb "google.golang.org/grpc/health/grpc_health_v1"
4142
"google.golang.org/grpc/status"
4243
"google.golang.org/protobuf/proto"
@@ -440,13 +441,13 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,
440441
}
441442

442443
func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
443-
paymentService := cs.paymentSvcClient
444+
paymentService := cs.paymentSvcClient
444445
if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") {
445-
badAddress := "badAddress:50051"
446-
c := mustCreateClient(context.Background(), badAddress)
446+
badAddress := "badAddress:50051"
447+
c := mustCreateClient(context.Background(), badAddress)
447448
paymentService = pb.NewPaymentServiceClient(c)
448-
}
449-
449+
}
450+
450451
paymentResp, err := paymentService.Charge(ctx, &pb.ChargeRequest{
451452
Amount: amount,
452453
CreditCard: paymentInfo})
@@ -504,18 +505,52 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
504505
span := createProducerSpan(ctx, &msg)
505506
defer span.End()
506507

507-
cs.KafkaProducerClient.Input() <- &msg
508-
successMsg := <-cs.KafkaProducerClient.Successes()
509-
log.Infof("Successful to write message. offset: %v", successMsg.Offset)
508+
// Send message and handle response
509+
startTime := time.Now()
510+
select {
511+
case cs.KafkaProducerClient.Input() <- &msg:
512+
log.Infof("Message sent to Kafka: %v", msg)
513+
select {
514+
case successMsg := <-cs.KafkaProducerClient.Successes():
515+
span.SetAttributes(
516+
attribute.Bool("messaging.kafka.producer.success", true),
517+
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
518+
attribute.KeyValue(semconv.MessagingKafkaMessageOffset(int(successMsg.Offset))),
519+
)
520+
log.Infof("Successful to write message. offset: %v, duration: %v", successMsg.Offset, time.Since(startTime))
521+
case errMsg := <-cs.KafkaProducerClient.Errors():
522+
span.SetAttributes(
523+
attribute.Bool("messaging.kafka.producer.success", false),
524+
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
525+
)
526+
span.SetStatus(otelcodes.Error, errMsg.Err.Error())
527+
log.Errorf("Failed to write message: %v", errMsg.Err)
528+
case <-ctx.Done():
529+
span.SetAttributes(
530+
attribute.Bool("messaging.kafka.producer.success", false),
531+
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
532+
)
533+
span.SetStatus(otelcodes.Error, "Context cancelled: "+ctx.Err().Error())
534+
log.Warnf("Context canceled before success message received: %v", ctx.Err())
535+
}
536+
case <-ctx.Done():
537+
span.SetAttributes(
538+
attribute.Bool("messaging.kafka.producer.success", false),
539+
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
540+
)
541+
span.SetStatus(otelcodes.Error, "Failed to send: "+ctx.Err().Error())
542+
log.Errorf("Failed to send message to Kafka within context deadline: %v", ctx.Err())
543+
return
544+
}
510545

511546
ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems")
512547
if ffValue > 0 {
513548
log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.")
514549
for i := 0; i < ffValue; i++ {
515-
go func(i int) {
516-
cs.KafkaProducerClient.Input() <- &msg
550+
go func(i int) {
551+
cs.KafkaProducerClient.Input() <- &msg
517552
_ = <-cs.KafkaProducerClient.Successes()
518-
}(i)
553+
}(i)
519554
}
520555
log.Infof("Done with #%d messages for overload simulation.", ffValue)
521556
}
@@ -548,29 +583,29 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.
548583
}
549584

550585
func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool {
551-
client := openfeature.NewClient("checkout")
552-
553-
// Default value is set to false, but you could also make this a parameter.
554-
featureEnabled, _ := client.BooleanValue(
555-
ctx,
556-
featureFlagName,
557-
false,
558-
openfeature.EvaluationContext{},
559-
)
560-
561-
return featureEnabled
586+
client := openfeature.NewClient("checkout")
587+
588+
// Default value is set to false, but you could also make this a parameter.
589+
featureEnabled, _ := client.BooleanValue(
590+
ctx,
591+
featureFlagName,
592+
false,
593+
openfeature.EvaluationContext{},
594+
)
595+
596+
return featureEnabled
562597
}
563598

564599
func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int {
565-
client := openfeature.NewClient("checkout")
566-
567-
// Default value is set to 0, but you could also make this a parameter.
568-
featureFlagValue, _ := client.IntValue(
569-
ctx,
570-
featureFlagName,
571-
0,
572-
openfeature.EvaluationContext{},
573-
)
574-
575-
return int(featureFlagValue)
600+
client := openfeature.NewClient("checkout")
601+
602+
// Default value is set to 0, but you could also make this a parameter.
603+
featureFlagValue, _ := client.IntValue(
604+
ctx,
605+
featureFlagName,
606+
0,
607+
openfeature.EvaluationContext{},
608+
)
609+
610+
return int(featureFlagValue)
576611
}

0 commit comments

Comments
 (0)