Skip to content

Commit

Permalink
re-add request decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
mredolatti committed Sep 12, 2024
1 parent 2299e38 commit fa2b6a7
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 105 deletions.
44 changes: 7 additions & 37 deletions client/src/main/java/io/split/client/RequestDecorator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@

import io.split.client.dtos.RequestContext;

import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.Header;

import java.util.HashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Set;
import java.util.List;
import java.util.stream.Collectors;

public final class RequestDecorator {
CustomHeaderDecorator _headerDecorator;
Expand All @@ -36,42 +32,16 @@ public RequestDecorator(CustomHeaderDecorator headerDecorator) {
: headerDecorator;
}

public HttpRequest decorateHeaders(HttpRequest request) {
public RequestContext decorateHeaders(RequestContext request) {
try {
Map<String, List<String>> headers = _headerDecorator
.getHeaderOverrides(new RequestContext(convertToMap(request.getHeaders())));
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
if (isHeaderAllowed(entry.getKey())) {
List<String> values = entry.getValue();
for (int i = 0; i < values.size(); i++) {
if (i == 0) {
request.setHeader(entry.getKey(), values.get(i));
} else {
request.addHeader(entry.getKey(), values.get(i));
}
}
}
}
return new RequestContext(_headerDecorator.getHeaderOverrides(request)
.entrySet()
.stream()
.filter(e -> !forbiddenHeaders.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Problem adding custom headers to request decorator: %s", e), e);
}

return request;
}

private boolean isHeaderAllowed(String headerName) {
return !forbiddenHeaders.contains(headerName.toLowerCase());
}

private Map<String, List<String>> convertToMap(Header[] to_convert) {
Map<String, List<String>> to_return = new HashMap<String, List<String>>();
for (Integer i = 0; i < to_convert.length; i++) {
if (!to_return.containsKey(to_convert[i].getName())) {
to_return.put(to_convert[i].getName(), new ArrayList<String>());
}
to_return.get(to_convert[i].getName()).add(to_convert[i].getValue());
}
return to_return;
}
}
7 changes: 3 additions & 4 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,12 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
// SDKReadinessGates
_gates = new SDKReadinessGates();

RequestDecorator decorator = new RequestDecorator(config.customHeaderDecorator());
// HttpClient
_requestDecorator = new RequestDecorator(config.customHeaderDecorator());
if (config.alternativeHTTPModule() == null) {
_splitHttpClient = buildSplitHttpClient(apiToken, config, _sdkMetadata, _requestDecorator);
_splitHttpClient = buildSplitHttpClient(apiToken, config, _sdkMetadata, decorator);
} else {
_splitHttpClient = config.alternativeHTTPModule().createClient(apiToken, _sdkMetadata); // ,
// _requestDecorator);
_splitHttpClient = config.alternativeHTTPModule().createClient(apiToken, _sdkMetadata, decorator);
}

// Roots
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.split.client.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpRequest;

import io.split.client.RequestDecorator;
import io.split.client.dtos.RequestContext;

public class ApacheRequestDecorator {

public static HttpRequest decorate(HttpRequest request, RequestDecorator decorator) {

RequestContext ctx = new RequestContext(convertToMap(request.getHeaders()));
for (Map.Entry<String, List<String>> entry : decorator.decorateHeaders(ctx).headers().entrySet()) {
List<String> values = entry.getValue();
for (int i = 0; i < values.size(); i++) {
if (i == 0) {
request.setHeader(entry.getKey(), values.get(i));
} else {
request.addHeader(entry.getKey(), values.get(i));
}
}
}

return request;
}

private static Map<String, List<String>> convertToMap(Header[] to_convert) {
Map<String, List<String>> to_return = new HashMap<String, List<String>>();
for (Integer i = 0; i < to_convert.length; i++) {
if (!to_return.containsKey(to_convert[i].getName())) {
to_return.put(to_convert[i].getName(), new ArrayList<String>());
}
to_return.get(to_convert[i].getName()).add(to_convert[i].getValue());
}
return to_return;
}
}
52 changes: 32 additions & 20 deletions client/src/main/java/io/split/engine/sse/client/SSEClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.base.Strings;
import io.split.client.RequestDecorator;
import io.split.client.utils.ApacheRequestDecorator;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
Expand Down Expand Up @@ -64,11 +65,11 @@ private enum ConnectionState {
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;

public SSEClient(Function<RawEvent, Void> eventCallback,
Function<StatusMessage, Void> statusCallback,
CloseableHttpClient client,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory,
RequestDecorator requestDecorator) {
Function<StatusMessage, Void> statusCallback,
CloseableHttpClient client,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory,
RequestDecorator requestDecorator) {
_eventCallback = eventCallback;
_statusCallback = statusCallback;
_client = client;
Expand Down Expand Up @@ -96,7 +97,7 @@ public boolean open(URI uri) {
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if(e.getMessage() == null){
if (e.getMessage() == null) {
_log.info("The thread was interrupted while opening SSEClient");
return false;
}
Expand Down Expand Up @@ -152,31 +153,41 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
_log.debug(exc.getMessage());
if (SOCKET_CLOSED_MESSAGE.equals(exc.getMessage())) { // Connection closed by us
_statusCallback.apply(StatusMessage.FORCED_STOP);
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
_telemetryRuntimeProducer.recordStreamingEvents(
new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(),
System.currentTimeMillis()));
return;
}
// Connection closed by server
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
_telemetryRuntimeProducer
.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(),
System.currentTimeMillis()));
return;
} catch (IOException exc) { // Other type of connection error
if(!_forcedStop.get()) {
if (!_forcedStop.get()) {
_log.debug(String.format("SSE connection ended abruptly: %s. Retying", exc.getMessage()));
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
_telemetryRuntimeProducer.recordStreamingEvents(
new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(),
System.currentTimeMillis()));
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
return;
}

_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
_telemetryRuntimeProducer
.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(),
System.currentTimeMillis()));
}
}
} catch (Exception e) { // Any other error non related to the connection disables streaming altogether
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
_telemetryRuntimeProducer
.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(),
System.currentTimeMillis()));
_log.warn(e.getMessage(), e);
_statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
} finally {
Expand All @@ -194,12 +205,13 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {

private boolean establishConnection(URI uri, CountDownLatch signal) {
HttpGet request = new HttpGet(uri);
request = (HttpGet) _requestDecorator.decorateHeaders(request);
request = (HttpGet) ApacheRequestDecorator.decorate(request, _requestDecorator);
_ongoingRequest.set(request);
try {
_ongoingResponse.set(_client.execute(_ongoingRequest.get()));
if (_ongoingResponse.get().getCode() != 200) {
_log.error(String.format("Establishing connection, code error: %s. The url is %s", _ongoingResponse.get().getCode(), uri.toURL()));
_log.error(String.format("Establishing connection, code error: %s. The url is %s",
_ongoingResponse.get().getCode(), uri.toURL()));
return false;
}
_state.set(ConnectionState.OPEN);
Expand Down Expand Up @@ -236,4 +248,4 @@ private void handleMessage(String message) {
RawEvent e = RawEvent.fromString(message);
_eventCallback.apply(e);
}
}
}
3 changes: 2 additions & 1 deletion client/src/main/java/io/split/service/CustomHttpModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
import java.io.IOException;

public interface CustomHttpModule {
public SplitHttpClient createClient(String apiToken, SDKMetadata sdkMetadata) throws IOException;
public SplitHttpClient createClient(String apiToken, SDKMetadata sdkMetadata, RequestDecorator decorator)
throws IOException;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.service;

import io.split.client.RequestDecorator;
import io.split.client.utils.ApacheRequestDecorator;
import io.split.client.utils.SDKMetadata;
import io.split.client.utils.Utils;
import io.split.engine.common.FetchOptions;
Expand Down Expand Up @@ -76,7 +77,7 @@ public SplitHttpResponse get(URI uri, FetchOptions options, Map<String, List<Str
request.setHeader(HEADER_CACHE_CONTROL_NAME, HEADER_CACHE_CONTROL_VALUE);
}

_requestDecorator.decorateHeaders(request);
request = (HttpGet) ApacheRequestDecorator.decorate(request, _requestDecorator);

response = _client.execute(request);

Expand Down Expand Up @@ -121,7 +122,7 @@ public SplitHttpResponse post(URI uri, String body, Map<String, List<String>> ad
}
}
request.setEntity(HttpEntities.create(body, ContentType.APPLICATION_JSON));
request = (HttpPost) _requestDecorator.decorateHeaders(request);
request = (HttpPost) ApacheRequestDecorator.decorate(request, _requestDecorator);

response = _client.execute(request);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.split.httpmodules.okhttp;

import io.split.client.RequestDecorator;
import io.split.client.dtos.SplitHttpResponse;
import io.split.client.utils.SDKMetadata;
import io.split.engine.common.FetchOptions;
Expand All @@ -15,12 +16,14 @@
import java.net.HttpURLConnection;
import java.net.Proxy;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OkHttpClientImpl implements SplitHttpClient {
protected OkHttpClient httpClient;
Expand All @@ -34,20 +37,17 @@ public class OkHttpClientImpl implements SplitHttpClient {
private static final String HEADER_CLIENT_VERSION = "SplitSDKVersion";
private String _apikey;
protected SDKMetadata _metadata;
private final RequestDecorator _decorator;

public OkHttpClientImpl(String apiToken, SDKMetadata sdkMetadata,
Proxy proxy, String proxyAuthKerberosPrincipalName, boolean debugEnabled,
int readTimeout, int connectionTimeout) throws IOException {
int readTimeout, int connectionTimeout, RequestDecorator decorator) throws IOException {
_apikey = apiToken;
_metadata = sdkMetadata;
setHttpClient(proxy, proxyAuthKerberosPrincipalName, debugEnabled,
readTimeout, connectionTimeout);
}

protected void setHttpClient(Proxy proxy, String proxyAuthKerberosPrincipalName, boolean debugEnabled,
int readTimeout, int connectionTimeout) throws IOException {
_decorator = decorator;
httpClient = initializeClient(proxy, proxyAuthKerberosPrincipalName, debugEnabled,
readTimeout, connectionTimeout);

}

protected OkHttpClient initializeClient(Proxy proxy, String proxyAuthKerberosPrincipalName, boolean debugEnabled,
Expand Down Expand Up @@ -86,8 +86,8 @@ public SplitHttpResponse get(URI uri, FetchOptions options, Map<String, List<Str
try {
okhttp3.Request.Builder requestBuilder = getRequestBuilder();
requestBuilder.url(uri.toString());
setBasicHeaders(requestBuilder);
setAdditionalAndDecoratedHeaders(requestBuilder, additionalHeaders);
Map<String, List<String>> headers = mergeHeaders(buildBasicHeaders(), additionalHeaders);
requestBuilder = OkHttpRequestDecorator.decorate(headers, requestBuilder, _decorator);
if (options.cacheControlHeadersEnabled()) {
requestBuilder.addHeader(HEADER_CACHE_CONTROL_NAME, HEADER_CACHE_CONTROL_VALUE);
}
Expand Down Expand Up @@ -128,8 +128,8 @@ public SplitHttpResponse post(URI url, String entity,
try {
okhttp3.Request.Builder requestBuilder = getRequestBuilder();
requestBuilder.url(url.toString());
setBasicHeaders(requestBuilder);
setAdditionalAndDecoratedHeaders(requestBuilder, additionalHeaders);
Map<String, List<String>> headers = mergeHeaders(buildBasicHeaders(), additionalHeaders);
requestBuilder = OkHttpRequestDecorator.decorate(headers, requestBuilder, _decorator);
requestBuilder.addHeader("Accept-Encoding", "gzip");
requestBuilder.addHeader("Content-Type", "application/json");
RequestBody postBody = RequestBody.create(MediaType.parse("application/json; charset=utf-16"), entity);
Expand Down Expand Up @@ -168,25 +168,31 @@ protected Request getRequest(okhttp3.Request.Builder requestBuilder) {
return requestBuilder.build();
}

protected void setBasicHeaders(okhttp3.Request.Builder requestBuilder) {
requestBuilder.addHeader(HEADER_API_KEY, "Bearer " + _apikey);
requestBuilder.addHeader(HEADER_CLIENT_VERSION, _metadata.getSdkVersion());
requestBuilder.addHeader(HEADER_CLIENT_MACHINE_IP, _metadata.getMachineIp());
requestBuilder.addHeader(HEADER_CLIENT_MACHINE_NAME, _metadata.getMachineName());
requestBuilder.addHeader(HEADER_CLIENT_KEY, _apikey.length() > 4
private Map<String, List<String>> buildBasicHeaders() {
Map<String, List<String>> h = new HashMap<>();
h.put(HEADER_API_KEY, Collections.singletonList("Bearer " + _apikey));
h.put(HEADER_CLIENT_VERSION, Collections.singletonList(_metadata.getSdkVersion()));
h.put(HEADER_CLIENT_MACHINE_IP, Collections.singletonList(_metadata.getMachineIp()));
h.put(HEADER_CLIENT_MACHINE_NAME, Collections.singletonList(_metadata.getMachineName()));
h.put(HEADER_CLIENT_KEY, Collections.singletonList(_apikey.length() > 4
? _apikey.substring(_apikey.length() - 4)
: _apikey);
: _apikey));
return h;
}

protected void setAdditionalAndDecoratedHeaders(okhttp3.Request.Builder requestBuilder,
Map<String, List<String>> additionalHeaders) {
if (additionalHeaders != null) {
for (Map.Entry<String, List<String>> entry : additionalHeaders.entrySet()) {
for (String value : entry.getValue()) {
requestBuilder.addHeader(entry.getKey(), value);
}
}
private static Map<String, List<String>> mergeHeaders(Map<String, List<String>> headers,
Map<String, List<String>> toAdd) {
if (toAdd == null || toAdd.size() == 0) {
return headers;
}

for (Map.Entry<String, List<String>> entry : toAdd.entrySet()) {
headers.computeIfPresent(entry.getKey(),
(k, oldValue) -> Stream.concat(oldValue.stream(), entry.getValue().stream())
.collect(Collectors.toList()));
}

return headers;
}

protected SplitHttpResponse.Header[] getResponseHeaders(Response response) {
Expand Down
Loading

0 comments on commit fa2b6a7

Please sign in to comment.