Skip to content

Commit e0500b2

Browse files
[kafka][checkoutservice][frauddetectionservice] add kafkaQueueProblems featureflag (#1528)
* Add kafkaQueueProblems featureflag Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike The result of that featureflag can be observed with numerous metrics in grafana (e.g. kafka_consumer_lag_avg) * changed feature flag to int value for more configurability also adjusted the resource limit for the frauddetection service since it kept dying * addressed PR comments * addressed PR comment --------- Co-authored-by: Austin Parker <austin@ap2.io>
1 parent 05982b2 commit e0500b2

File tree

6 files changed

+98
-9
lines changed

6 files changed

+98
-9
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ the release.
2121
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522))
2222
* [frontend] Pass down image optimization requests to imageprovider
2323
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522))
24+
* [kafka] add kafkaQueueProblems feature flag
25+
([#1528](https://github.com/open-telemetry/opentelemetry-demo/pull/1528))
2426
* [otelcollector] Add `redisreceiver`
2527
([#1537](https://github.com/open-telemetry/opentelemetry-demo/pull/1537))
2628

docker-compose.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,11 @@ services:
247247
deploy:
248248
resources:
249249
limits:
250-
memory: 200M
250+
memory: 300M
251251
restart: unless-stopped
252252
environment:
253+
- FLAGD_HOST
254+
- FLAGD_PORT
253255
- KAFKA_SERVICE_ADDR
254256
- OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_HTTP}
255257
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE

src/checkoutservice/main.go

+41-8
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ func main() {
160160
}
161161

162162
openfeature.SetProvider(flagd.NewProvider())
163+
openfeature.AddHooks(otelhooks.NewTracesHook())
163164

164165
tracer = tp.Tracer("checkoutservice")
165166

@@ -316,6 +317,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq
316317

317318
// send to kafka only if kafka broker address is set
318319
if cs.kafkaBrokerSvcAddr != "" {
320+
log.Infof("sending to postProcessor")
319321
cs.sendToPostProcessor(ctx, orderResult)
320322
}
321323

@@ -439,7 +441,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,
439441

440442
func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
441443
paymentService := cs.paymentSvcClient
442-
if cs.checkPaymentFailure(ctx) {
444+
if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") {
443445
badAddress := "badAddress:50051"
444446
c := mustCreateClient(context.Background(), badAddress)
445447
paymentService = pb.NewPaymentServiceClient(c)
@@ -505,6 +507,18 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
505507
cs.KafkaProducerClient.Input() <- &msg
506508
successMsg := <-cs.KafkaProducerClient.Successes()
507509
log.Infof("Successful to write message. offset: %v", successMsg.Offset)
510+
511+
ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems")
512+
if ffValue > 0 {
513+
log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.")
514+
for i := 0; i < ffValue; i++ {
515+
go func(i int) {
516+
cs.KafkaProducerClient.Input() <- &msg
517+
_ = <-cs.KafkaProducerClient.Successes()
518+
}(i)
519+
}
520+
log.Infof("Done with #%d messages for overload simulation.", ffValue)
521+
}
508522
}
509523

510524
func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
@@ -533,11 +547,30 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.
533547
return span
534548
}
535549

536-
func (cs *checkoutService) checkPaymentFailure(ctx context.Context) bool {
537-
openfeature.AddHooks(otelhooks.NewTracesHook())
538-
client := openfeature.NewClient("checkout")
539-
failureEnabled, _ := client.BooleanValue(
540-
ctx, "paymentServiceUnreachable", false, openfeature.EvaluationContext{},
541-
)
542-
return failureEnabled
550+
func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool {
551+
client := openfeature.NewClient("checkout")
552+
553+
// Default value is set to false, but you could also make this a parameter.
554+
featureEnabled, _ := client.BooleanValue(
555+
ctx,
556+
featureFlagName,
557+
false,
558+
openfeature.EvaluationContext{},
559+
)
560+
561+
return featureEnabled
562+
}
563+
564+
func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int {
565+
client := openfeature.NewClient("checkout")
566+
567+
// Default value is set to 0, but you could also make this a parameter.
568+
featureFlagValue, _ := client.IntValue(
569+
ctx,
570+
featureFlagName,
571+
0,
572+
openfeature.EvaluationContext{},
573+
)
574+
575+
return int(featureFlagValue)
543576
}

src/flagd/demo.flagd.json

+9
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,15 @@
6161
]
6262
}
6363
},
64+
"kafkaQueueProblems": {
65+
"description": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike",
66+
"state": "ENABLED",
67+
"variants": {
68+
"on": 100,
69+
"off": 0
70+
},
71+
"defaultVariant": "off"
72+
},
6473
"cartServiceFailure": {
6574
"description": "Fail cart service",
6675
"state": "ENABLED",

src/frauddetectionservice/build.gradle.kts

+8
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ dependencies {
4242
implementation("org.apache.logging.log4j:log4j-core:2.21.1")
4343
implementation("org.slf4j:slf4j-api:2.0.9")
4444
implementation("com.google.protobuf:protobuf-kotlin:${protobufVersion}")
45+
implementation("dev.openfeature:sdk:1.7.4")
46+
implementation("dev.openfeature.contrib.providers:flagd:0.7.0")
4547

4648
if (JavaVersion.current().isJava9Compatible) {
4749
// Workaround for @javax.annotation.Generated
@@ -50,6 +52,12 @@ dependencies {
5052
}
5153
}
5254

55+
tasks {
56+
shadowJar {
57+
mergeServiceFiles()
58+
}
59+
}
60+
5361
tasks.test {
5462
useJUnitPlatform()
5563
}

src/frauddetectionservice/src/main/kotlin/frauddetectionservice/main.kt

+35
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,26 @@ import oteldemo.Demo.*
1515
import java.time.Duration.ofMillis
1616
import java.util.*
1717
import kotlin.system.exitProcess
18+
import dev.openfeature.contrib.providers.flagd.FlagdOptions
19+
import dev.openfeature.contrib.providers.flagd.FlagdProvider
20+
import dev.openfeature.sdk.Client
21+
import dev.openfeature.sdk.EvaluationContext
22+
import dev.openfeature.sdk.ImmutableContext
23+
import dev.openfeature.sdk.Value
24+
import dev.openfeature.sdk.OpenFeatureAPI
1825

1926
const val topic = "orders"
2027
const val groupID = "frauddetectionservice"
2128

2229
private val logger: Logger = LogManager.getLogger(groupID)
2330

2431
fun main() {
32+
val options = FlagdOptions.builder()
33+
.withGlobalTelemetry(true)
34+
.build()
35+
val flagdProvider = FlagdProvider(options)
36+
OpenFeatureAPI.getInstance().setProvider(flagdProvider)
37+
2538
val props = Properties()
2639
props[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
2740
props[VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java.name
@@ -44,10 +57,32 @@ fun main() {
4457
.poll(ofMillis(100))
4558
.fold(totalCount) { accumulator, record ->
4659
val newCount = accumulator + 1
60+
if (getFeatureFlagValue("kafkaQueueProblems") > 0) {
61+
logger.info("FeatureFlag 'kafkaQueueProblems' is enabled, sleeping 1 second")
62+
Thread.sleep(1000)
63+
}
4764
val orders = OrderResult.parseFrom(record.value())
4865
logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount")
4966
newCount
5067
}
5168
}
5269
}
5370
}
71+
72+
/**
73+
* Retrieves the status of a feature flag from the Feature Flag service.
74+
*
75+
* @param ff The name of the feature flag to retrieve.
76+
* @return `true` if the feature flag is enabled, `false` otherwise or in case of errors.
77+
*/
78+
fun getFeatureFlagValue(ff: String): Int {
79+
val client = OpenFeatureAPI.getInstance().client
80+
// TODO: Plumb the actual session ID from the frontend via baggage?
81+
val uuid = UUID.randomUUID()
82+
83+
val clientAttrs = mutableMapOf<String, Value>()
84+
clientAttrs["session"] = Value(uuid.toString())
85+
client.evaluationContext = ImmutableContext(clientAttrs)
86+
val intValue = client.getIntegerValue(ff, 0)
87+
return intValue
88+
}

0 commit comments

Comments
 (0)