Skip to content

Commit

Permalink
Cross-zone (rack) fallback implementation
Browse files Browse the repository at this point in the history
Currently, cross-zone fallback only occurs when the dyno client recognizes that a Dynomite server is down. With this change, any time an error occurs, either while obtaining a connection from the pool or during the execution of an operation, the connection for the retry will be borrowed from a remote zone rather than from the same local host pool where the error just occurred.

This change also contains the following minor fixes:
* Bug fix in failover metric reporting
* Automatic disabling of local zone affinity if the local zone has not been set. This often happens during local laptop development and unit testing.
* Cleanup of 'datacenter' terminology to rack/zone. Initially, datacenter was equivalent to zone/rack in Dynomite terminology. This later changed; however, the dyno client code was never updated to reflect it, which makes reading the code confusing.
  • Loading branch information
jcacciatore committed Jul 19, 2016
1 parent a95074d commit b2a9520
Show file tree
Hide file tree
Showing 18 changed files with 347 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ArchaiusConnectionPoolConfiguration extends ConnectionPoolConfigura
private final DynamicIntProperty connectTimeout;
private final DynamicIntProperty socketTimeout;
private final DynamicIntProperty poolShutdownDelay;
private final DynamicBooleanProperty localDcAffinity;
private final DynamicBooleanProperty localZoneAffinity;
private final DynamicIntProperty resetTimingsFrequency;
private final DynamicStringProperty configPublisherConfig;
private final DynamicIntProperty compressionThreshold;
Expand All @@ -66,7 +66,7 @@ public ArchaiusConnectionPoolConfiguration(String name) {
connectTimeout = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".connection.connectTimeout", super.getConnectTimeout());
socketTimeout = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".connection.socketTimeout", super.getSocketTimeout());
poolShutdownDelay = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".connection.poolShutdownDelay", super.getPoolShutdownDelay());
localDcAffinity = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + ".connection.localDcAffinity", super.localDcAffinity());
localZoneAffinity = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + ".connection.localZoneAffinity", super.localZoneAffinity());
resetTimingsFrequency = DynamicPropertyFactory.getInstance().getIntProperty(propertyPrefix + ".connection.metrics.resetFrequencySeconds", super.getTimingCountersResetFrequencySeconds());
configPublisherConfig = DynamicPropertyFactory.getInstance().getStringProperty(propertyPrefix + ".config.publisher.address", super.getConfigurationPublisherConfig());
failOnStartupIfNoHosts = DynamicPropertyFactory.getInstance().getBooleanProperty(propertyPrefix + ".config.startup.failIfNoHosts", super.getFailOnStartupIfNoHosts());
Expand Down Expand Up @@ -127,8 +127,8 @@ public int getPoolShutdownDelay() {
}

@Override
public boolean localDcAffinity() {
return localDcAffinity.get();
public boolean localZoneAffinity() {
return localZoneAffinity.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ enum CompressionStrategy {
*
* @return
*/
boolean localDcAffinity();
boolean localZoneAffinity();

/**
*
Expand Down Expand Up @@ -123,7 +123,9 @@ enum CompressionStrategy {
*
* @return
*/
String getLocalDC();
String getLocalRack();

String getLocalDataCenter();

/**
* Returns the amount of time the histogram accumulates data before it is cleared, in seconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public String getRack() {
return rack;
}

public Host setRack(String datacenter) {
this.rack = datacenter;
public Host setRack(String rack) {
this.rack = rack;
return this;
}

Expand Down Expand Up @@ -123,6 +123,6 @@ public boolean equals(Object obj) {

@Override
public String toString() {
return "Host [name=" + name + ", port=" + port + ", dc: " + rack + ", status: " + status.name() + "]";
return "Host [name=" + name + ", port=" + port + ", rack: " + rack + ", status: " + status.name() + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* </ol>
*
* This class is intended to be used within a collection of {@link HostConnectionPool} tracked by a
* {@link ConnectionPool} for all the {@link Host}(s) within a cassandra cluster.
* {@link ConnectionPool} for all the {@link Host}(s) within a Dynomite cluster.
*
* @see {@link ConnectionPool} for references to this class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface RetryPolicy {
* Ask the policy if a retry can use a remote zone (rack)
* @return boolean
*/
boolean allowRemoteDCFallback();
boolean allowCrossZoneFallback();

/**
* Return the number of attempts since begin was called
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public int getReplicationFactor() {
return replicationFactor;
}

public TokenStatus getTokensForRack(String rack) {
public List<TokenStatus> getTokensForRack(String rack) {
if (rack != null && map.containsKey(rack)) {
map.get(rack);
return map.get(rack);
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ public class ConnectionPoolConfigurationImpl implements ConnectionPoolConfigurat
private int poolShutdownDelay = DEFAULT_POOL_SHUTDOWN_DELAY;
private int pingFrequencySeconds = DEFAULT_PING_FREQ_SECONDS;
private int flushTimingsFrequencySeconds = DEFAULT_FLUSH_TIMINGS_FREQ_SECONDS;
private boolean localDcAffinity = DEFAULT_LOCAL_DC_AFFINITY;
private boolean localZoneAffinity = DEFAULT_LOCAL_DC_AFFINITY;
private LoadBalancingStrategy lbStrategy = DEFAULT_LB_STRATEGY;
private String localDC;
private String localRack;
private String localDataCenter;
private String configPublisherAddress = DEFAULT_CONFIG_PUBLISHER_ADDRESS;
private boolean failOnStartupIfNoHosts = DEFAULT_FAIL_ON_STARTUP_IFNOHOSTS;
private int failOnStarupIfNoHostsSeconds = DEFAULT_FAIL_ON_STARTUP_IFNOHOSTS_SECONDS;
Expand All @@ -83,7 +84,8 @@ public RetryPolicy getRetryPolicy() {

public ConnectionPoolConfigurationImpl(String name) {
this.name = name;
this.localDC = ConfigUtils.getLocalZone();
this.localRack = ConfigUtils.getLocalZone();
this.localDataCenter = ConfigUtils.getDataCenter();
}

@Override
Expand Down Expand Up @@ -137,8 +139,8 @@ public int getPoolShutdownDelay() {
}

@Override
public boolean localDcAffinity() {
return localDcAffinity;
public boolean localZoneAffinity() {
return localZoneAffinity;
}

@Override
Expand All @@ -157,8 +159,13 @@ public int getPingFrequencySeconds() {
}

@Override
public String getLocalDC() {
return localDC;
public String getLocalRack() {
return localRack;
}

/**
 * Returns the local data center held by this configuration.
 *
 * @return the current value of the {@code localDataCenter} field
 */
@Override
public String getLocalDataCenter() {
return localDataCenter;
}

@Override
Expand Down Expand Up @@ -236,8 +243,8 @@ public ConnectionPoolConfigurationImpl setPingFrequencySeconds(int seconds) {
return this;
}

public ConnectionPoolConfigurationImpl setLocalDcAffinity(boolean condition) {
localDcAffinity = condition;
public ConnectionPoolConfigurationImpl setLocalZoneAffinity(boolean condition) {
localZoneAffinity = condition;
return this;
}

Expand Down Expand Up @@ -350,9 +357,14 @@ public int getWindowCoveragePercentage() {
}
}

public ConnectionPoolConfigurationImpl setLocalDC(String dc) {
this.localDC = dc;
/**
 * Sets the local rack held by this configuration.
 *
 * @param rack the value to store in the {@code localRack} field
 * @return this configuration instance, so calls can be chained
 */
public ConnectionPoolConfigurationImpl setLocalRack(String rack) {
this.localRack = rack;
return this;
}

/**
 * Sets the local data center held by this configuration.
 *
 * @param dc the value to store in the {@code localDataCenter} field
 * @return this configuration instance, so calls can be chained
 */
public ConnectionPoolConfigurationImpl setLocalDataCenter(String dc) {
// Write to the data-center field. Assigning localRack here would overwrite the rack
// setting and leave the value returned by getLocalDataCenter() untouched.
this.localDataCenter = dc;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public List<HostConnectionPool<CL>> getPools() {

@Override
public Future<Boolean> updateHosts(Collection<Host> hostsUp, Collection<Host> hostsDown) {
Logger.debug("Updating hosts: UP=%s, DOWN=%s", hostsUp, hostsDown);
Logger.debug(String.format("Updating hosts: UP=%s, DOWN=%s", hostsUp, hostsDown));
boolean condition = false;
if (hostsUp != null && !hostsUp.isEmpty()) {
for (Host hostUp : hostsUp) {
Expand Down Expand Up @@ -284,9 +284,13 @@ public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws Dy
do {
Connection<CL> connection = null;

try {
connection =
selectionStrategy.getConnection(op, cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS);
try {
connection = selectionStrategy.getConnectionUsingRetryPolicy(
op,
cpConfiguration.getMaxTimeoutWhenExhausted(),
TimeUnit.MILLISECONDS,
retry
);

OperationResult<R> result = connection.execute(op);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public Map<String, String> getRuntimeConfiguration(String cpName) {
final Map<String, String> config = new LinkedHashMap<>();

// Rather than use reflection to iterate and find getters, simply provide the base configuration
config.put("localRack", cpConfig.getLocalDC());
config.put("localRack", cpConfig.getLocalRack());
config.put("compressionStrategy", cpConfig.getCompressionStrategy().name());
config.put("compressionThreshold", String.valueOf(cpConfig.getValueCompressionThreshold()));
config.put("connectTimeout", String.valueOf(cpConfig.getConnectTimeout()));
Expand All @@ -169,6 +169,8 @@ public Map<String, String> getRuntimeConfiguration(String cpName) {
config.put("timingCountersResetFrequencyInSecs",
String.valueOf(cpConfig.getTimingCountersResetFrequencySeconds()));
config.put("replicationFactor", String.valueOf(pool.getTopology().getReplicationFactor()));
config.put("retryPolicy", pool.getConfiguration().getRetryPolicyFactory().getRetryPolicy().toString());
config.put("localRackAffinity", String.valueOf(pool.getConfiguration().localZoneAffinity()));

return Collections.unmodifiableMap(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ public class RetryNTimes implements RetryPolicy {

private int n;
private final AtomicInteger count = new AtomicInteger(0);
private final boolean allowRemoteDCFallback;
private final boolean allowCrossZoneFallback;

public RetryNTimes(int n, boolean allowFallback) {
this.n = n;
this.allowRemoteDCFallback = allowFallback;
this.allowCrossZoneFallback = allowFallback;
}

@Override
Expand Down Expand Up @@ -63,27 +63,35 @@ public int getAttemptCount() {
}

@Override
public boolean allowRemoteDCFallback() {
return allowRemoteDCFallback;
public boolean allowCrossZoneFallback() {
return allowCrossZoneFallback;
}

public static class RetryFactory implements RetryPolicyFactory {

int n;
boolean allowDCFallback;
boolean allowCrossZoneFallback;

public RetryFactory(int n) {
this(n, true);
}

public RetryFactory(int n, boolean allowFallback) {
this.n = n;
this.allowDCFallback = allowFallback;
this.allowCrossZoneFallback = allowFallback;
}

@Override
public RetryPolicy getRetryPolicy() {
return new RetryNTimes(n, allowDCFallback);
return new RetryNTimes(n, allowCrossZoneFallback);
}

/**
 * Renders this factory as {@code RetryFactory{n=<n>, allowCrossZoneFallback=<flag>}}.
 *
 * @return a string containing the values of {@code n} and {@code allowCrossZoneFallback}
 */
@Override
public String toString() {
final StringBuilder text = new StringBuilder("RetryFactory{");
text.append("n=").append(n);
text.append(", allowCrossZoneFallback=").append(allowCrossZoneFallback);
text.append('}');
return text.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ public RetryPolicy getRetryPolicy() {
}

@Override
public boolean allowRemoteDCFallback() {
public boolean allowCrossZoneFallback() {
return false;
}

/**
 * Returns the fixed label {@code "RunOnce"}.
 *
 * @return the literal string {@code "RunOnce"}
 */
@Override
public String toString() {
return "RunOnce";
}
}
Loading

0 comments on commit b2a9520

Please sign in to comment.