Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Adding data frame analytics stats to _usage API #45820

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
*/
package org.elasticsearch.xpack.core.ml;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

Expand All @@ -26,23 +28,35 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
public static final String MODEL_SIZE = "model_size";
public static final String CREATED_BY = "created_by";
public static final String NODE_COUNT = "node_count";
public static final String DATA_FRAME_ANALYTICS_JOBS_FIELD = "data_frame_analytics_jobs";

private final Map<String, Object> jobsUsage;
private final Map<String, Object> datafeedsUsage;
private final Map<String, Object> analyticsUsage;
private final int nodeCount;

public MachineLearningFeatureSetUsage(boolean available, boolean enabled, Map<String, Object> jobsUsage,
Map<String, Object> datafeedsUsage, int nodeCount) {
public MachineLearningFeatureSetUsage(boolean available,
boolean enabled,
Map<String, Object> jobsUsage,
Map<String, Object> datafeedsUsage,
Map<String, Object> analyticsUsage,
int nodeCount) {
super(XPackField.MACHINE_LEARNING, available, enabled);
this.jobsUsage = Objects.requireNonNull(jobsUsage);
this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage);
this.analyticsUsage = Objects.requireNonNull(analyticsUsage);
this.nodeCount = nodeCount;
}

public MachineLearningFeatureSetUsage(StreamInput in) throws IOException {
super(in);
this.jobsUsage = in.readMap();
this.datafeedsUsage = in.readMap();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.analyticsUsage = in.readMap();
} else {
this.analyticsUsage = Collections.emptyMap();
}
this.nodeCount = in.readInt();
}

Expand All @@ -51,18 +65,18 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(jobsUsage);
out.writeMap(datafeedsUsage);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeMap(analyticsUsage);
}
out.writeInt(nodeCount);
}

@Override
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
super.innerXContent(builder, params);
if (jobsUsage != null) {
builder.field(JOBS_FIELD, jobsUsage);
}
if (datafeedsUsage != null) {
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
}
builder.field(JOBS_FIELD, jobsUsage);
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
builder.field(DATA_FRAME_ANALYTICS_JOBS_FIELD, analyticsUsage);
if (nodeCount >= 0) {
builder.field(NODE_COUNT, nodeCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
Expand Down Expand Up @@ -69,22 +71,34 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat
ActionListener<XPackUsageFeatureResponse> listener) {
if (enabled == false) {
MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), enabled,
Collections.emptyMap(), Collections.emptyMap(), 0);
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0);
listener.onResponse(new XPackUsageFeatureResponse(usage));
return;
}

Map<String, Object> jobsUsage = new LinkedHashMap<>();
Map<String, Object> datafeedsUsage = new LinkedHashMap<>();
Map<String, Object> analyticsUsage = new LinkedHashMap<>();
int nodeCount = mlNodeCount(state);

// Step 3. Extract usage from data frame analytics stats and return usage response
ActionListener<GetDataFrameAnalyticsStatsAction.Response> dataframeAnalyticsListener = ActionListener.wrap(
response -> {
addDataFrameAnalyticsUsage(response, analyticsUsage);
MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(),
enabled, jobsUsage, datafeedsUsage, analyticsUsage, nodeCount);
listener.onResponse(new XPackUsageFeatureResponse(usage));
},
listener::onFailure
);

// Step 2. Extract usage from datafeeds stats and return usage response
ActionListener<GetDatafeedsStatsAction.Response> datafeedStatsListener =
ActionListener.wrap(response -> {
addDatafeedsUsage(response, datafeedsUsage);
MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(),
enabled, jobsUsage, datafeedsUsage, nodeCount);
listener.onResponse(new XPackUsageFeatureResponse(usage));
GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest =
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL);
client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener);
},
listener::onFailure);

Expand Down Expand Up @@ -184,19 +198,33 @@ private void addDatafeedsUsage(GetDatafeedsStatsAction.Response response, Map<St
ds -> Counter.newCounter()).addAndGet(1);
}

datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createDatafeedUsageEntry(response.getResponse().count()));
datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
for (DatafeedState datafeedState : datafeedCountByState.keySet()) {
datafeedsUsage.put(datafeedState.name().toLowerCase(Locale.ROOT),
createDatafeedUsageEntry(datafeedCountByState.get(datafeedState).get()));
createCountUsageEntry(datafeedCountByState.get(datafeedState).get()));
}
}

private Map<String, Object> createDatafeedUsageEntry(long count) {
private Map<String, Object> createCountUsageEntry(long count) {
Map<String, Object> usage = new HashMap<>();
usage.put(MachineLearningFeatureSetUsage.COUNT, count);
return usage;
}

private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Response response,
Map<String, Object> dataframeAnalyticsUsage) {
Map<DataFrameAnalyticsState, Counter> dataFrameAnalyticsStateCounterMap = new HashMap<>();

for(GetDataFrameAnalyticsStatsAction.Response.Stats stats : response.getResponse().results()) {
dataFrameAnalyticsStateCounterMap.computeIfAbsent(stats.getState(), ds -> Counter.newCounter()).addAndGet(1);
}
dataframeAnalyticsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
for (DataFrameAnalyticsState state : dataFrameAnalyticsStateCounterMap.keySet()) {
dataframeAnalyticsUsage.put(state.name().toLowerCase(Locale.ROOT),
createCountUsageEntry(dataFrameAnalyticsStateCounterMap.get(state).get()));
}
}

private static int mlNodeCount(final ClusterState clusterState) {
int mlNodeCount = 0;
for (DiscoveryNode node : clusterState.getNodes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
Expand Down Expand Up @@ -95,6 +98,7 @@ public void init() {
when(clusterService.state()).thenReturn(clusterState);
givenJobs(Collections.emptyList(), Collections.emptyList());
givenDatafeeds(Collections.emptyList());
givenDataFrameAnalytics(Collections.emptyList());
}

private MachineLearningUsageTransportAction newUsageAction(Settings settings) {
Expand Down Expand Up @@ -165,6 +169,11 @@ public void testUsage() throws Exception {
buildDatafeedStats(DatafeedState.STARTED),
buildDatafeedStats(DatafeedState.STOPPED)
));
givenDataFrameAnalytics(Arrays.asList(
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STARTED)
));

var usageAction = newUsageAction(settings.build());
PlainActionFuture<XPackUsageFeatureResponse> future = new PlainActionFuture<>();
Expand Down Expand Up @@ -230,6 +239,10 @@ public void testUsage() throws Exception {
assertThat(source.getValue("datafeeds.started.count"), equalTo(2));
assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1));

assertThat(source.getValue("data_frame_analytics_jobs._all.count"), equalTo(3));
assertThat(source.getValue("data_frame_analytics_jobs.started.count"), equalTo(1));
assertThat(source.getValue("data_frame_analytics_jobs.stopped.count"), equalTo(2));

assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11));
assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2));

Expand Down Expand Up @@ -391,6 +404,19 @@ private void givenDatafeeds(List<GetDatafeedsStatsAction.Response.DatafeedStats>
}).when(client).execute(same(GetDatafeedsStatsAction.INSTANCE), any(), any());
}

private void givenDataFrameAnalytics(List<GetDataFrameAnalyticsStatsAction.Response.Stats> dataFrameAnalyticsStats) {
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<GetDataFrameAnalyticsStatsAction.Response> listener =
(ActionListener<GetDataFrameAnalyticsStatsAction.Response>) invocationOnMock.getArguments()[2];
listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(
new QueryPage<>(dataFrameAnalyticsStats,
dataFrameAnalyticsStats.size(),
GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)));
return Void.TYPE;
}).when(client).execute(same(GetDataFrameAnalyticsStatsAction.INSTANCE), any(), any());
}

private static Detector buildMinDetector(String fieldName) {
Detector.Builder detectorBuilder = new Detector.Builder();
detectorBuilder.setFunction("min");
Expand Down Expand Up @@ -431,6 +457,12 @@ private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats
return stats;
}

private static GetDataFrameAnalyticsStatsAction.Response.Stats buildDataFrameAnalyticsStats(DataFrameAnalyticsState state) {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = mock(GetDataFrameAnalyticsStatsAction.Response.Stats.class);
when(stats.getState()).thenReturn(state);
return stats;
}

private static ForecastStats buildForecastStats(long numberOfForecasts) {
return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts);
}
Expand Down