Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent baggage propagation #1545

Merged
merged 2 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions src/adservice/src/main/java/oteldemo/AdService.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private static class AdServiceImpl extends oteldemo.AdServiceGrpc.AdServiceImplB
private static final String ADSERVICE_FAILURE = "adServiceFailure";
private static final String ADSERVICE_MANUAL_GC_FEATURE_FLAG = "adServiceManualGc";
private static final String ADSERVICE_HIGH_CPU_FEATURE_FLAG = "adServiceHighCpu";
Client ffClient = OpenFeatureAPI.getInstance().getClient();
private static final Client ffClient = OpenFeatureAPI.getInstance().getClient();

private AdServiceImpl() {}

Expand All @@ -149,8 +149,6 @@ private AdServiceImpl() {}
@Override
public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
AdService service = AdService.getInstance();
CPULoad cpuload = CPULoad.getInstance();
cpuload.execute(getFeatureFlagEnabled(ADSERVICE_HIGH_CPU_FEATURE_FLAG));

// get the current span in context
Span span = Span.current();
Expand All @@ -160,14 +158,19 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
AdResponseType adResponseType;

Baggage baggage = Baggage.fromContextOrNull(Context.current());
MutableContext evaluationContext = new MutableContext();
if (baggage != null) {
final String sessionId = baggage.getEntryValue("session.id");
span.setAttribute("session.id", sessionId);
ffClient.setEvaluationContext(new MutableContext().add("session", sessionId));
evaluationContext.setTargetingKey(sessionId);
austinlparker marked this conversation as resolved.
Show resolved Hide resolved
evaluationContext.add("session", sessionId);
} else {
logger.info("no baggage found in context");
}

CPULoad cpuload = CPULoad.getInstance();
cpuload.execute(ffClient.getBooleanValue(ADSERVICE_HIGH_CPU_FEATURE_FLAG, false, evaluationContext));

span.setAttribute("app.ads.contextKeys", req.getContextKeysList().toString());
span.setAttribute("app.ads.contextKeys.count", req.getContextKeysCount());
if (req.getContextKeysCount() > 0) {
Expand Down Expand Up @@ -198,11 +201,11 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
Attributes.of(
adRequestTypeKey, adRequestType.name(), adResponseTypeKey, adResponseType.name()));

if (getFeatureFlagEnabled(ADSERVICE_FAILURE)) {
if (ffClient.getBooleanValue(ADSERVICE_FAILURE, false, evaluationContext)) {
throw new StatusRuntimeException(Status.UNAVAILABLE);
}

if (getFeatureFlagEnabled(ADSERVICE_MANUAL_GC_FEATURE_FLAG)) {
if (ffClient.getBooleanValue(ADSERVICE_MANUAL_GC_FEATURE_FLAG, false, evaluationContext)) {
logger.warn("Feature Flag " + ADSERVICE_MANUAL_GC_FEATURE_FLAG + " enabled, performing a manual gc now");
GarbageCollectionTrigger gct = new GarbageCollectionTrigger();
gct.doExecute();
Expand All @@ -219,17 +222,6 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
responseObserver.onError(e);
}
}

/**
* Retrieves the status of a feature flag from the Feature Flag service.
*
* @param ff The name of the feature flag to retrieve.
* @return {@code true} if the feature flag is enabled, {@code false} otherwise or in case of errors.
*/
boolean getFeatureFlagEnabled(String ff) {
Boolean boolValue = ffClient.getBooleanValue(ff, false);
return boolValue;
}
}

private static final ImmutableListMultimap<String, Ad> adsMap = createAdsMap();
Expand Down
13 changes: 2 additions & 11 deletions src/flagd/demo.flagd.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,8 @@
"defaultVariant": "off",
"targeting": {
"fractional": [
{
"var": "session"
},
[
"on",
10
],
[
"off",
90
]
["on", 10],
["off", 90]
]
austinlparker marked this conversation as resolved.
Show resolved Hide resolved
}
},
Expand Down
46 changes: 31 additions & 15 deletions src/frontend/gateways/Api.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { userId } = SessionGateway.getSession();

const basePath = '/api';

const ApiGateway = () => ({
const Apis = () => ({
getCart(currencyCode: string) {
return request<IProductCart>({
url: `${basePath}/cart`,
Expand Down Expand Up @@ -79,25 +79,41 @@ const ApiGateway = () => ({
queryParams: {
productIds,
sessionId: userId,
currencyCode
currencyCode,
},
});
},
listAds(contextKeys: string[]) {
// TODO: Figure out a better way to do this so session ID gets propagated to
// all endpoints
const baggage = propagation.getActiveBaggage() || propagation.createBaggage();
const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId });
const newContext = propagation.setBaggage(context.active(), newBaggage);
context.with(newContext, () => {
return request<Ad[]>({
url: `${basePath}/data`,
queryParams: {
contextKeys,
},
});
return request<Ad[]>({
url: `${basePath}/data`,
queryParams: {
contextKeys,
},
});
},
});

export default ApiGateway();
/**
* Extends all the API calls to set baggage automatically.
*/
const ApiGateway = new Proxy(Apis(), {
get(target, prop, receiver) {
const originalFunction = Reflect.get(target, prop, receiver);

if (typeof originalFunction !== 'function') {
return originalFunction;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
return function (...args: any[]) {
const baggage = propagation.getActiveBaggage() || propagation.createBaggage();
const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId });
const newContext = propagation.setBaggage(context.active(), newBaggage);
return context.with(newContext, () => {
return Reflect.apply(originalFunction, undefined, args);
});
};
},
});

export default ApiGateway;
23 changes: 12 additions & 11 deletions src/frontend/utils/telemetry/FrontendTracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,33 @@ import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { SessionIdProcessor } from './SessionIdProcessor';
import { detectResourcesSync } from '@opentelemetry/resources/build/src/detect-resources';
import { ZoneContextManager } from '@opentelemetry/context-zone';
austinlparker marked this conversation as resolved.
Show resolved Hide resolved

const { NEXT_PUBLIC_OTEL_SERVICE_NAME = '', NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = '', IS_SYNTHETIC_REQUEST = '' } =
typeof window !== 'undefined' ? window.ENV : {};

const FrontendTracer = async (collectorString: string) => {
const { ZoneContextManager } = await import('@opentelemetry/context-zone');
const {
NEXT_PUBLIC_OTEL_SERVICE_NAME = '',
NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = '',
IS_SYNTHETIC_REQUEST = '',
} = typeof window !== 'undefined' ? window.ENV : {};

const FrontendTracer = (collectorString: string) => {
let resource = new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: NEXT_PUBLIC_OTEL_SERVICE_NAME,
});

const detectedResources = detectResourcesSync({ detectors: [browserDetector] });
resource = resource.merge(detectedResources);
const provider = new WebTracerProvider({
resource
});
const provider = new WebTracerProvider({ resource });

provider.addSpanProcessor(new SessionIdProcessor());

provider.addSpanProcessor(
new BatchSpanProcessor(
new OTLPTraceExporter({
url: NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT || collectorString || 'http://localhost:4318/v1/traces',
}), {
scheduledDelayMillis : 500
}
}),
{
scheduledDelayMillis: 500,
}
)
);

Expand Down
12 changes: 6 additions & 6 deletions src/loadgenerator/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@
import random
import uuid
import logging
import sys
from pythonjsonlogger import jsonlogger

from locust import HttpUser, task, between
from locust_plugins.users.playwright import PlaywrightUser, pw, PageWithRetry, event

from opentelemetry import context, baggage, trace
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import MetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
Expand All @@ -36,7 +35,6 @@

from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider
austinlparker marked this conversation as resolved.
Show resolved Hide resolved
from openfeature.exception import OpenFeatureError

from playwright.async_api import Route, Request

Expand Down Expand Up @@ -172,7 +170,8 @@ def flood_home(self):
self.client.get("/")

def on_start(self):
ctx = baggage.set_baggage("synthetic_request", "true")
ctx = baggage.set_baggage("session.id", str(uuid.uuid4()))
ctx = baggage.set_baggage("synthetic_request", "true", context=ctx)
austinlparker marked this conversation as resolved.
Show resolved Hide resolved
context.attach(ctx)
self.index()

Expand Down Expand Up @@ -210,8 +209,9 @@ async def add_product_to_cart(self, page: PageWithRetry):


async def add_baggage_header(route: Route, request: Request):
existing_baggage = request.headers.get('baggage', '')
headers = {
**request.headers,
'baggage': 'synthetic_request=true'
'baggage': ', '.join(filter(None, (existing_baggage, 'synthetic_request=true')))
austinlparker marked this conversation as resolved.
Show resolved Hide resolved
}
await route.continue_(headers=headers)
Loading