Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Why not to get cluster state with ClusterStateRequest using RestClient #667

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 177 additions & 9 deletions src/main/java/org/opensearch/sdk/SDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@

package org.opensearch.sdk;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

Expand All @@ -22,6 +27,7 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
import org.apache.hc.core5.function.Factory;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
Expand All @@ -34,6 +40,8 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.ActionType;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
Expand All @@ -49,14 +57,15 @@
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.IndicesOptions.WildcardStates;
import org.opensearch.action.support.TransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Cancellable;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.client.ClusterClient;
import org.opensearch.client.GetAliasesResponse;
import org.opensearch.client.IndicesClient;
import org.opensearch.client.RequestOptions;
Expand All @@ -79,6 +88,18 @@
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;

import javax.net.ssl.SSLEngine;

Expand Down Expand Up @@ -371,7 +392,7 @@ public SDKRestClient admin() {
* @return An instance of a cluster admin client.
*/
public SDKClusterAdminClient cluster() {
return new SDKClusterAdminClient(restHighLevelClient.cluster());
return new SDKClusterAdminClient(restHighLevelClient);
}

/**
Expand Down Expand Up @@ -521,20 +542,167 @@ public void close() throws IOException {
*/
public static class SDKClusterAdminClient {

private final ClusterClient clusterClient;
private final RestHighLevelClient restHighLevelClient;

/**
* Instantiate this class using a {@link ClusterClient}.
* Instantiate this class.
*
* @param clusterClient The client to wrap
* @param restHighLevelClient The Rest client to use for wrapped functions
*/
public SDKClusterAdminClient(ClusterClient clusterClient) {
this.clusterClient = clusterClient;
public SDKClusterAdminClient(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}

// TODO: Implement state()
// https://github.com/opensearch-project/opensearch-sdk-java/issues/354
/**
* The state of the cluster.
*
* @param request The cluster state request.
* @param listener A listener to be notified with a result
* @return a cancellable async operation
*/
Cancellable state(ClusterStateRequest request, ActionListener<ClusterStateResponse> listener) {
Request restRequest = new Request("GET", "/_cluster/state");
Map<String, String> params = new HashMap<>();

// add any metrics, comma-delimited
EnumSet<ClusterState.Metric> metrics = EnumSet.noneOf(ClusterState.Metric.class);
if (request.routingTable()) {
metrics.add(ClusterState.Metric.ROUTING_TABLE);
}
if (request.nodes()) {
metrics.add(ClusterState.Metric.NODES);
}
if (request.metadata()) {
metrics.add(ClusterState.Metric.METADATA);
}
if (request.blocks()) {
metrics.add(ClusterState.Metric.BLOCKS);
}
if (request.customs()) {
metrics.add(ClusterState.Metric.CUSTOMS);
}
if (!metrics.isEmpty()) {
params.put("metric", metrics.stream().map(ClusterState.Metric::toString).collect(Collectors.joining(",")));
}

// add any indices; if none will default to _all
if (request.indices().length > 0) {
params.put("indices", Arrays.stream(request.indices()).collect(Collectors.joining(",")));
}

// add inidces options
IndicesOptions options = request.indicesOptions();
String expandWildcards = options.getExpandWildcards()
.stream()
.map(WildcardStates::name)
.map(String::toLowerCase)
.collect(Collectors.joining(","));
if (!expandWildcards.isEmpty()) {
params.put("expand_wildcards", expandWildcards);
}
params.put("ignore_unavailable", Boolean.toString(options.ignoreUnavailable()));
params.put("allow_no_indices", Boolean.toString(options.allowNoIndices()));
params.put("ignore_throttled", Boolean.toString(options.ignoreThrottled()));

// Add other values
params.put("wait_for_timeout", request.waitForTimeout().getStringRep());
params.put("wait_for_metadata_version", Long.toString(request.waitForMetadataVersion()));
params.put("cluster_manager_timeout", request.clusterManagerNodeTimeout().getStringRep());

// Add params to request and we're done with the request
restRequest.addParameters(params);

ResponseListener responseListener = new ResponseListener() {
@Override
public void onSuccess(Response response) {
HttpEntity body = response.getEntity();
XContentType type = XContentType.fromFormat(body.getContentType());
if (type != null) {
try {
ClusterName clusterName = null;
long version = 0L;
String stateUUID = null;
Metadata metadata = null;
RoutingTable routingTable = null;
DiscoveryNodes nodes = null;
ClusterBlocks blocks = null;
ImmutableOpenMap<String, Custom> customs = null;
int minimumClusterManagerNodesOnPublishingClusterManager = 0;

XContentParser parser = type.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, body.getContent());
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();

switch (fieldName) {
case "cluster_name":
clusterName = new ClusterName(parser.text());
break;
case "version":
version = parser.longValue();
break;
case "state_uuid":
stateUUID = parser.text();
break;
case "metadata":
case "meta-data":
metadata = Metadata.fromXContent(parser);
break;
case "routing_table":
fieldName = parser.currentName(); // "indices"
parser.nextToken();
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
fieldName = parser.currentName();
parser.nextToken();
// NOT TODO: I'm giving up on trying to parse this mess
}
break;
case "routing_nodes":
// Same horrible nested parsing as routing_table
break;
case "blocks":
// Nobody bothered to do ClusterBlock fromXContent either
break;
case "nodes":
// Guess what? No DiscoveryNode fromXContent
break;
case "I don't even know what key to look for for customs as they are subclasses":
// Yeah. I'm giving up
break;
default:
}

}
ClusterState state = new ClusterState(
clusterName,
version,
stateUUID,
metadata,
routingTable,
nodes,
blocks,
customs,
minimumClusterManagerNodesOnPublishingClusterManager,
false
);

ClusterStateResponse clusterStateResponse = new ClusterStateResponse(clusterName, state, false);
listener.onResponse(clusterStateResponse);
} catch (UnsupportedOperationException | IOException e) {
listener.onFailure(e);
}
}
}

@Override
public void onFailure(Exception exception) {
listener.onFailure(exception);
}
};
return restHighLevelClient.getLowLevelClient().performRequestAsync(restRequest, responseListener);
}
}

/**
Expand Down