From 11d7691bc1bb8d1c15238754a74d3e7ec70a49cd Mon Sep 17 00:00:00 2001 From: Michael Beemer Date: Tue, 23 Apr 2024 09:53:32 -0400 Subject: [PATCH] improve baggage propagation Signed-off-by: Michael Beemer --- .../src/main/java/oteldemo/AdService.java | 26 ++++------- src/flagd/demo.flagd.json | 13 +----- src/frontend/gateways/Api.gateway.ts | 46 +++++++++++++------ .../utils/telemetry/FrontendTracer.ts | 23 +++++----- src/loadgenerator/locustfile.py | 12 ++--- 5 files changed, 60 insertions(+), 60 deletions(-) diff --git a/src/adservice/src/main/java/oteldemo/AdService.java b/src/adservice/src/main/java/oteldemo/AdService.java index 69eeb0bdad..5f03996896 100644 --- a/src/adservice/src/main/java/oteldemo/AdService.java +++ b/src/adservice/src/main/java/oteldemo/AdService.java @@ -135,7 +135,7 @@ private static class AdServiceImpl extends oteldemo.AdServiceGrpc.AdServiceImplB private static final String ADSERVICE_FAILURE = "adServiceFailure"; private static final String ADSERVICE_MANUAL_GC_FEATURE_FLAG = "adServiceManualGc"; private static final String ADSERVICE_HIGH_CPU_FEATURE_FLAG = "adServiceHighCpu"; - Client ffClient = OpenFeatureAPI.getInstance().getClient(); + private static final Client ffClient = OpenFeatureAPI.getInstance().getClient(); private AdServiceImpl() {} @@ -149,8 +149,6 @@ private AdServiceImpl() {} @Override public void getAds(AdRequest req, StreamObserver responseObserver) { AdService service = AdService.getInstance(); - CPULoad cpuload = CPULoad.getInstance(); - cpuload.execute(getFeatureFlagEnabled(ADSERVICE_HIGH_CPU_FEATURE_FLAG)); // get the current span in context Span span = Span.current(); @@ -160,14 +158,19 @@ public void getAds(AdRequest req, StreamObserver responseObserver) { AdResponseType adResponseType; Baggage baggage = Baggage.fromContextOrNull(Context.current()); + MutableContext evaluationContext = new MutableContext(); if (baggage != null) { final String sessionId = baggage.getEntryValue("session.id"); span.setAttribute("session.id", sessionId); - ffClient.setEvaluationContext(new MutableContext().add("session", sessionId)); + evaluationContext.setTargetingKey(sessionId); + evaluationContext.add("session", sessionId); } else { logger.info("no baggage found in context"); } + CPULoad cpuload = CPULoad.getInstance(); + cpuload.execute(ffClient.getBooleanValue(ADSERVICE_HIGH_CPU_FEATURE_FLAG, false, evaluationContext)); + span.setAttribute("app.ads.contextKeys", req.getContextKeysList().toString()); span.setAttribute("app.ads.contextKeys.count", req.getContextKeysCount()); if (req.getContextKeysCount() > 0) { @@ -198,11 +201,11 @@ public void getAds(AdRequest req, StreamObserver responseObserver) { Attributes.of( adRequestTypeKey, adRequestType.name(), adResponseTypeKey, adResponseType.name())); - if (getFeatureFlagEnabled(ADSERVICE_FAILURE)) { + if (ffClient.getBooleanValue(ADSERVICE_FAILURE, false, evaluationContext)) { throw new StatusRuntimeException(Status.UNAVAILABLE); } - if (getFeatureFlagEnabled(ADSERVICE_MANUAL_GC_FEATURE_FLAG)) { + if (ffClient.getBooleanValue(ADSERVICE_MANUAL_GC_FEATURE_FLAG, false, evaluationContext)) { logger.warn("Feature Flag " + ADSERVICE_MANUAL_GC_FEATURE_FLAG + " enabled, performing a manual gc now"); GarbageCollectionTrigger gct = new GarbageCollectionTrigger(); gct.doExecute(); @@ -219,17 +222,6 @@ public void getAds(AdRequest req, StreamObserver responseObserver) { responseObserver.onError(e); } } - - /** - * Retrieves the status of a feature flag from the Feature Flag service. - * - * @param ff The name of the feature flag to retrieve. - * @return {@code true} if the feature flag is enabled, {@code false} otherwise or in case of errors. - */ - boolean getFeatureFlagEnabled(String ff) { - Boolean boolValue = ffClient.getBooleanValue(ff, false); - return boolValue; - } } private static final ImmutableListMultimap adsMap = createAdsMap(); diff --git a/src/flagd/demo.flagd.json b/src/flagd/demo.flagd.json index d4c38a390c..10ddb0e24a 100644 --- a/src/flagd/demo.flagd.json +++ b/src/flagd/demo.flagd.json @@ -47,17 +47,8 @@ "defaultVariant": "off", "targeting": { "fractional": [ - { - "var": "session" - }, - [ - "on", - 10 - ], - [ - "off", - 90 - ] + ["on", 10], + ["off", 90] ] } }, diff --git a/src/frontend/gateways/Api.gateway.ts b/src/frontend/gateways/Api.gateway.ts index e2742e0572..eb9438ccd2 100644 --- a/src/frontend/gateways/Api.gateway.ts +++ b/src/frontend/gateways/Api.gateway.ts @@ -12,7 +12,7 @@ const { userId } = SessionGateway.getSession(); const basePath = '/api'; -const ApiGateway = () => ({ +const Apis = () => ({ getCart(currencyCode: string) { return request({ url: `${basePath}/cart`, @@ -79,25 +79,41 @@ const ApiGateway = () => ({ queryParams: { productIds, sessionId: userId, - currencyCode + currencyCode, }, }); }, listAds(contextKeys: string[]) { - // TODO: Figure out a better way to do this so session ID gets propagated to - // all endpoints - const baggage = propagation.getActiveBaggage() || propagation.createBaggage(); - const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId }); - const newContext = propagation.setBaggage(context.active(), newBaggage); - context.with(newContext, () => { - return request({ - url: `${basePath}/data`, - queryParams: { - contextKeys, - }, - }); + return request({ + url: `${basePath}/data`, + queryParams: { + contextKeys, + }, }); }, }); -export default ApiGateway(); +/** + * Extends all the API calls to set baggage automatically. + */ +const ApiGateway = new Proxy(Apis(), { + get(target, prop, receiver) { + const originalFunction = Reflect.get(target, prop, receiver); + + if (typeof originalFunction !== 'function') { + return originalFunction; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return function (...args: any[]) { + const baggage = propagation.getActiveBaggage() || propagation.createBaggage(); + const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId }); + const newContext = propagation.setBaggage(context.active(), newBaggage); + return context.with(newContext, () => { + return Reflect.apply(originalFunction, undefined, args); + }); + }; + }, +}); + +export default ApiGateway; diff --git a/src/frontend/utils/telemetry/FrontendTracer.ts b/src/frontend/utils/telemetry/FrontendTracer.ts index c7ccc83c8e..de9e8a30ec 100644 --- a/src/frontend/utils/telemetry/FrontendTracer.ts +++ b/src/frontend/utils/telemetry/FrontendTracer.ts @@ -11,22 +11,22 @@ import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions' import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http'; import { SessionIdProcessor } from './SessionIdProcessor'; import { detectResourcesSync } from '@opentelemetry/resources/build/src/detect-resources'; +import { ZoneContextManager } from '@opentelemetry/context-zone'; -const { NEXT_PUBLIC_OTEL_SERVICE_NAME = '', NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = '', IS_SYNTHETIC_REQUEST = '' } = - typeof window !== 'undefined' ? window.ENV : {}; - -const FrontendTracer = async (collectorString: string) => { - const { ZoneContextManager } = await import('@opentelemetry/context-zone'); +const { + NEXT_PUBLIC_OTEL_SERVICE_NAME = '', + NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = '', + IS_SYNTHETIC_REQUEST = '', +} = typeof window !== 'undefined' ? window.ENV : {}; +const FrontendTracer = (collectorString: string) => { let resource = new Resource({ [SemanticResourceAttributes.SERVICE_NAME]: NEXT_PUBLIC_OTEL_SERVICE_NAME, }); const detectedResources = detectResourcesSync({ detectors: [browserDetector] }); resource = resource.merge(detectedResources); - const provider = new WebTracerProvider({ - resource - }); + const provider = new WebTracerProvider({ resource }); provider.addSpanProcessor(new SessionIdProcessor()); @@ -34,9 +34,10 @@ const FrontendTracer = async (collectorString: string) => { new BatchSpanProcessor( new OTLPTraceExporter({ url: NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT || collectorString || 'http://localhost:4318/v1/traces', - }), { - scheduledDelayMillis : 500 - } + }), + { + scheduledDelayMillis: 500, + } ) ); diff --git a/src/loadgenerator/locustfile.py b/src/loadgenerator/locustfile.py index 77c33796c8..d6ddff9485 100644 --- a/src/loadgenerator/locustfile.py +++ b/src/loadgenerator/locustfile.py @@ -9,15 +9,14 @@ import random import uuid import logging -import sys -from pythonjsonlogger import jsonlogger + from locust import HttpUser, task, between from locust_plugins.users.playwright import PlaywrightUser, pw, PageWithRetry, event from opentelemetry import context, baggage, trace from opentelemetry.metrics import set_meter_provider from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import MetricExporter, PeriodicExportingMetricReader +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter @@ -36,7 +35,6 @@ from openfeature import api from openfeature.contrib.provider.flagd import FlagdProvider -from openfeature.exception import OpenFeatureError from playwright.async_api import Route, Request @@ -172,7 +170,8 @@ def flood_home(self): self.client.get("/") def on_start(self): - ctx = baggage.set_baggage("synthetic_request", "true") + ctx = baggage.set_baggage("session.id", str(uuid.uuid4())) + ctx = baggage.set_baggage("synthetic_request", "true", context=ctx) context.attach(ctx) self.index() @@ -210,8 +209,9 @@ async def add_product_to_cart(self, page: PageWithRetry): async def add_baggage_header(route: Route, request: Request): + existing_baggage = request.headers.get('baggage', '') headers = { **request.headers, - 'baggage': 'synthetic_request=true' + 'baggage': ', '.join(filter(None, (existing_baggage, 'synthetic_request=true'))) } await route.continue_(headers=headers)