Skip to content

Commit 61d6a5f

Browse files
improve baggage propagation (#1545)
Signed-off-by: Michael Beemer <beeme1mr@users.noreply.github.com> Co-authored-by: Austin Parker <austin@ap2.io>
1 parent faa5104 commit 61d6a5f

File tree

5 files changed

+60
-60
lines changed

5 files changed

+60
-60
lines changed

src/adservice/src/main/java/oteldemo/AdService.java

+9-17
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ private static class AdServiceImpl extends oteldemo.AdServiceGrpc.AdServiceImplB
135135
private static final String ADSERVICE_FAILURE = "adServiceFailure";
136136
private static final String ADSERVICE_MANUAL_GC_FEATURE_FLAG = "adServiceManualGc";
137137
private static final String ADSERVICE_HIGH_CPU_FEATURE_FLAG = "adServiceHighCpu";
138-
Client ffClient = OpenFeatureAPI.getInstance().getClient();
138+
private static final Client ffClient = OpenFeatureAPI.getInstance().getClient();
139139

140140
private AdServiceImpl() {}
141141

@@ -149,8 +149,6 @@ private AdServiceImpl() {}
149149
@Override
150150
public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
151151
AdService service = AdService.getInstance();
152-
CPULoad cpuload = CPULoad.getInstance();
153-
cpuload.execute(getFeatureFlagEnabled(ADSERVICE_HIGH_CPU_FEATURE_FLAG));
154152

155153
// get the current span in context
156154
Span span = Span.current();
@@ -160,14 +158,19 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
160158
AdResponseType adResponseType;
161159

162160
Baggage baggage = Baggage.fromContextOrNull(Context.current());
161+
MutableContext evaluationContext = new MutableContext();
163162
if (baggage != null) {
164163
final String sessionId = baggage.getEntryValue("session.id");
165164
span.setAttribute("session.id", sessionId);
166-
ffClient.setEvaluationContext(new MutableContext().add("session", sessionId));
165+
evaluationContext.setTargetingKey(sessionId);
166+
evaluationContext.add("session", sessionId);
167167
} else {
168168
logger.info("no baggage found in context");
169169
}
170170

171+
CPULoad cpuload = CPULoad.getInstance();
172+
cpuload.execute(ffClient.getBooleanValue(ADSERVICE_HIGH_CPU_FEATURE_FLAG, false, evaluationContext));
173+
171174
span.setAttribute("app.ads.contextKeys", req.getContextKeysList().toString());
172175
span.setAttribute("app.ads.contextKeys.count", req.getContextKeysCount());
173176
if (req.getContextKeysCount() > 0) {
@@ -198,11 +201,11 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
198201
Attributes.of(
199202
adRequestTypeKey, adRequestType.name(), adResponseTypeKey, adResponseType.name()));
200203

201-
if (getFeatureFlagEnabled(ADSERVICE_FAILURE)) {
204+
if (ffClient.getBooleanValue(ADSERVICE_FAILURE, false, evaluationContext)) {
202205
throw new StatusRuntimeException(Status.UNAVAILABLE);
203206
}
204207

205-
if (getFeatureFlagEnabled(ADSERVICE_MANUAL_GC_FEATURE_FLAG)) {
208+
if (ffClient.getBooleanValue(ADSERVICE_MANUAL_GC_FEATURE_FLAG, false, evaluationContext)) {
206209
logger.warn("Feature Flag " + ADSERVICE_MANUAL_GC_FEATURE_FLAG + " enabled, performing a manual gc now");
207210
GarbageCollectionTrigger gct = new GarbageCollectionTrigger();
208211
gct.doExecute();
@@ -219,17 +222,6 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
219222
responseObserver.onError(e);
220223
}
221224
}
222-
223-
/**
224-
* Retrieves the status of a feature flag from the Feature Flag service.
225-
*
226-
* @param ff The name of the feature flag to retrieve.
227-
* @return {@code true} if the feature flag is enabled, {@code false} otherwise or in case of errors.
228-
*/
229-
boolean getFeatureFlagEnabled(String ff) {
230-
Boolean boolValue = ffClient.getBooleanValue(ff, false);
231-
return boolValue;
232-
}
233225
}
234226

235227
private static final ImmutableListMultimap<String, Ad> adsMap = createAdsMap();

src/flagd/demo.flagd.json

+2-11
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,8 @@
4747
"defaultVariant": "off",
4848
"targeting": {
4949
"fractional": [
50-
{
51-
"var": "session"
52-
},
53-
[
54-
"on",
55-
10
56-
],
57-
[
58-
"off",
59-
90
60-
]
50+
["on", 10],
51+
["off", 90]
6152
]
6253
}
6354
},

src/frontend/gateways/Api.gateway.ts

+31-15
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const { userId } = SessionGateway.getSession();
1212

1313
const basePath = '/api';
1414

15-
const ApiGateway = () => ({
15+
const Apis = () => ({
1616
getCart(currencyCode: string) {
1717
return request<IProductCart>({
1818
url: `${basePath}/cart`,
@@ -79,25 +79,41 @@ const ApiGateway = () => ({
7979
queryParams: {
8080
productIds,
8181
sessionId: userId,
82-
currencyCode
82+
currencyCode,
8383
},
8484
});
8585
},
8686
listAds(contextKeys: string[]) {
87-
// TODO: Figure out a better way to do this so session ID gets propagated to
88-
// all endpoints
89-
const baggage = propagation.getActiveBaggage() || propagation.createBaggage();
90-
const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId });
91-
const newContext = propagation.setBaggage(context.active(), newBaggage);
92-
context.with(newContext, () => {
93-
return request<Ad[]>({
94-
url: `${basePath}/data`,
95-
queryParams: {
96-
contextKeys,
97-
},
98-
});
87+
return request<Ad[]>({
88+
url: `${basePath}/data`,
89+
queryParams: {
90+
contextKeys,
91+
},
9992
});
10093
},
10194
});
10295

103-
export default ApiGateway();
96+
/**
97+
* Extends all the API calls to set baggage automatically.
98+
*/
99+
const ApiGateway = new Proxy(Apis(), {
100+
get(target, prop, receiver) {
101+
const originalFunction = Reflect.get(target, prop, receiver);
102+
103+
if (typeof originalFunction !== 'function') {
104+
return originalFunction;
105+
}
106+
107+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
108+
return function (...args: any[]) {
109+
const baggage = propagation.getActiveBaggage() || propagation.createBaggage();
110+
const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId });
111+
const newContext = propagation.setBaggage(context.active(), newBaggage);
112+
return context.with(newContext, () => {
113+
return Reflect.apply(originalFunction, undefined, args);
114+
});
115+
};
116+
},
117+
});
118+
119+
export default ApiGateway;

src/frontend/utils/telemetry/FrontendTracer.ts

+12-11
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,33 @@ import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'
1111
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
1212
import { SessionIdProcessor } from './SessionIdProcessor';
1313
import { detectResourcesSync } from '@opentelemetry/resources/build/src/detect-resources';
14+
import { ZoneContextManager } from '@opentelemetry/context-zone';
1415

15-
const { NEXT_PUBLIC_OTEL_SERVICE_NAME = '', NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = '', IS_SYNTHETIC_REQUEST = '' } =
16-
typeof window !== 'undefined' ? window.ENV : {};
17-
18-
const FrontendTracer = async (collectorString: string) => {
19-
const { ZoneContextManager } = await import('@opentelemetry/context-zone');
16+
const {
17+
NEXT_PUBLIC_OTEL_SERVICE_NAME = '',
18+
NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = '',
19+
IS_SYNTHETIC_REQUEST = '',
20+
} = typeof window !== 'undefined' ? window.ENV : {};
2021

22+
const FrontendTracer = (collectorString: string) => {
2123
let resource = new Resource({
2224
[SemanticResourceAttributes.SERVICE_NAME]: NEXT_PUBLIC_OTEL_SERVICE_NAME,
2325
});
2426

2527
const detectedResources = detectResourcesSync({ detectors: [browserDetector] });
2628
resource = resource.merge(detectedResources);
27-
const provider = new WebTracerProvider({
28-
resource
29-
});
29+
const provider = new WebTracerProvider({ resource });
3030

3131
provider.addSpanProcessor(new SessionIdProcessor());
3232

3333
provider.addSpanProcessor(
3434
new BatchSpanProcessor(
3535
new OTLPTraceExporter({
3636
url: NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT || collectorString || 'http://localhost:4318/v1/traces',
37-
}), {
38-
scheduledDelayMillis : 500
39-
}
37+
}),
38+
{
39+
scheduledDelayMillis: 500,
40+
}
4041
)
4142
);
4243

src/loadgenerator/locustfile.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,14 @@
99
import random
1010
import uuid
1111
import logging
12-
import sys
13-
from pythonjsonlogger import jsonlogger
12+
1413
from locust import HttpUser, task, between
1514
from locust_plugins.users.playwright import PlaywrightUser, pw, PageWithRetry, event
1615

1716
from opentelemetry import context, baggage, trace
1817
from opentelemetry.metrics import set_meter_provider
1918
from opentelemetry.sdk.metrics import MeterProvider
20-
from opentelemetry.sdk.metrics.export import MetricExporter, PeriodicExportingMetricReader
19+
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
2120
from opentelemetry.sdk.trace import TracerProvider
2221
from opentelemetry.sdk.trace.export import BatchSpanProcessor
2322
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
@@ -36,7 +35,6 @@
3635

3736
from openfeature import api
3837
from openfeature.contrib.provider.flagd import FlagdProvider
39-
from openfeature.exception import OpenFeatureError
4038

4139
from playwright.async_api import Route, Request
4240

@@ -172,7 +170,8 @@ def flood_home(self):
172170
self.client.get("/")
173171

174172
def on_start(self):
175-
ctx = baggage.set_baggage("synthetic_request", "true")
173+
ctx = baggage.set_baggage("session.id", str(uuid.uuid4()))
174+
ctx = baggage.set_baggage("synthetic_request", "true", context=ctx)
176175
context.attach(ctx)
177176
self.index()
178177

@@ -210,8 +209,9 @@ async def add_product_to_cart(self, page: PageWithRetry):
210209

211210

212211
async def add_baggage_header(route: Route, request: Request):
212+
existing_baggage = request.headers.get('baggage', '')
213213
headers = {
214214
**request.headers,
215-
'baggage': 'synthetic_request=true'
215+
'baggage': ', '.join(filter(None, (existing_baggage, 'synthetic_request=true')))
216216
}
217217
await route.continue_(headers=headers)

0 commit comments

Comments
 (0)