Skip to content

Commit

Permalink
Extract NodeInfoMetrics to top-level class (elastic#99990)
Browse files Browse the repository at this point in the history
  • Loading branch information
NEUpanning authored Sep 28, 2023
1 parent d106fec commit a698f4d
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.node.info;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/**
* This class is a container that encapsulates the necessary information needed to indicate which node information is requested.
*/
public class NodesInfoMetrics implements Writeable {
private Set<String> requestedMetrics = Metric.allMetrics();

public NodesInfoMetrics() {}

public NodesInfoMetrics(StreamInput in) throws IOException {
requestedMetrics.clear();
requestedMetrics.addAll(Arrays.asList(in.readStringArray()));
}

public Set<String> requestedMetrics() {
return requestedMetrics;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(requestedMetrics.toArray(new String[0]));
}

/**
* An enumeration of the "core" sections of metrics that may be requested
* from the nodes information endpoint. Eventually this list will be
* pluggable.
*/
public enum Metric {
SETTINGS("settings"),
OS("os"),
PROCESS("process"),
JVM("jvm"),
THREAD_POOL("thread_pool"),
TRANSPORT("transport"),
HTTP("http"),
REMOTE_CLUSTER_SERVER("remote_cluster_server"),
PLUGINS("plugins"),
INGEST("ingest"),
AGGREGATIONS("aggregations"),
INDICES("indices");

private final String metricName;

Metric(String name) {
this.metricName = name;
}

public String metricName() {
return this.metricName;
}

public static Set<String> allMetrics() {
return Arrays.stream(values()).map(Metric::metricName).collect(Collectors.toSet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,16 @@
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
* A request to get node (cluster) level information.
*/
public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {

private Set<String> requestedMetrics = Metric.allMetrics();
private NodesInfoMetrics nodesInfoMetrics;

/**
* Create a new NodeInfoRequest from a {@link StreamInput} object.
Expand All @@ -34,8 +32,7 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
*/
public NodesInfoRequest(StreamInput in) throws IOException {
super(in);
requestedMetrics.clear();
requestedMetrics.addAll(Arrays.asList(in.readStringArray()));
nodesInfoMetrics = new NodesInfoMetrics(in);
}

/**
Expand All @@ -45,40 +42,41 @@ public NodesInfoRequest(StreamInput in) throws IOException {
@SuppressWarnings("this-escape")
public NodesInfoRequest(String... nodesIds) {
super(nodesIds);
nodesInfoMetrics = new NodesInfoMetrics();
all();
}

/**
* Clears all info flags.
*/
public NodesInfoRequest clear() {
requestedMetrics.clear();
nodesInfoMetrics.requestedMetrics().clear();
return this;
}

/**
* Sets to return all the data.
*/
public NodesInfoRequest all() {
requestedMetrics.addAll(Metric.allMetrics());
nodesInfoMetrics.requestedMetrics().addAll(NodesInfoMetrics.Metric.allMetrics());
return this;
}

/**
* Get the names of requested metrics
*/
public Set<String> requestedMetrics() {
return Set.copyOf(requestedMetrics);
return Set.copyOf(nodesInfoMetrics.requestedMetrics());
}

/**
* Add metric
*/
public NodesInfoRequest addMetric(String metric) {
if (Metric.allMetrics().contains(metric) == false) {
if (NodesInfoMetrics.Metric.allMetrics().contains(metric) == false) {
throw new IllegalStateException("Used an illegal metric: " + metric);
}
requestedMetrics.add(metric);
nodesInfoMetrics.requestedMetrics().add(metric);
return this;
}

Expand All @@ -87,38 +85,38 @@ public NodesInfoRequest addMetric(String metric) {
*/
public NodesInfoRequest addMetrics(String... metrics) {
SortedSet<String> metricsSet = new TreeSet<>(Set.of(metrics));
if (Metric.allMetrics().containsAll(metricsSet) == false) {
metricsSet.removeAll(Metric.allMetrics());
if (NodesInfoMetrics.Metric.allMetrics().containsAll(metricsSet) == false) {
metricsSet.removeAll(NodesInfoMetrics.Metric.allMetrics());
String plural = metricsSet.size() == 1 ? "" : "s";
throw new IllegalStateException("Used illegal metric" + plural + ": " + metricsSet);
}
requestedMetrics.addAll(metricsSet);
nodesInfoMetrics.requestedMetrics().addAll(metricsSet);
return this;
}

/**
* Remove metric
*/
public NodesInfoRequest removeMetric(String metric) {
if (Metric.allMetrics().contains(metric) == false) {
if (NodesInfoMetrics.Metric.allMetrics().contains(metric) == false) {
throw new IllegalStateException("Used an illegal metric: " + metric);
}
requestedMetrics.remove(metric);
nodesInfoMetrics.requestedMetrics().remove(metric);
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringCollection(requestedMetrics);
nodesInfoMetrics.writeTo(out);
}

/**
* Helper method for creating NodesInfoRequests with desired metrics
* @param metrics the metrics to include in the request
* @return
*/
public static NodesInfoRequest requestWithMetrics(Metric... metrics) {
public static NodesInfoRequest requestWithMetrics(NodesInfoMetrics.Metric... metrics) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear();
for (var metric : metrics) {
Expand All @@ -127,37 +125,7 @@ public static NodesInfoRequest requestWithMetrics(Metric... metrics) {
return nodesInfoRequest;
}

/**
* An enumeration of the "core" sections of metrics that may be requested
* from the nodes information endpoint. Eventually this list list will be
* pluggable.
*/
public enum Metric {
SETTINGS("settings"),
OS("os"),
PROCESS("process"),
JVM("jvm"),
THREAD_POOL("thread_pool"),
TRANSPORT("transport"),
HTTP("http"),
REMOTE_CLUSTER_SERVER("remote_cluster_server"),
PLUGINS("plugins"),
INGEST("ingest"),
AGGREGATIONS("aggregations"),
INDICES("indices");

private final String metricName;

Metric(String name) {
this.metricName = name;
}

public String metricName() {
return this.metricName;
}

public static Set<String> allMetrics() {
return Arrays.stream(values()).map(Metric::metricName).collect(Collectors.toSet());
}
public NodesInfoMetrics getNodesInfoMetrics() {
return nodesInfoMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,83 +38,83 @@ public NodesInfoRequestBuilder all() {
* Should the node settings be returned.
*/
public NodesInfoRequestBuilder setSettings(boolean settings) {
addOrRemoveMetric(settings, NodesInfoRequest.Metric.SETTINGS);
addOrRemoveMetric(settings, NodesInfoMetrics.Metric.SETTINGS);
return this;
}

/**
* Should the node OS info be returned.
*/
public NodesInfoRequestBuilder setOs(boolean os) {
addOrRemoveMetric(os, NodesInfoRequest.Metric.OS);
addOrRemoveMetric(os, NodesInfoMetrics.Metric.OS);
return this;
}

/**
* Should the node OS process be returned.
*/
public NodesInfoRequestBuilder setProcess(boolean process) {
addOrRemoveMetric(process, NodesInfoRequest.Metric.PROCESS);
addOrRemoveMetric(process, NodesInfoMetrics.Metric.PROCESS);
return this;
}

/**
* Should the node JVM info be returned.
*/
public NodesInfoRequestBuilder setJvm(boolean jvm) {
addOrRemoveMetric(jvm, NodesInfoRequest.Metric.JVM);
addOrRemoveMetric(jvm, NodesInfoMetrics.Metric.JVM);
return this;
}

/**
* Should the node thread pool info be returned.
*/
public NodesInfoRequestBuilder setThreadPool(boolean threadPool) {
addOrRemoveMetric(threadPool, NodesInfoRequest.Metric.THREAD_POOL);
addOrRemoveMetric(threadPool, NodesInfoMetrics.Metric.THREAD_POOL);
return this;
}

/**
* Should the node Transport info be returned.
*/
public NodesInfoRequestBuilder setTransport(boolean transport) {
addOrRemoveMetric(transport, NodesInfoRequest.Metric.TRANSPORT);
addOrRemoveMetric(transport, NodesInfoMetrics.Metric.TRANSPORT);
return this;
}

/**
* Should the node HTTP info be returned.
*/
public NodesInfoRequestBuilder setHttp(boolean http) {
addOrRemoveMetric(http, NodesInfoRequest.Metric.HTTP);
addOrRemoveMetric(http, NodesInfoMetrics.Metric.HTTP);
return this;
}

/**
* Should the node plugins info be returned.
*/
public NodesInfoRequestBuilder setPlugins(boolean plugins) {
addOrRemoveMetric(plugins, NodesInfoRequest.Metric.PLUGINS);
addOrRemoveMetric(plugins, NodesInfoMetrics.Metric.PLUGINS);
return this;
}

/**
* Should the node ingest info be returned.
*/
public NodesInfoRequestBuilder setIngest(boolean ingest) {
addOrRemoveMetric(ingest, NodesInfoRequest.Metric.INGEST);
addOrRemoveMetric(ingest, NodesInfoMetrics.Metric.INGEST);
return this;
}

/**
* Should the node indices info be returned.
*/
public NodesInfoRequestBuilder setIndices(boolean indices) {
addOrRemoveMetric(indices, NodesInfoRequest.Metric.INDICES);
addOrRemoveMetric(indices, NodesInfoMetrics.Metric.INDICES);
return this;
}

private void addOrRemoveMetric(boolean includeMetric, NodesInfoRequest.Metric metric) {
private void addOrRemoveMetric(boolean includeMetric, NodesInfoMetrics.Metric metric) {
if (includeMetric) {
request.addMetric(metric.metricName());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,18 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest, Task task) {
NodesInfoRequest request = nodeRequest.request;
Set<String> metrics = request.requestedMetrics();
return nodeService.info(
metrics.contains(NodesInfoRequest.Metric.SETTINGS.metricName()),
metrics.contains(NodesInfoRequest.Metric.OS.metricName()),
metrics.contains(NodesInfoRequest.Metric.PROCESS.metricName()),
metrics.contains(NodesInfoRequest.Metric.JVM.metricName()),
metrics.contains(NodesInfoRequest.Metric.THREAD_POOL.metricName()),
metrics.contains(NodesInfoRequest.Metric.TRANSPORT.metricName()),
metrics.contains(NodesInfoRequest.Metric.HTTP.metricName()),
metrics.contains(NodesInfoRequest.Metric.REMOTE_CLUSTER_SERVER.metricName()),
metrics.contains(NodesInfoRequest.Metric.PLUGINS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INGEST.metricName()),
metrics.contains(NodesInfoRequest.Metric.AGGREGATIONS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INDICES.metricName())
metrics.contains(NodesInfoMetrics.Metric.SETTINGS.metricName()),
metrics.contains(NodesInfoMetrics.Metric.OS.metricName()),
metrics.contains(NodesInfoMetrics.Metric.PROCESS.metricName()),
metrics.contains(NodesInfoMetrics.Metric.JVM.metricName()),
metrics.contains(NodesInfoMetrics.Metric.THREAD_POOL.metricName()),
metrics.contains(NodesInfoMetrics.Metric.TRANSPORT.metricName()),
metrics.contains(NodesInfoMetrics.Metric.HTTP.metricName()),
metrics.contains(NodesInfoMetrics.Metric.REMOTE_CLUSTER_SERVER.metricName()),
metrics.contains(NodesInfoMetrics.Metric.PLUGINS.metricName()),
metrics.contains(NodesInfoMetrics.Metric.INGEST.metricName()),
metrics.contains(NodesInfoMetrics.Metric.AGGREGATIONS.metricName()),
metrics.contains(NodesInfoMetrics.Metric.INDICES.metricName())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoMetrics;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -108,7 +109,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
threadContext.markAsSystemContext();
if (request.remoteClusterServer) {
final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().clear()
.addMetrics(NodesInfoRequest.Metric.REMOTE_CLUSTER_SERVER.metricName());
.addMetrics(NodesInfoMetrics.Metric.REMOTE_CLUSTER_SERVER.metricName());
transportService.sendRequest(
transportService.getLocalNode(),
NodesInfoAction.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.action.ingest;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoMetrics;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -66,7 +67,7 @@ protected void masterOperation(Task task, PutPipelineRequest request, ClusterSta
ingestService.putPipeline(request, listener, (nodeListener) -> {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
nodesInfoRequest.addMetric(NodesInfoMetrics.Metric.INGEST.metricName());
client.admin().cluster().nodesInfo(nodesInfoRequest, nodeListener);
});
}
Expand Down
Loading

0 comments on commit a698f4d

Please sign in to comment.