Skip to content

Commit

Permalink
Merge pull request #126 from jcacciatore/bugfix-dualwrites
Browse files Browse the repository at this point in the history
Bug fixes for dual write functionality
  • Loading branch information
ipapapa authored Aug 14, 2016
2 parents 7dfef40 + 67c5a92 commit 7f50a46
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ public ArchaiusConnectionPoolConfiguration(String name) {
retryPolicyFactory = parseRetryPolicyFactory(propertyPrefix);
compressionStrategy = parseCompressionStrategy(propertyPrefix);

isDualWriteEnabled = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + "dualwrite.enabled", super.isDualWriteEnabled());
dualWriteClusterName = DynamicPropertyFactory.getInstance().getStringProperty(propertyPrefix + "dualwrite.cluster", super.getDualWriteClusterName());
dualWritePercentage = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + "dualwrite.percentage", super.getDualWritePercentage());
isDualWriteEnabled = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + ".dualwrite.enabled", super.isDualWriteEnabled());
dualWriteClusterName = DynamicPropertyFactory.getInstance().getStringProperty(propertyPrefix + ".dualwrite.cluster", super.getDualWriteClusterName());
dualWritePercentage = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".dualwrite.percentage", super.getDualWritePercentage());
}


Expand Down Expand Up @@ -168,6 +168,46 @@ public boolean getFailOnStartupIfNoHosts() {
return failOnStartupIfNoHosts.get();
}

@Override
public boolean isDualWriteEnabled() {
return isDualWriteEnabled.get();
}

@Override
public String getDualWriteClusterName() {
return dualWriteClusterName.get();
}

@Override
public int getDualWritePercentage() {
return dualWritePercentage.get();
}

@Override
public String toString() {
return "ArchaiusConnectionPoolConfiguration{" +
"name=" + getName() +
", port=" + port +
", maxConnsPerHost=" + maxConnsPerHost +
", maxTimeoutWhenExhausted=" + maxTimeoutWhenExhausted +
", maxFailoverCount=" + maxFailoverCount +
", connectTimeout=" + connectTimeout +
", socketTimeout=" + socketTimeout +
", poolShutdownDelay=" + poolShutdownDelay +
", localZoneAffinity=" + localZoneAffinity +
", resetTimingsFrequency=" + resetTimingsFrequency +
", configPublisherConfig=" + configPublisherConfig +
", compressionThreshold=" + compressionThreshold +
", loadBalanceStrategy=" + loadBalanceStrategy +
", compressionStrategy=" + compressionStrategy +
", errorRateConfig=" + errorRateConfig +
", retryPolicyFactory=" + retryPolicyFactory +
", failOnStartupIfNoHosts=" + failOnStartupIfNoHosts +
", isDualWriteEnabled=" + isDualWriteEnabled +
", dualWriteClusterName=" + dualWriteClusterName +
", dualWritePercentage=" + dualWritePercentage +
'}';
}

private LoadBalancingStrategy parseLBStrategy(String propertyPrefix) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public EurekaHostsSupplier(String applicationName, DiscoveryClient dClient) {
this.discoveryClient = dClient;
}

public static EurekaHostsSupplier newInstance(String applicationName, EurekaHostsSupplier hostsSupplier) {
return new EurekaHostsSupplier(applicationName, hostsSupplier.getDiscoveryClient());
}

@Override
public List<Host> getHosts() {
return getUpdateFromEureka();
Expand Down Expand Up @@ -116,4 +120,11 @@ public String toString() {
return EurekaHostsSupplier.class.getName();
}

public String getApplicationName() {
return applicationName;
}

public DiscoveryClient getDiscoveryClient() {
return discoveryClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public ConnectionPoolConfigurationImpl(String name) {
* @param config
*/
public ConnectionPoolConfigurationImpl(ConnectionPoolConfigurationImpl config) {
this.name = config.getName() + "-shadow";
this.name = config.getName() + "_shadow";

this.compressionStrategy = config.getCompressionStrategy();
this.valueCompressionThreshold = config.getValueCompressionThreshold();
Expand All @@ -123,9 +123,9 @@ public ConnectionPoolConfigurationImpl(ConnectionPoolConfigurationImpl config) {
this.socketTimeout = config.getSocketTimeout();
this.errorMonitorFactory = config.getErrorMonitorFactory();
this.tokenSupplier = config.getTokenSupplier();
this.isDualWriteEnabled = config.isDualWriteEnabled;
this.dualWriteClusterName = config.dualWriteClusterName;
this.dualWritePercentage = config.dualWritePercentage;
this.isDualWriteEnabled = config.isDualWriteEnabled();
this.dualWriteClusterName = config.getDualWriteClusterName();
this.dualWritePercentage = config.getDualWritePercentage();
}

@Override
Expand Down Expand Up @@ -252,7 +252,40 @@ public int getDualWritePercentage() {
return dualWritePercentage;
}

// ALL SETTERS
@Override
public String toString() {
return "ConnectionPoolConfigurationImpl{" +
"name=" + name +
", hostSupplier=" + hostSupplier +
", tokenSupplier=" + tokenSupplier +
", hostConnectionPoolFactory=" + hostConnectionPoolFactory +
", name='" + name + '\'' +
", port=" + port +
", maxConnsPerHost=" + maxConnsPerHost +
", maxTimeoutWhenExhausted=" + maxTimeoutWhenExhausted +
", maxFailoverCount=" + maxFailoverCount +
", connectTimeout=" + connectTimeout +
", socketTimeout=" + socketTimeout +
", poolShutdownDelay=" + poolShutdownDelay +
", pingFrequencySeconds=" + pingFrequencySeconds +
", flushTimingsFrequencySeconds=" + flushTimingsFrequencySeconds +
", localZoneAffinity=" + localZoneAffinity +
", lbStrategy=" + lbStrategy +
", localRack='" + localRack + '\'' +
", localDataCenter='" + localDataCenter + '\'' +
", failOnStartupIfNoHosts=" + failOnStartupIfNoHosts +
", failOnStarupIfNoHostsSeconds=" + failOnStarupIfNoHostsSeconds +
", compressionStrategy=" + compressionStrategy +
", valueCompressionThreshold=" + valueCompressionThreshold +
", isDualWriteEnabled=" + isDualWriteEnabled +
", dualWriteClusterName='" + dualWriteClusterName + '\'' +
", dualWritePercentage=" + dualWritePercentage +
", retryFactory=" + retryFactory +
", errorMonitorFactory=" + errorMonitorFactory +
'}';
}

// ALL SETTERS
public ConnectionPoolConfigurationImpl setMaxConnsPerHost(int maxConnsPerHost) {
this.maxConnsPerHost = maxConnsPerHost;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,74 +174,106 @@ public void setRange(int range) {

@Override
public Long append(final String key, final String value) {
return this.d_append(key, value).getResult();
}

@Override
public OperationResult<Long> d_append(final String key, final String value) {
writeAsync(key, new Callable<OperationResult<Long>>() {
@Override
public OperationResult<Long> call() throws Exception {
return d_append(key, value);
return DynoDualWriterClient.super.d_append(key, value);
}
});

return targetClient.append(key, value);
return targetClient.d_append(key, value);
}

@Override
public String hmset(final String key, final Map<String, String> hash) {
return this.d_hmset(key, hash).getResult();
}

@Override
public OperationResult<String> d_hmset(final String key, final Map<String, String> hash) {
writeAsync(key, new Callable<OperationResult<String>>(){
@Override
public OperationResult<String> call() throws Exception {
return d_hmset(key, hash);
return DynoDualWriterClient.super.d_hmset(key, hash);
}
});

return targetClient.hmset(key, hash);
return targetClient.d_hmset(key, hash);
}

@Override
public Long sadd(final String key, final String... members) {
return this.d_sadd(key, members).getResult();
}

@Override
public OperationResult<Long> d_sadd(final String key, final String... members) {
writeAsync(key, new Callable<OperationResult<Long>>() {
@Override
public OperationResult<Long> call() throws Exception {
return d_sadd(key, members);
return DynoDualWriterClient.super.d_sadd(key, members);
}
});

return targetClient.sadd(key, members);
return targetClient.d_sadd(key, members);

}

@Override
public Long hset(final String key, final String field, final String value) {
return this.d_hset(key, field, value).getResult();
}

@Override
public OperationResult<Long> d_hset(final String key, final String field, final String value) {
writeAsync(key, new Callable<OperationResult<Long>>() {
@Override
public OperationResult<Long> call() throws Exception {
return d_hset(key, field, value);
return DynoDualWriterClient.super.d_hset(key, field, value);
}
});

return targetClient.hset(key, field, value);
return targetClient.d_hset(key, field, value);
}

@Override
public String set(final String key, final String value) {
return this.d_set(key, value).getResult();
}

@Override
public OperationResult<String> d_set(final String key, final String value) {
writeAsync(key, new Callable<OperationResult<String>>() {
@Override
public OperationResult<String> call() throws Exception {
return d_set(key, value);
return DynoDualWriterClient.super.d_set(key, value);
}
});

return targetClient.set(key, value);
return targetClient.d_set(key, value);
}

@Override
public String setex(final String key, int seconds, String value) {
return this.d_setex(key, seconds, value).getResult();
}

@Override
public OperationResult<String> d_setex(final String key, final Integer seconds, final String value) {
writeAsync(key, new Callable<OperationResult<String>>(){
@Override
public OperationResult<String> call() throws Exception {
return d_get(key);
return DynoDualWriterClient.super.d_setex(key, seconds, value);
}
});

return targetClient.setex(key, seconds, value);
return targetClient.d_setex(key, seconds, value);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public String decompressValue(String value, ConnectionContext ctx) {
}

public TopologyView getTopologyView() {
return (TopologyView) this.getConnPool();
return this.getConnPool();
}

@Override
Expand Down Expand Up @@ -3293,6 +3293,7 @@ public DynoJedisClient build() {

if (cpConfig == null) {
cpConfig = new ArchaiusConnectionPoolConfiguration(appName);
Logger.info("Dyno Client runtime properties: " + cpConfig.toString());
}

if (cpConfig.isDualWriteEnabled()) {
Expand All @@ -3306,6 +3307,7 @@ private DynoDualWriterClient buildDynoDualWriterClient() {
DynoJedisClient targetClient = buildDynoJedisClient();

ConnectionPoolConfigurationImpl shadowConfig = new ConnectionPoolConfigurationImpl(cpConfig);
Logger.info("Dyno Client Shadow Config runtime properties: " + shadowConfig.toString());

// Ensure that if the shadow cluster is down it will not block client application startup
shadowConfig.setFailOnStartupIfNoHosts(false);
Expand All @@ -3315,14 +3317,18 @@ private DynoDualWriterClient buildDynoDualWriterClient() {
}

HostSupplier shadowSupplier = null;
if (discoveryClient != null) {
if (dualWriteClusterName == null) {
dualWriteClusterName = shadowConfig.getDualWriteClusterName();
if (dualWriteHostSupplier == null) {
if (hostSupplier != null && hostSupplier instanceof EurekaHostsSupplier) {
EurekaHostsSupplier eurekaSupplier = (EurekaHostsSupplier) hostSupplier;
shadowSupplier = EurekaHostsSupplier.newInstance(shadowConfig.getDualWriteClusterName(), eurekaSupplier);
} else if (discoveryClient != null) {
shadowSupplier = new EurekaHostsSupplier(shadowConfig.getDualWriteClusterName(), discoveryClient);
} else {
throw new DynoConnectException("HostSupplier for DualWrite cluster is REQUIRED if you are not " +
"using EurekaHostsSupplier implementation or using a DiscoveryClient");
}
shadowSupplier = new EurekaHostsSupplier(dualWriteClusterName, discoveryClient);
} else if (dualWriteHostSupplier == null) {
throw new DynoConnectException("HostSupplier not provided for either target cluster or shadow cluster."+
" Cannot initialize EurekaHostsSupplier since it requires a DiscoveryClient");
} else {
shadowSupplier = dualWriteHostSupplier;
}

shadowConfig.withHostSupplier(shadowSupplier);
Expand Down

0 comments on commit 7f50a46

Please sign in to comment.