Skip to content

Commit

Permalink
Add support for CLUSTER FAILOVER.
Browse files Browse the repository at this point in the history
  • Loading branch information
SreedharReddyKallu authored and mp911de committed Apr 18, 2023
1 parent b3f83e6 commit 10f096b
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,11 @@ public RedisFuture<String> clusterFailover(boolean force) {
return dispatch(commandBuilder.clusterFailover(force));
}

@Override
public RedisFuture<String> clusterFailover(boolean force, boolean takeOver) {
return dispatch(commandBuilder.clusterFailover(force, takeOver));
}

@Override
public RedisFuture<String> clusterFlushslots() {
return dispatch(commandBuilder.clusterFlushslots());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,11 @@ public Mono<String> clusterFailover(boolean force) {
return createMono(() -> commandBuilder.clusterFailover(force));
}

@Override
public Mono<String> clusterFailover(boolean force, boolean takeOver) {
return createMono(() -> commandBuilder.clusterFailover(force, takeOver));
}

@Override
public Mono<String> clusterFlushslots() {
return createMono(commandBuilder::clusterFlushslots);
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,17 @@ Command<K, V, String> clusterFailover(boolean force) {
return createCommand(CLUSTER, new StatusOutput<>(codec), args);
}

Command<K, V, String> clusterFailover(boolean force, boolean takeOver) {

CommandArgs<K, V> args = new CommandArgs<>(codec).add(FAILOVER);
if (force) {
args.add(FORCE);
} else if (takeOver) {
args.add(TAKEOVER);
}
return createCommand(CLUSTER, new StatusOutput<>(codec), args);
}

Command<K, V, String> clusterFlushslots() {

CommandArgs<K, V> args = new CommandArgs<>(codec).add(FLUSHSLOTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ public interface RedisClusterAsyncCommands<K, V> extends BaseRedisAsyncCommands<
*/
RedisFuture<String> clusterFailover(boolean force);

/**
* Failover a cluster node. Turns the currently connected node into a master and the master into its replica.
*
* @param force do not coordinate with master if {@code true}
* @param takeOver do not coordinate with the rest of the cluster if {@code true}
* force will take precedence over takeOver if both are set.
* @return String simple-string-reply
*/
RedisFuture<String> clusterFailover(boolean force, boolean takeOver);

/**
* Delete all the slots associated with the specified node. The number of deleted slots is returned.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ public interface RedisClusterReactiveCommands<K, V> extends BaseRedisReactiveCom
*/
Mono<String> clusterFailover(boolean force);

/**
* Failover a cluster node. Turns the currently connected node into a master and the master into its replica.
*
* @param force do not coordinate with master if {@code true}
* @param takeOver do not coordinate with the rest of the cluster if {@code true}
* @return String simple-string-reply
*/
Mono<String> clusterFailover(boolean force, boolean takeOver);

/**
* Delete all the slots associated with the specified node. The number of deleted slots is returned.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ public interface RedisClusterCommands<K, V> extends BaseRedisCommands<K, V>, Red
*/
String clusterFailover(boolean force);

/**
* Failover a cluster node. Turns the currently connected node into a master and the master into its replica.
*
* @param force do not coordinate with master if {@code true}
* @param takeOver do not coordinate with the rest of the cluster if {@code true}
* force will take precedence over takeOver if both are set.
* @return String simple-string-reply
*/
String clusterFailover(boolean force, boolean takeOver);

/**
* Delete all the slots associated with the specified node. The number of deleted slots is returned.
*
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandKeyword.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public enum CommandKeyword implements ProtocolKeyword {

RESETSTAT, RESTART, RETRYCOUNT, REWRITE, RIGHT, SAVECONFIG, SDSLEN, SETNAME, SETSLOT, SHARDS, SLOTS, STABLE,

MIGRATING, IMPORTING, SAVE, SKIPME, SLAVES, STREAM, STORE, SUM, SEGFAULT, SETUSER, TRACKING, TYPE, UNBLOCK, USERS, USAGE, WEIGHTS, WHOAMI,
MIGRATING, IMPORTING, SAVE, SKIPME, SLAVES, STREAM, STORE, SUM, SEGFAULT, SETUSER, TAKEOVER, TRACKING, TYPE, UNBLOCK, USERS, USAGE, WEIGHTS, WHOAMI,

WITHSCORES, WITHVALUES, XOR, XX, YES;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,32 @@ public void testClusterFailover() {
assertThat(redis5Node.is(RedisClusterNode.NodeFlag.REPLICA)).isTrue();
assertThat(redis6Node.is(RedisClusterNode.NodeFlag.UPSTREAM)).isTrue();

}
@Test
public void testClusterFailoverWithTakeOver() {

log.info("Cluster node 5 is master");
log.info("Cluster nodes seen from node 5:\n" + redissync5.clusterNodes());
log.info("Cluster nodes seen from node 6:\n" + redissync6.clusterNodes());

Wait.untilTrue(() -> getOwnPartition(redissync5).is(RedisClusterNode.NodeFlag.UPSTREAM)).waitOrTimeout();
Wait.untilTrue(() -> getOwnPartition(redissync6).is(RedisClusterNode.NodeFlag.REPLICA)).waitOrTimeout();

String failover = redissync6.clusterFailover(false, true);
assertThat(failover).isEqualTo("OK");

Wait.untilTrue(() -> getOwnPartition(redissync6).is(RedisClusterNode.NodeFlag.UPSTREAM)).waitOrTimeout();
Wait.untilTrue(() -> getOwnPartition(redissync5).is(RedisClusterNode.NodeFlag.REPLICA)).waitOrTimeout();

log.info("Cluster nodes seen from node 5 after clusterFailover:\n" + redissync5.clusterNodes());
log.info("Cluster nodes seen from node 6 after clusterFailover:\n" + redissync6.clusterNodes());

RedisClusterNode redis5Node = getOwnPartition(redissync5);
RedisClusterNode redis6Node = getOwnPartition(redissync6);

assertThat(redis5Node.is(RedisClusterNode.NodeFlag.REPLICA)).isTrue();
assertThat(redis6Node.is(RedisClusterNode.NodeFlag.UPSTREAM)).isTrue();

}

@Test
Expand Down

0 comments on commit 10f096b

Please sign in to comment.