Skip to content

Commit

Permalink
Read min cluster version directly from DiscoveryNodes (opensearch-pro…
Browse files Browse the repository at this point in the history
…ject#581)

* Simplify min cluster version lookup

Signed-off-by: Martin Gaievski <[email protected]>
  • Loading branch information
martin-gaievski committed Oct 20, 2022
1 parent cdb709a commit e8a4212
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 40 deletions.
23 changes: 6 additions & 17 deletions src/main/java/org/opensearch/knn/index/KNNClusterContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@

package org.opensearch.knn.index;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.ImmutableOpenMap;

/**
* Class abstracts information related to underlying OpenSearch cluster
Expand Down Expand Up @@ -48,22 +45,14 @@ public void initialize(final ClusterService clusterService) {
* @return minimal installed OpenSearch version, default to Version.CURRENT which is typically the latest version
*/
public Version getClusterMinVersion() {
Version minVersion = Version.CURRENT;
ImmutableOpenMap<String, DiscoveryNode> clusterDiscoveryNodes = ImmutableOpenMap.of();
log.debug("Reading cluster min version");
try {
clusterDiscoveryNodes = this.clusterService.state().getNodes().getNodes();
return this.clusterService.state().getNodes().getMinNodeVersion();
} catch (Exception exception) {
log.error("Cannot get cluster nodes", exception);
log.error(
String.format("Failed to get cluster minimum node version, returning current node version %s instead.", Version.CURRENT),
exception
);
return Version.CURRENT;
}
for (final ObjectCursor<DiscoveryNode> discoveryNodeCursor : clusterDiscoveryNodes.values()) {
final Version nodeVersion = discoveryNodeCursor.value.getVersion();
if (nodeVersion.before(minVersion)) {
minVersion = nodeVersion;
log.debug("Update cluster min version to {} based on node {}", nodeVersion, discoveryNodeCursor.value.toString());
}
}
log.debug("Return cluster min version {}", minVersion);
return minVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.knn.KNNTestCase;

import java.util.List;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.knn.index.KNNClusterTestUtils.mockClusterService;

public class KNNClusterContextTests extends KNNTestCase {

public void testSingleNodeCluster() {
ClusterService clusterService = mockClusterService(List.of(Version.V_2_4_0));
ClusterService clusterService = mockClusterService(Version.V_2_4_0);

final KNNClusterContext knnClusterContext = KNNClusterContext.instance();
knnClusterContext.initialize(clusterService);
Expand All @@ -29,7 +27,7 @@ public void testSingleNodeCluster() {
}

public void testMultipleNodesCluster() {
ClusterService clusterService = mockClusterService(List.of(Version.V_3_0_0, Version.V_2_3_0, Version.V_3_0_0));
ClusterService clusterService = mockClusterService(Version.V_2_3_0);

final KNNClusterContext knnClusterContext = KNNClusterContext.instance();
knnClusterContext.initialize(clusterService);
Expand Down
19 changes: 3 additions & 16 deletions src/test/java/org/opensearch/knn/index/KNNClusterTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,11 @@

import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.ImmutableOpenMap;

import java.util.List;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength;

/**
* Collection of util methods required for testing and related to OpenSearch cluster setup and functionality
Expand All @@ -25,24 +20,16 @@ public class KNNClusterTestUtils {

/**
* Create new mock for ClusterService
* @param versions list of versions for cluster nodes
* @param version min version for cluster nodes
* @return
*/
public static ClusterService mockClusterService(final List<Version> versions) {
public static ClusterService mockClusterService(final Version version) {
ClusterService clusterService = mock(ClusterService.class);
ClusterState clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(clusterState.getNodes()).thenReturn(discoveryNodes);
ImmutableOpenMap.Builder<String, DiscoveryNode> builder = ImmutableOpenMap.builder();
for (Version version : versions) {
DiscoveryNode clusterNode = mock(DiscoveryNode.class);
when(clusterNode.getVersion()).thenReturn(version);
builder.put(randomAlphaOfLength(10), clusterNode);
}
ImmutableOpenMap<String, DiscoveryNode> mapOfNodes = builder.build();
when(discoveryNodes.getNodes()).thenReturn(mapOfNodes);

when(discoveryNodes.getMinNodeVersion()).thenReturn(version);
return clusterService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testFromXcontent() throws Exception {
}

public void testFromXcontent_WithFilter() throws Exception {
final ClusterService clusterService = mockClusterService(List.of(Version.CURRENT));
final ClusterService clusterService = mockClusterService(Version.CURRENT);

final KNNClusterContext knnClusterContext = KNNClusterContext.instance();
knnClusterContext.initialize(clusterService);
Expand All @@ -125,7 +125,7 @@ public void testFromXcontent_WithFilter() throws Exception {
}

public void testFromXcontent_WithFilter_UnsupportedClusterVersion() throws Exception {
final ClusterService clusterService = mockClusterService(List.of(Version.V_2_3_0));
final ClusterService clusterService = mockClusterService(Version.V_2_3_0);

final KNNClusterContext knnClusterContext = KNNClusterContext.instance();
knnClusterContext.initialize(clusterService);
Expand Down Expand Up @@ -266,7 +266,7 @@ private void assertSerialization(final Version version, final Optional<QueryBuil
? new KNNQueryBuilder(FIELD_NAME, QUERY_VECTOR, K, queryBuilderOptional.get())
: new KNNQueryBuilder(FIELD_NAME, QUERY_VECTOR, K);

final ClusterService clusterService = mockClusterService(List.of(version));
final ClusterService clusterService = mockClusterService(version);

final KNNClusterContext knnClusterContext = KNNClusterContext.instance();
knnClusterContext.initialize(clusterService);
Expand Down

0 comments on commit e8a4212

Please sign in to comment.