Skip to content

Commit

Permalink
Introduce nodeFilter Predicate to filter Partitions #1942
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jan 7, 2022
1 parent a9a9917 commit 64801c7
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
48 changes: 42 additions & 6 deletions src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

import java.nio.charset.Charset;
import java.time.Duration;
import java.util.function.Predicate;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.SslOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.DecodeBufferPolicy;
import io.lettuce.core.protocol.ProtocolVersion;
Expand All @@ -46,19 +48,20 @@ public class ClusterClientOptions extends ClientOptions {

public static final boolean DEFAULT_VALIDATE_CLUSTER_MEMBERSHIP = true;

public static final Predicate<RedisClusterNode> DEFAULT_NODE_FILTER = node -> true;

private final int maxRedirects;

private final ClusterTopologyRefreshOptions topologyRefreshOptions;

private final boolean validateClusterNodeMembership;

private final Predicate<RedisClusterNode> nodeFilter;

protected ClusterClientOptions(Builder builder) {

super(builder);

this.validateClusterNodeMembership = builder.validateClusterNodeMembership;
this.maxRedirects = builder.maxRedirects;

ClusterTopologyRefreshOptions refreshOptions = builder.topologyRefreshOptions;

if (refreshOptions == null) {
Expand All @@ -70,15 +73,19 @@ protected ClusterClientOptions(Builder builder) {
}

this.topologyRefreshOptions = refreshOptions;
this.maxRedirects = builder.maxRedirects;
this.validateClusterNodeMembership = builder.validateClusterNodeMembership;
this.nodeFilter = builder.nodeFilter;
}

protected ClusterClientOptions(ClusterClientOptions original) {

super(original);

this.validateClusterNodeMembership = original.validateClusterNodeMembership;
this.maxRedirects = original.maxRedirects;
this.topologyRefreshOptions = original.topologyRefreshOptions;
this.validateClusterNodeMembership = original.validateClusterNodeMembership;
this.nodeFilter = original.nodeFilter;
}

/**
Expand Down Expand Up @@ -147,9 +154,11 @@ public static class Builder extends ClientOptions.Builder {

private boolean closeStaleConnections = DEFAULT_CLOSE_STALE_CONNECTIONS;

private int maxRedirects = DEFAULT_MAX_REDIRECTS;

private boolean validateClusterNodeMembership = DEFAULT_VALIDATE_CLUSTER_MEMBERSHIP;

private int maxRedirects = DEFAULT_MAX_REDIRECTS;
private Predicate<RedisClusterNode> nodeFilter = DEFAULT_NODE_FILTER;

private ClusterTopologyRefreshOptions topologyRefreshOptions = null;

Expand Down Expand Up @@ -290,6 +299,21 @@ public Builder validateClusterNodeMembership(boolean validateClusterNodeMembersh
return this;
}

/**
* Provide a {@link Predicate node filter} to filter cluster nodes from
* {@link io.lettuce.core.cluster.models.partitions.Partitions}.
*
* @param nodeFilter must not be {@code null}.
* @return {@code this}
* @since 6.1.6
*/
public Builder nodeFilter(Predicate<RedisClusterNode> nodeFilter) {

LettuceAssert.notNull(nodeFilter, "NodeFilter must not be null");
this.nodeFilter = nodeFilter;
return this;
}

/**
* Create a new instance of {@link ClusterClientOptions}
*
Expand Down Expand Up @@ -323,7 +347,7 @@ public ClusterClientOptions.Builder mutate() {
.scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions())
.topologyRefreshOptions(getTopologyRefreshOptions())
.validateClusterNodeMembership(isValidateClusterNodeMembership());
.validateClusterNodeMembership(isValidateClusterNodeMembership()).nodeFilter(getNodeFilter());

return builder;
}
Expand Down Expand Up @@ -389,4 +413,16 @@ public boolean isValidateClusterNodeMembership() {
return validateClusterNodeMembership;
}

/**
* The {@link Predicate node filter} to filter Redis Cluster nodes from
* {@link io.lettuce.core.cluster.models.partitions.Partitions}.
*
* @return the {@link Predicate node filter} to filter Redis Cluster nodes from
* {@link io.lettuce.core.cluster.models.partitions.Partitions}.
* @since 6.1.6
*/
public Predicate<RedisClusterNode> getNodeFilter() {
return nodeFilter;
}

}
17 changes: 17 additions & 0 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,23 @@ protected CompletableFuture<Partitions> loadPartitionsAsync() {
}
});

Predicate<RedisClusterNode> nodeFilter = getClusterClientOptions().getNodeFilter();

if (nodeFilter != ClusterClientOptions.DEFAULT_NODE_FILTER) {
return future.thenApply(partitions -> {

List<RedisClusterNode> toRemove = new ArrayList<>();
for (RedisClusterNode partition : partitions) {
if (!nodeFilter.test(partition)) {
toRemove.add(partition);
}
}

partitions.removeAll(toRemove);
return partitions;
});
}

return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package io.lettuce.core.cluster;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.*;

import java.nio.charset.StandardCharsets;
import java.util.function.Predicate;

import org.junit.jupiter.api.Test;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.protocol.ProtocolVersion;

/**
Expand All @@ -34,9 +36,10 @@ class ClusterClientOptionsUnitTests {
@Test
void testCopy() {

Predicate<RedisClusterNode> nodeFilter = it -> true;
ClusterClientOptions options = ClusterClientOptions.builder().autoReconnect(false).requestQueueSize(100)
.suspendReconnectOnProtocolFailure(true).maxRedirects(1234).validateClusterNodeMembership(false)
.protocolVersion(ProtocolVersion.RESP2).build();
.protocolVersion(ProtocolVersion.RESP2).nodeFilter(nodeFilter).build();

ClusterClientOptions copy = ClusterClientOptions.copyOf(options);

Expand All @@ -51,6 +54,7 @@ void testCopy() {
assertThat(copy.isSuspendReconnectOnProtocolFailure()).isEqualTo(options.isSuspendReconnectOnProtocolFailure());
assertThat(copy.getMaxRedirects()).isEqualTo(options.getMaxRedirects());
assertThat(copy.getScriptCharset()).isEqualTo(StandardCharsets.UTF_8);
assertThat(copy.getNodeFilter()).isEqualTo(nodeFilter);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,22 @@ void clusterNeedsAuthButNotSupplied() {
}
}

@Test
void appliesNodeFilter() {

RedisClusterClient clusterClient = RedisClusterClient.create(TestClientResources.get(),
RedisURI.Builder.redis(host, ClusterTestSettings.port1).build());
try {

clusterClient.setOptions(
ClusterClientOptions.builder().nodeFilter(it -> it.is(RedisClusterNode.NodeFlag.UPSTREAM)).build());
Partitions partitions = clusterClient.getPartitions();
assertThat(partitions).hasSize(2).allMatch(it -> it.is(RedisClusterNode.NodeFlag.UPSTREAM));
} finally {
FastShutdown.shutdown(clusterClient);
}
}

@Test
void noClusterNodeAvailable() {

Expand Down

0 comments on commit 64801c7

Please sign in to comment.