From f3beefc03d793e474d015799efa4b6456fc5a555 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 11 Aug 2020 09:39:17 +0100 Subject: [PATCH] DFA Get Stats can return multiple responses if more than one error occurs (#60900) If the search for get stats with multiple job Ids fails the listener is called for each failure. This change waits for all responses then returns the first error if there was one. --- ...sportGetDataFrameAnalyticsStatsAction.java | 14 ++++++++++++- .../action/TransportGetJobsStatsAction.java | 20 +++++++++++++++++-- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 06a79c44ec6b4..65441ecfed1f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -167,6 +168,7 @@ void gatherStatsForStoppedTasks(List configs, GetDataF AtomicInteger counter = new AtomicInteger(stoppedConfigs.size()); AtomicArray jobStats = new AtomicArray<>(stoppedConfigs.size()); + AtomicReference searchException = new AtomicReference<>(); for (int i = 0; i < stoppedConfigs.size(); i++) { final int slot = i; DataFrameAnalyticsConfig config = stoppedConfigs.get(i); @@ -174,6 +176,10 @@ void gatherStatsForStoppedTasks(List configs, GetDataF stats -> { jobStats.set(slot, stats); if (counter.decrementAndGet() == 0) { + if (searchException.get() != null) { + listener.onFailure(searchException.get()); + return; + } List allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results()); allTasksStats.addAll(jobStats.asList()); Collections.sort(allTasksStats, Comparator.comparing(Stats::getId)); @@ -181,7 +187,13 @@ void gatherStatsForStoppedTasks(List configs, GetDataF allTasksStats, allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD))); } }, - listener::onFailure) + e -> { + // take the first error + searchException.compareAndSet(null, e); + if (counter.decrementAndGet() == 0) { + listener.onFailure(e); + } + }) ); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index b96e6ca99c8b0..bab56bcc2753c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -46,6 +46,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -143,7 +144,17 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc } AtomicInteger counter = new AtomicInteger(closedJobIds.size()); + AtomicReference searchException = new AtomicReference<>(); AtomicArray jobStats = new AtomicArray<>(closedJobIds.size()); + + Consumer errorHandler = e -> { + // take the first error + searchException.compareAndSet(null, e); + if (counter.decrementAndGet() == 0) { + listener.onFailure(e); + } + }; + PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE); for (int i = 0; i < closedJobIds.size(); i++) { int slot = i; @@ -159,14 +170,19 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState, null, assignmentExplanation, null, timingStats)); if (counter.decrementAndGet() == 0) { + if (searchException.get() != null) { + // there was an error + listener.onFailure(searchException.get()); + return; + } List results = response.getResponse().results(); results.addAll(jobStats.asList()); Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId)); listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(), new QueryPage<>(results, results.size(), Job.RESULTS_FIELD))); } - }, listener::onFailure); - }, listener::onFailure); + }, errorHandler); + }, errorHandler); } }