From 67c5a924ca088c8eeb799a75458843fb635b4448 Mon Sep 17 00:00:00 2001 From: Jason Cacciatore Date: Fri, 12 Aug 2016 22:32:14 -0700 Subject: [PATCH] Bug fixes for dual write functionality * Property based settings now work properly * Fixed bug with injecting HostSupplier * Now support dual write functionality if client app is using d_ methods in DynoJedisClient --- .../ArchaiusConnectionPoolConfiguration.java | 46 ++++++++++++++- .../dyno/contrib/EurekaHostsSupplier.java | 11 ++++ .../impl/ConnectionPoolConfigurationImpl.java | 43 ++++++++++++-- .../dyno/jedis/DynoDualWriterClient.java | 56 +++++++++++++++---- .../netflix/dyno/jedis/DynoJedisClient.java | 22 +++++--- 5 files changed, 150 insertions(+), 28 deletions(-) diff --git a/dyno-contrib/src/main/java/com/netflix/dyno/contrib/ArchaiusConnectionPoolConfiguration.java b/dyno-contrib/src/main/java/com/netflix/dyno/contrib/ArchaiusConnectionPoolConfiguration.java index 46ee753a..9dfef5c8 100644 --- a/dyno-contrib/src/main/java/com/netflix/dyno/contrib/ArchaiusConnectionPoolConfiguration.java +++ b/dyno-contrib/src/main/java/com/netflix/dyno/contrib/ArchaiusConnectionPoolConfiguration.java @@ -81,9 +81,9 @@ public ArchaiusConnectionPoolConfiguration(String name) { retryPolicyFactory = parseRetryPolicyFactory(propertyPrefix); compressionStrategy = parseCompressionStrategy(propertyPrefix); - isDualWriteEnabled = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + "dualwrite.enabled", super.isDualWriteEnabled()); - dualWriteClusterName = DynamicPropertyFactory.getInstance().getStringProperty(propertyPrefix + "dualwrite.cluster", super.getDualWriteClusterName()); - dualWritePercentage = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + "dualwrite.percentage", super.getDualWritePercentage()); + isDualWriteEnabled = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + ".dualwrite.enabled", super.isDualWriteEnabled()); + dualWriteClusterName = DynamicPropertyFactory.getInstance().getStringProperty(propertyPrefix + ".dualwrite.cluster", super.getDualWriteClusterName()); + dualWritePercentage = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".dualwrite.percentage", super.getDualWritePercentage()); } @@ -168,6 +168,46 @@ public boolean getFailOnStartupIfNoHosts() { return failOnStartupIfNoHosts.get(); } + @Override + public boolean isDualWriteEnabled() { + return isDualWriteEnabled.get(); + } + + @Override + public String getDualWriteClusterName() { + return dualWriteClusterName.get(); + } + + @Override + public int getDualWritePercentage() { + return dualWritePercentage.get(); + } + + @Override + public String toString() { + return "ArchaiusConnectionPoolConfiguration{" + + "name=" + getName() + + ", port=" + port + + ", maxConnsPerHost=" + maxConnsPerHost + + ", maxTimeoutWhenExhausted=" + maxTimeoutWhenExhausted + + ", maxFailoverCount=" + maxFailoverCount + + ", connectTimeout=" + connectTimeout + + ", socketTimeout=" + socketTimeout + + ", poolShutdownDelay=" + poolShutdownDelay + + ", localZoneAffinity=" + localZoneAffinity + + ", resetTimingsFrequency=" + resetTimingsFrequency + + ", configPublisherConfig=" + configPublisherConfig + + ", compressionThreshold=" + compressionThreshold + + ", loadBalanceStrategy=" + loadBalanceStrategy + + ", compressionStrategy=" + compressionStrategy + + ", errorRateConfig=" + errorRateConfig + + ", retryPolicyFactory=" + retryPolicyFactory + + ", failOnStartupIfNoHosts=" + failOnStartupIfNoHosts + + ", isDualWriteEnabled=" + isDualWriteEnabled + + ", dualWriteClusterName=" + dualWriteClusterName + + ", dualWritePercentage=" + dualWritePercentage + + '}'; + } private LoadBalancingStrategy parseLBStrategy(String propertyPrefix) { diff --git a/dyno-contrib/src/main/java/com/netflix/dyno/contrib/EurekaHostsSupplier.java b/dyno-contrib/src/main/java/com/netflix/dyno/contrib/EurekaHostsSupplier.java index 93d4184a..0b164189 100644 --- a/dyno-contrib/src/main/java/com/netflix/dyno/contrib/EurekaHostsSupplier.java +++ b/dyno-contrib/src/main/java/com/netflix/dyno/contrib/EurekaHostsSupplier.java @@ -56,6 +56,10 @@ public EurekaHostsSupplier(String applicationName, DiscoveryClient dClient) { this.discoveryClient = dClient; } + public static EurekaHostsSupplier newInstance(String applicationName, EurekaHostsSupplier hostsSupplier) { + return new EurekaHostsSupplier(applicationName, hostsSupplier.getDiscoveryClient()); + } + @Override public List getHosts() { return getUpdateFromEureka(); @@ -116,4 +120,11 @@ public String toString() { return EurekaHostsSupplier.class.getName(); } + public String getApplicationName() { + return applicationName; + } + + public DiscoveryClient getDiscoveryClient() { + return discoveryClient; + } } diff --git a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolConfigurationImpl.java b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolConfigurationImpl.java index 7a93e60d..7fbba892 100644 --- a/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolConfigurationImpl.java +++ b/dyno-core/src/main/java/com/netflix/dyno/connectionpool/impl/ConnectionPoolConfigurationImpl.java @@ -103,7 +103,7 @@ public ConnectionPoolConfigurationImpl(String name) { * @param config */ public ConnectionPoolConfigurationImpl(ConnectionPoolConfigurationImpl config) { - this.name = config.getName() + "-shadow"; + this.name = config.getName() + "_shadow"; this.compressionStrategy = config.getCompressionStrategy(); this.valueCompressionThreshold = config.getValueCompressionThreshold(); @@ -123,9 +123,9 @@ public ConnectionPoolConfigurationImpl(ConnectionPoolConfigurationImpl config) { this.socketTimeout = config.getSocketTimeout(); this.errorMonitorFactory = config.getErrorMonitorFactory(); this.tokenSupplier = config.getTokenSupplier(); - this.isDualWriteEnabled = config.isDualWriteEnabled; - this.dualWriteClusterName = config.dualWriteClusterName; - this.dualWritePercentage = config.dualWritePercentage; + this.isDualWriteEnabled = config.isDualWriteEnabled(); + this.dualWriteClusterName = config.getDualWriteClusterName(); + this.dualWritePercentage = config.getDualWritePercentage(); } @Override @@ -252,7 +252,40 @@ public int getDualWritePercentage() { return dualWritePercentage; } - // ALL SETTERS + @Override + public String toString() { + return "ConnectionPoolConfigurationImpl{" + + "name=" + name + + ", hostSupplier=" + hostSupplier + + ", tokenSupplier=" + tokenSupplier + + ", hostConnectionPoolFactory=" + hostConnectionPoolFactory + + ", name='" + name + '\'' + + ", port=" + port + + ", maxConnsPerHost=" + maxConnsPerHost + + ", maxTimeoutWhenExhausted=" + maxTimeoutWhenExhausted + + ", maxFailoverCount=" + maxFailoverCount + + ", connectTimeout=" + connectTimeout + + ", socketTimeout=" + socketTimeout + + ", poolShutdownDelay=" + poolShutdownDelay + + ", pingFrequencySeconds=" + pingFrequencySeconds + + ", flushTimingsFrequencySeconds=" + flushTimingsFrequencySeconds + + ", localZoneAffinity=" + localZoneAffinity + + ", lbStrategy=" + lbStrategy + + ", localRack='" + localRack + '\'' + + ", localDataCenter='" + localDataCenter + '\'' + + ", failOnStartupIfNoHosts=" + failOnStartupIfNoHosts + + ", failOnStarupIfNoHostsSeconds=" + failOnStarupIfNoHostsSeconds + + ", compressionStrategy=" + compressionStrategy + + ", valueCompressionThreshold=" + valueCompressionThreshold + + ", isDualWriteEnabled=" + isDualWriteEnabled + + ", dualWriteClusterName='" + dualWriteClusterName + '\'' + + ", dualWritePercentage=" + dualWritePercentage + + ", retryFactory=" + retryFactory + + ", errorMonitorFactory=" + errorMonitorFactory + + '}'; + } + + // ALL SETTERS public ConnectionPoolConfigurationImpl setMaxConnsPerHost(int maxConnsPerHost) { this.maxConnsPerHost = maxConnsPerHost; return this; diff --git a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoDualWriterClient.java b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoDualWriterClient.java index dc1d24ea..1622267e 100644 --- a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoDualWriterClient.java +++ b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoDualWriterClient.java @@ -174,74 +174,106 @@ public void setRange(int range) { @Override public Long append(final String key, final String value) { + return this.d_append(key, value).getResult(); + } + + @Override + public OperationResult d_append(final String key, final String value) { writeAsync(key, new Callable>() { @Override public OperationResult call() throws Exception { - return d_append(key, value); + return DynoDualWriterClient.super.d_append(key, value); } }); - return targetClient.append(key, value); + return targetClient.d_append(key, value); } @Override public String hmset(final String key, final Map hash) { + return this.d_hmset(key, hash).getResult(); + } + + @Override + public OperationResult d_hmset(final String key, final Map hash) { writeAsync(key, new Callable>(){ @Override public OperationResult call() throws Exception { - return d_hmset(key, hash); + return DynoDualWriterClient.super.d_hmset(key, hash); } }); - return targetClient.hmset(key, hash); + return targetClient.d_hmset(key, hash); } @Override public Long sadd(final String key, final String... members) { + return this.d_sadd(key, members).getResult(); + } + + @Override + public OperationResult d_sadd(final String key, final String... members) { writeAsync(key, new Callable>() { @Override public OperationResult call() throws Exception { - return d_sadd(key, members); + return DynoDualWriterClient.super.d_sadd(key, members); } }); - return targetClient.sadd(key, members); + return targetClient.d_sadd(key, members); + } @Override public Long hset(final String key, final String field, final String value) { + return this.d_hset(key, field, value).getResult(); + } + + @Override + public OperationResult d_hset(final String key, final String field, final String value) { writeAsync(key, new Callable>() { @Override public OperationResult call() throws Exception { - return d_hset(key, field, value); + return DynoDualWriterClient.super.d_hset(key, field, value); } }); - return targetClient.hset(key, field, value); + return targetClient.d_hset(key, field, value); } @Override public String set(final String key, final String value) { + return this.d_set(key, value).getResult(); + } + + @Override + public OperationResult d_set(final String key, final String value) { writeAsync(key, new Callable>() { @Override public OperationResult call() throws Exception { - return d_set(key, value); + return DynoDualWriterClient.super.d_set(key, value); } }); - return targetClient.set(key, value); + return targetClient.d_set(key, value); } @Override public String setex(final String key, int seconds, String value) { + return this.d_setex(key, seconds, value).getResult(); + } + + @Override + public OperationResult d_setex(final String key, final Integer seconds, final String value) { writeAsync(key, new Callable>(){ @Override public OperationResult call() throws Exception { - return d_get(key); + return DynoDualWriterClient.super.d_setex(key, seconds, value); } }); - return targetClient.setex(key, seconds, value); + return targetClient.d_setex(key, seconds, value); + } @Override diff --git a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisClient.java b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisClient.java index 89e15b94..cc8313ae 100644 --- a/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisClient.java +++ b/dyno-jedis/src/main/java/com/netflix/dyno/jedis/DynoJedisClient.java @@ -189,7 +189,7 @@ public String decompressValue(String value, ConnectionContext ctx) { } public TopologyView getTopologyView() { - return (TopologyView) this.getConnPool(); + return this.getConnPool(); } @Override @@ -3293,6 +3293,7 @@ public DynoJedisClient build() { if (cpConfig == null) { cpConfig = new ArchaiusConnectionPoolConfiguration(appName); + Logger.info("Dyno Client runtime properties: " + cpConfig.toString()); } if (cpConfig.isDualWriteEnabled()) { @@ -3306,6 +3307,7 @@ private DynoDualWriterClient buildDynoDualWriterClient() { DynoJedisClient targetClient = buildDynoJedisClient(); ConnectionPoolConfigurationImpl shadowConfig = new ConnectionPoolConfigurationImpl(cpConfig); + Logger.info("Dyno Client Shadow Config runtime properties: " + shadowConfig.toString()); // Ensure that if the shadow cluster is down it will not block client application startup shadowConfig.setFailOnStartupIfNoHosts(false); @@ -3315,14 +3317,18 @@ private DynoDualWriterClient buildDynoDualWriterClient() { } HostSupplier shadowSupplier = null; - if (discoveryClient != null) { - if (dualWriteClusterName == null) { - dualWriteClusterName = shadowConfig.getDualWriteClusterName(); + if (dualWriteHostSupplier == null) { + if (hostSupplier != null && hostSupplier instanceof EurekaHostsSupplier) { + EurekaHostsSupplier eurekaSupplier = (EurekaHostsSupplier) hostSupplier; + shadowSupplier = EurekaHostsSupplier.newInstance(shadowConfig.getDualWriteClusterName(), eurekaSupplier); + } else if (discoveryClient != null) { + shadowSupplier = new EurekaHostsSupplier(shadowConfig.getDualWriteClusterName(), discoveryClient); + } else { + throw new DynoConnectException("HostSupplier for DualWrite cluster is REQUIRED if you are not " + + "using EurekaHostsSupplier implementation or using a DiscoveryClient"); } - shadowSupplier = new EurekaHostsSupplier(dualWriteClusterName, discoveryClient); - } else if (dualWriteHostSupplier == null) { - throw new DynoConnectException("HostSupplier not provided for either target cluster or shadow cluster."+ - " Cannot initialize EurekaHostsSupplier since it requires a DiscoveryClient"); + } else { + shadowSupplier = dualWriteHostSupplier; } shadowConfig.withHostSupplier(shadowSupplier);