From 03546f1d621b03bf395140c3f7857c352a31d43a Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 21 Aug 2019 15:01:45 -0500 Subject: [PATCH 1/2] [ML] Adding data frame analytics stats to _usage API --- .../ml/MachineLearningFeatureSetUsage.java | 30 +++++++++---- .../MachineLearningUsageTransportAction.java | 42 +++++++++++++++---- ...chineLearningInfoTransportActionTests.java | 32 ++++++++++++++ 3 files changed, 89 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java index da38d1d2903a8..2437f00337842 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -12,6 +13,7 @@ import org.elasticsearch.xpack.core.XPackField; import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -26,16 +28,23 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage { public static final String MODEL_SIZE = "model_size"; public static final String CREATED_BY = "created_by"; public static final String NODE_COUNT = "node_count"; + public static final String DATA_FRAME_ANALYTICS_JOBS_FIELD = "data_frame_analytics_jobs"; private final Map jobsUsage; private final Map datafeedsUsage; + private final Map analyticsUsage; private final int nodeCount; - public MachineLearningFeatureSetUsage(boolean available, boolean enabled, Map jobsUsage, - Map datafeedsUsage, int nodeCount) { + public MachineLearningFeatureSetUsage(boolean available, + boolean enabled, + Map jobsUsage, + Map datafeedsUsage, + Map analyticsUsage, + int nodeCount) { super(XPackField.MACHINE_LEARNING, available, enabled); this.jobsUsage = Objects.requireNonNull(jobsUsage); this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage); + this.analyticsUsage = Objects.requireNonNull(analyticsUsage); this.nodeCount = nodeCount; } @@ -43,6 +52,11 @@ public MachineLearningFeatureSetUsage(StreamInput in) throws IOException { super(in); this.jobsUsage = in.readMap(); this.datafeedsUsage = in.readMap(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.analyticsUsage = in.readMap(); + } else { + this.analyticsUsage = Collections.emptyMap(); + } this.nodeCount = in.readInt(); } @@ -51,18 +65,18 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeMap(jobsUsage); out.writeMap(datafeedsUsage); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeMap(analyticsUsage); + } out.writeInt(nodeCount); } @Override protected void innerXContent(XContentBuilder builder, Params params) throws IOException { super.innerXContent(builder, params); - if (jobsUsage != null) { - builder.field(JOBS_FIELD, jobsUsage); - } - if (datafeedsUsage != null) { - builder.field(DATAFEEDS_FIELD, datafeedsUsage); - } + builder.field(JOBS_FIELD, jobsUsage); + builder.field(DATAFEEDS_FIELD, datafeedsUsage); + builder.field(DATA_FRAME_ANALYTICS_JOBS_FIELD, analyticsUsage); if (nodeCount >= 0) { builder.field(NODE_COUNT, nodeCount); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java index be9c8d4c4fd6d..ab83d6943c93e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java @@ -26,9 +26,11 @@ import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse; import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; +import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -69,22 +71,34 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat ActionListener listener) { if (enabled == false) { MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), enabled, - Collections.emptyMap(), Collections.emptyMap(), 0); + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0); listener.onResponse(new XPackUsageFeatureResponse(usage)); return; } Map jobsUsage = new LinkedHashMap<>(); Map datafeedsUsage = new LinkedHashMap<>(); + Map analyticsUsage = new LinkedHashMap<>(); int nodeCount = mlNodeCount(state); + // Step 3. Extract usage from data frame analytics stats and return usage response + ActionListener dataframeAnalyticsListener = ActionListener.wrap( + response -> { + addDataFrameAnalyticsUsage(response, analyticsUsage); + MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), + enabled, jobsUsage, datafeedsUsage, analyticsUsage, nodeCount); + listener.onResponse(new XPackUsageFeatureResponse(usage)); + }, + listener::onFailure + ); + // Step 2. Extract usage from datafeeds stats and return usage response ActionListener datafeedStatsListener = ActionListener.wrap(response -> { addDatafeedsUsage(response, datafeedsUsage); - MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), - enabled, jobsUsage, datafeedsUsage, nodeCount); - listener.onResponse(new XPackUsageFeatureResponse(usage)); + GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest = + new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL); + client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener); }, listener::onFailure); @@ -184,19 +198,33 @@ private void addDatafeedsUsage(GetDatafeedsStatsAction.Response response, Map Counter.newCounter()).addAndGet(1); } - datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createDatafeedUsageEntry(response.getResponse().count())); + datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count())); for (DatafeedState datafeedState : datafeedCountByState.keySet()) { datafeedsUsage.put(datafeedState.name().toLowerCase(Locale.ROOT), - createDatafeedUsageEntry(datafeedCountByState.get(datafeedState).get())); + createCountUsageEntry(datafeedCountByState.get(datafeedState).get())); } } - private Map createDatafeedUsageEntry(long count) { + private Map createCountUsageEntry(long count) { Map usage = new HashMap<>(); usage.put(MachineLearningFeatureSetUsage.COUNT, count); return usage; } + private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Response response, + Map dataframeAnalyticsUsage) { + Map dataFrameAnalyticsStateCounterMap = new HashMap<>(); + + for(GetDataFrameAnalyticsStatsAction.Response.Stats stats : response.getResponse().results()) { + dataFrameAnalyticsStateCounterMap.computeIfAbsent(stats.getState(), ds -> Counter.newCounter()).addAndGet(1); + } + dataframeAnalyticsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count())); + for (DataFrameAnalyticsState state : dataFrameAnalyticsStateCounterMap.keySet()) { + dataframeAnalyticsUsage.put(state.name().toLowerCase(Locale.ROOT), + createCountUsageEntry(dataFrameAnalyticsStateCounterMap.get(state).get())); + } + } + private static int mlNodeCount(final ClusterState clusterState) { int mlNodeCount = 0; for (DiscoveryNode node : clusterState.getNodes()) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java index b3d4cc4f94d59..b2f69158aca05 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java @@ -34,10 +34,13 @@ import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; import org.elasticsearch.xpack.core.ml.MachineLearningField; +import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -95,6 +98,7 @@ public void init() { when(clusterService.state()).thenReturn(clusterState); givenJobs(Collections.emptyList(), Collections.emptyList()); givenDatafeeds(Collections.emptyList()); + givenDataFrameAnalytics(Collections.emptyList()); } private MachineLearningUsageTransportAction newUsageAction(Settings settings) { @@ -165,6 +169,11 @@ public void testUsage() throws Exception { buildDatafeedStats(DatafeedState.STARTED), buildDatafeedStats(DatafeedState.STOPPED) )); + givenDataFrameAnalytics(Arrays.asList( + buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED), + buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED), + buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STARTED) + )); var usageAction = newUsageAction(settings.build()); PlainActionFuture future = new PlainActionFuture<>(); @@ -230,6 +239,10 @@ public void testUsage() throws Exception { assertThat(source.getValue("datafeeds.started.count"), equalTo(2)); assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1)); + assertThat(source.getValue("data_frame_analytics_jobs._all.count"), equalTo(3)); + assertThat(source.getValue("data_frame_analytics_jobs.started.count"), equalTo(1)); + assertThat(source.getValue("data_frame_analytics_jobs.stopped.count"), equalTo(2)); + assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11)); assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2)); @@ -391,6 +404,19 @@ private void givenDatafeeds(List }).when(client).execute(same(GetDatafeedsStatsAction.INSTANCE), any(), any()); } + private void givenDataFrameAnalytics(List dataFrameAnalyticsStats) { + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response( + new QueryPage<>(dataFrameAnalyticsStats, + dataFrameAnalyticsStats.size(), + GetDataFrameAnalyticsAction.Response.RESULTS_FIELD))); + return Void.TYPE; + }).when(client).execute(same(GetDataFrameAnalyticsStatsAction.INSTANCE), any(), any()); + } + private static Detector buildMinDetector(String fieldName) { Detector.Builder detectorBuilder = new Detector.Builder(); detectorBuilder.setFunction("min"); @@ -431,6 +457,12 @@ private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats return stats; } + private static GetDataFrameAnalyticsStatsAction.Response.Stats buildDataFrameAnalyticsStats(DataFrameAnalyticsState state) { + GetDataFrameAnalyticsStatsAction.Response.Stats stats = mock(GetDataFrameAnalyticsStatsAction.Response.Stats.class); + when(stats.getState()).thenReturn(state); + return stats; + } + private static ForecastStats buildForecastStats(long numberOfForecasts) { return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts); } From 9e954d792cb37b78f43900bad67dc5988d350f26 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 22 Aug 2019 09:49:24 -0500 Subject: [PATCH 2/2] making the size of analytics stats 10k --- .../xpack/ml/MachineLearningUsageTransportAction.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java index ab83d6943c93e..ab815e17fe0c8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse; import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction; +import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; @@ -98,6 +99,7 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat addDatafeedsUsage(response, datafeedsUsage); GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL); + dataframeAnalyticsStatsRequest.setPageParams(new PageParams(0, 10_000)); client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener); }, listener::onFailure);