Skip to content

Commit

Permalink
feat: SSE (#162)
Browse files Browse the repository at this point in the history
* SSE initial

* Debugging SocketTimeoutException

* Cleanup logging; update local example

* use setupscheduler

* make runnable final to prevent modification

* reduce sleep for example

* Use shutdown instead of close

* use new mainclass spec

* check if ssemanager exists before closing

* cleanup
  • Loading branch information
JamieSinn authored Oct 1, 2024
1 parent a36561a commit 6ad44fa
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 26 deletions.
10 changes: 6 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protobuf {
}

ext {
retrofit_version = "2.9.0"
retrofit_version = "2.11.0"
jackson_version = "2.15.3"
swagger_annotations_version = "2.2.18"
lombok_version = "1.18.30"
Expand All @@ -142,6 +142,7 @@ ext {
mockito_core_version = "5.6.0"
protobuf_version = "3.24.4"
openfeature_version = "1.7.0"
eventsource_version = "4.1.1"
}

dependencies {
Expand All @@ -163,6 +164,7 @@ dependencies {
implementation("com.google.protobuf:protobuf-java:$protobuf_version")

implementation("dev.openfeature:sdk:$openfeature_version")
implementation("com.launchdarkly:okhttp-eventsource:$eventsource_version")

compileOnly("org.projectlombok:lombok:$lombok_version")

Expand Down Expand Up @@ -191,17 +193,17 @@ configurations {
task runLocalExample(type: JavaExec) {
description = "Run the local bucketing example"
classpath = sourceSets.examples.runtimeClasspath
main = 'com.devcycle.examples.LocalExample'
mainClass = 'com.devcycle.examples.LocalExample'
}

task runCloudExample(type: JavaExec) {
description = "Run the cloud bucketing example"
classpath = sourceSets.examples.runtimeClasspath
main = 'com.devcycle.examples.CloudExample'
mainClass = 'com.devcycle.examples.CloudExample'
}

task runOpenFeatureExample(type: JavaExec) {
description = "Run the OpenFeature example"
classpath = sourceSets.examples.runtimeClasspath
main = 'com.devcycle.examples.OpenFeatureExample'
mainClass = 'com.devcycle.examples.OpenFeatureExample'
}
9 changes: 7 additions & 2 deletions src/examples/java/com/devcycle/examples/LocalExample.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.devcycle.examples;

import com.devcycle.sdk.server.common.logging.SimpleDevCycleLogger;
import com.devcycle.sdk.server.common.model.DevCycleUser;
import com.devcycle.sdk.server.local.api.DevCycleLocalClient;
import com.devcycle.sdk.server.local.model.DevCycleLocalOptions;
Expand All @@ -22,8 +23,11 @@ public static void main(String[] args) throws InterruptedException {
// The default value can be of type string, boolean, number, or JSON
Boolean defaultValue = false;

DevCycleLocalOptions options = DevCycleLocalOptions.builder().configPollingIntervalMs(60000)
.disableAutomaticEventLogging(false).disableCustomEventLogging(false).build();
DevCycleLocalOptions options = DevCycleLocalOptions.builder()
.configPollingIntervalMS(60000)
.customLogger(new SimpleDevCycleLogger(SimpleDevCycleLogger.Level.DEBUG))
.enableBetaRealtimeUpdates(true)
.build();

// Initialize DevCycle Client
DevCycleLocalClient client = new DevCycleLocalClient(server_sdk_key, options);
Expand All @@ -46,5 +50,6 @@ public static void main(String[] args) throws InterruptedException {
} else {
System.out.println("feature is NOT enabled");
}
Thread.sleep(10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ProjectConfig {

@Schema(description = "Project Settings")
private Object project;

Expand All @@ -30,4 +31,8 @@ public class ProjectConfig {

@Schema(description = "Variable Hashes for all Variables in this Project")
private Object variableHashes;
}

@Schema(description = "SSE Configuration")
private SSE sse;
}

18 changes: 18 additions & 0 deletions src/main/java/com/devcycle/sdk/server/common/model/SSE.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.devcycle.sdk.server.common.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class SSE {
private String hostname;
private String path;
}

18 changes: 18 additions & 0 deletions src/main/java/com/devcycle/sdk/server/common/model/SSEMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.devcycle.sdk.server.common.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class SSEMessage {
private String etag;
private double lastModified;
private String type;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
import com.devcycle.sdk.server.common.api.IDevCycleApi;
import com.devcycle.sdk.server.common.exception.DevCycleException;
import com.devcycle.sdk.server.common.logging.DevCycleLogger;
import com.devcycle.sdk.server.common.model.ErrorResponse;
import com.devcycle.sdk.server.common.model.HttpResponseCode;
import com.devcycle.sdk.server.common.model.ProjectConfig;
import com.devcycle.sdk.server.common.model.*;
import com.devcycle.sdk.server.local.api.DevCycleLocalApiClient;
import com.devcycle.sdk.server.local.bucketing.LocalBucketing;
import com.devcycle.sdk.server.local.model.DevCycleLocalOptions;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.launchdarkly.eventsource.FaultEvent;
import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.eventsource.StartedEvent;
import retrofit2.Call;
import retrofit2.Response;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
Expand All @@ -26,46 +29,52 @@ public final class EnvironmentConfigManager {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int DEFAULT_POLL_INTERVAL_MS = 30000;
private static final int MIN_INTERVALS_MS = 1000;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
private ScheduledExecutorService scheduler;
private final IDevCycleApi configApiClient;
private final LocalBucketing localBucketing;
private SSEManager sseManager;
private boolean isSSEConnected = false;
private final DevCycleLocalOptions options;

private ProjectConfig config;
private String configETag = "";
private String configLastModified = "";

private final String sdkKey;
private final int pollingIntervalMS;
private static final int pollingIntervalSSEMS = 15 * 60 * 60 * 1000;
private boolean pollingEnabled = true;

public EnvironmentConfigManager(String sdkKey, LocalBucketing localBucketing, DevCycleLocalOptions options) {
this.sdkKey = sdkKey;
this.localBucketing = localBucketing;
this.options = options;

configApiClient = new DevCycleLocalApiClient(sdkKey, options).initialize();

int configPollingIntervalMS = options.getConfigPollingIntervalMS();
pollingIntervalMS = configPollingIntervalMS >= MIN_INTERVALS_MS ? configPollingIntervalMS
: DEFAULT_POLL_INTERVAL_MS;

setupScheduler();
scheduler = setupScheduler();
scheduler.scheduleAtFixedRate(getConfigRunnable, 0, this.pollingIntervalMS, TimeUnit.MILLISECONDS);
}

private void setupScheduler() {
Runnable getConfigRunnable = new Runnable() {
public void run() {
try {
if (pollingEnabled) {
getConfig();
}
} catch (DevCycleException e) {
DevCycleLogger.error("Failed to load config: " + e.getMessage());
private ScheduledExecutorService setupScheduler() {
return Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
}

private final Runnable getConfigRunnable = new Runnable() {
public void run() {
try {
if (pollingEnabled) {
getConfig();
}
} catch (DevCycleException e) {
DevCycleLogger.error("Failed to load config: " + e.getMessage());
}
};

scheduler.scheduleAtFixedRate(getConfigRunnable, 0, this.pollingIntervalMS, TimeUnit.MILLISECONDS);
}
}
};

public boolean isConfigInitialized() {
return config != null;
Expand All @@ -74,9 +83,57 @@ public boolean isConfigInitialized() {
private ProjectConfig getConfig() throws DevCycleException {
Call<ProjectConfig> config = this.configApiClient.getConfig(this.sdkKey, this.configETag, this.configLastModified);
this.config = getResponseWithRetries(config, 1);
if (this.options.isEnableBetaRealtimeUpdates()) {
try {
URI uri = new URI(this.config.getSse().getHostname() + this.config.getSse().getPath());
if (sseManager == null) {
sseManager = new SSEManager(uri);
}
sseManager.restart(uri, this::handleSSEMessage, this::handleSSEError, this::handleSSEStarted);
} catch (URISyntaxException e) {
DevCycleLogger.warning("Failed to create SSEManager: " + e.getMessage());
}
}
return this.config;
}

private Void handleSSEMessage(MessageEvent messageEvent) {
DevCycleLogger.debug("Received message: " + messageEvent.getData());
if (!isSSEConnected)
{
handleSSEStarted(null);
}

String data = messageEvent.getData();
if (data == null || data.isEmpty() || data.equals("keepalive")) {
return null;
}
try {
SSEMessage message = OBJECT_MAPPER.readValue(data, SSEMessage.class);
if (message.getType() == null || message.getType().equals("refetchConfig") || message.getType().isEmpty()) {
DevCycleLogger.debug("Received refetchConfig message, fetching new config");
getConfigRunnable.run();
}
} catch (JsonProcessingException e) {
DevCycleLogger.warning("Failed to parse SSE message: " + e.getMessage());
}
return null;
}

private Void handleSSEError(FaultEvent faultEvent) {
DevCycleLogger.warning("Received error: " + faultEvent.getCause());
return null;
}

private Void handleSSEStarted(StartedEvent startedEvent) {
isSSEConnected = true;
DevCycleLogger.debug("SSE Connected - setting polling interval to " + pollingIntervalSSEMS);
scheduler.shutdown();
scheduler = setupScheduler();
scheduler.scheduleAtFixedRate(getConfigRunnable, 0, pollingIntervalSSEMS, TimeUnit.MILLISECONDS);
return null;
}

private ProjectConfig getResponseWithRetries(Call<ProjectConfig> call, int maxRetries) throws DevCycleException {
// attempt 0 is the initial request, attempt > 0 are all retries
int attempt = 0;
Expand Down Expand Up @@ -206,6 +263,9 @@ private void stopPolling() {
}

public void cleanup() {
if (sseManager != null) {
sseManager.close();
}
stopPolling();
}
}
Loading

0 comments on commit 6ad44fa

Please sign in to comment.