From 4379a3c52b79f66d87472df9f804eddbffae10dc Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 2 Oct 2019 08:58:56 +0100 Subject: [PATCH] [ML] Throttle the delete-by-query of expired results (#47177) Due to #47003 many clusters will have built up a large backlog of expired results. On upgrading to a version where that bug is fixed, users could find that the first ML daily maintenance task deletes a very large number of documents. This change introduces throttling to the delete-by-query that the ML daily maintenance uses to delete expired results to limit it to deleting an average of 200 documents per second. (There is no throttling for state/forecast documents as these are expected to be lower volume.) Additionally, a rough time limit of 8 hours is applied to the whole delete expired data action. (This is only rough as it won't stop part way through a single operation - it only checks the timeout between operations.) Relates #47103 --- .../ml/action/DeleteExpiredDataAction.java | 4 + .../xpack/ml/MlDailyMaintenanceService.java | 8 +- .../TransportDeleteExpiredDataAction.java | 51 +++++++-- .../ml/job/persistence/JobDataDeleter.java | 14 +++ .../AbstractExpiredJobDataRemover.java | 18 ++- .../retention/ExpiredForecastsRemover.java | 20 +++- .../ExpiredModelSnapshotsRemover.java | 3 +- .../job/retention/ExpiredResultsRemover.java | 9 ++ .../xpack/ml/job/retention/MlDataRemover.java | 4 +- .../ml/job/retention/UnusedStateRemover.java | 18 ++- ...TransportDeleteExpiredDataActionTests.java | 104 ++++++++++++++++++ .../AbstractExpiredJobDataRemoverTests.java | 44 ++++++-- .../ExpiredModelSnapshotsRemoverTests.java | 44 ++++++-- .../retention/ExpiredResultsRemoverTests.java | 25 ++++- 14 files changed, 316 insertions(+), 50 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java diff --git 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java index 5810b946e96ed..eb2d3e94c9084 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java @@ -65,6 +65,10 @@ public Response(StreamInput in) throws IOException { deleted = in.readBoolean(); } + public boolean isDeleted() { + return deleted; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(deleted); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index dd95d0a5e9b30..f8cc3fd35a644 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -116,7 +116,13 @@ private void triggerTasks() { LOGGER.info("triggering scheduled [ML] maintenance tasks"); executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), ActionListener.wrap( - response -> LOGGER.info("Successfully completed [ML] maintenance tasks"), + response -> { + if (response.isDeleted()) { + LOGGER.info("Successfully completed [ML] maintenance tasks"); + } else { + LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great"); + } + }, e -> LOGGER.error("An error occurred during maintenance tasks execution", e))); scheduleNext(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index c33dd591a9120..c799d57d74f6b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -26,34 +26,54 @@ import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.function.Supplier; public class TransportDeleteExpiredDataAction extends HandledTransportAction { + // TODO: make configurable in the request + static final Duration MAX_DURATION = Duration.ofHours(8); + private final ThreadPool threadPool; + private final String executor; private final Client client; private final ClusterService clusterService; + private final Clock clock; @Inject public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, Client client, ClusterService clusterService) { - super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new); + this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService, + Clock.systemUTC()); + } + + TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService, + ActionFilters actionFilters, Client client, ClusterService clusterService, Clock clock) { + super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor); this.threadPool = threadPool; + this.executor = executor; this.client = ClientHelper.clientWithOrigin(client, ClientHelper.ML_ORIGIN); this.clusterService = 
clusterService; + this.clock = clock; } @Override protected void doExecute(Task task, DeleteExpiredDataAction.Request request, ActionListener listener) { logger.info("Deleting expired data"); - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener)); + Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION); + Supplier isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier)); } - private void deleteExpiredData(ActionListener listener) { + private void deleteExpiredData(ActionListener listener, + Supplier isTimedOutSupplier) { AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( new ExpiredResultsRemover(client, auditor), @@ -62,25 +82,32 @@ private void deleteExpiredData(ActionListener new UnusedStateRemover(client, clusterService) ); Iterator dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); - deleteExpiredData(dataRemoversIterator, listener); + deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true); } - private void deleteExpiredData(Iterator mlDataRemoversIterator, - ActionListener listener) { - if (mlDataRemoversIterator.hasNext()) { + void deleteExpiredData(Iterator mlDataRemoversIterator, + ActionListener listener, + Supplier isTimedOutSupplier, + boolean haveAllPreviousDeletionsCompleted) { + if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) { MlDataRemover remover = mlDataRemoversIterator.next(); ActionListener nextListener = ActionListener.wrap( - booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), listener::onFailure); + booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse), + listener::onFailure); // Removing expired ML data and artifacts requires 
multiple operations. // These are queued up and executed sequentially in the action listener, // the chained calls must all run the ML utility thread pool NOT the thread // the previous action returned in which in the case of a transport_client_boss // thread is a disaster. - remover.remove(new ThreadedActionListener<>(logger, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, nextListener, - false)); + remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false), + isTimedOutSupplier); } else { - logger.info("Completed deletion of expired data"); - listener.onResponse(new DeleteExpiredDataAction.Response(true)); + if (haveAllPreviousDeletionsCompleted) { + logger.info("Completed deletion of expired ML data"); + } else { + logger.info("Halted deletion of expired ML data until next invocation"); + } + listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted)); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 07c75f75f7e93..b1439849f98ce 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -23,6 +23,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -79,6 +80,9 @@ public void deleteModelSnapshots(List modelSnapshots, ActionListe .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(new 
IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0]))); + // _doc is the most efficient sort order and will also disable scoring + deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + try { executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener); } catch (Exception e) { @@ -101,6 +105,10 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener li .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs)); deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen()); deleteByQueryHolder.dbqRequest.setQuery(query); + + // _doc is the most efficient sort order and will also disable scoring + deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest, ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); } @@ -116,6 +124,9 @@ public void deleteInterimResults() { QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true); deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb)); + // _doc is the most efficient sort order and will also disable scoring + deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get(); } catch (Exception e) { @@ -134,6 +145,9 @@ public void deleteDatafeedTimingStats(ActionListener liste .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(new IdsQueryBuilder().addIds(DatafeedTimingStats.documentId(jobId))); + // _doc is the most efficient sort order and will also disable scoring + 
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index be50114fc46e0..2650f3018d951 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -40,11 +41,12 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { } @Override - public void remove(ActionListener listener) { - removeData(newJobIterator(), listener); + public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + removeData(newJobIterator(), listener, isTimedOutSupplier); } - private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener listener) { + private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener listener, + Supplier isTimedOutSupplier) { if (jobIterator.hasNext() == false) { listener.onResponse(true); return; @@ -56,13 +58,19 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener removeData(jobIterator, listener), listener::onFailure)); + removeDataBefore(job, cutoffEpochMs, + ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)); } private WrappedBatchedJobsIterator newJobIterator() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 3556a3bc9a961..a80b00aaa0792 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -32,6 +32,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -44,6 +45,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.Supplier; /** * Removes up to {@link #MAX_FORECASTS} forecasts (stats + forecasts docs) that have expired. 
@@ -71,10 +73,10 @@ public ExpiredForecastsRemover(Client client, ThreadPool threadPool) { } @Override - public void remove(ActionListener listener) { + public void remove(ActionListener listener, Supplier isTimedOutSupplier) { LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs); ActionListener forecastStatsHandler = ActionListener.wrap( - searchResponse -> deleteForecasts(searchResponse, listener), + searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier), e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e))); SearchSourceBuilder source = new SearchSourceBuilder(); @@ -84,13 +86,16 @@ public void remove(ActionListener listener) { source.size(MAX_FORECASTS); source.trackTotalHits(true); + // _doc is the most efficient sort order and will also disable scoring + source.sort(ElasticsearchMappings.ES_DOC); + SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN); searchRequest.source(source); client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false)); } - private void deleteForecasts(SearchResponse searchResponse, ActionListener listener) { + private void deleteForecasts(SearchResponse searchResponse, ActionListener listener, Supplier isTimedOutSupplier) { List forecastsToDelete; try { forecastsToDelete = findForecastsToDelete(searchResponse); @@ -99,6 +104,11 @@ private void deleteForecasts(SearchResponse searchResponse, ActionListener() { @Override @@ -157,6 +167,10 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); request.setQuery(query); + + // _doc is the most efficient sort order and will also disable scoring + request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + return request; } } diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 74ff0c8dc8776..1153407d5125e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; @@ -88,7 +89,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 3cdcdff2ed9c6..6a17382db0e8c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import 
org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; @@ -88,6 +89,11 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); + // Delete the documents gradually. + // With DEFAULT_SCROLL_SIZE = 1000 this implies we spread deletion of 1 million documents over 5000 seconds ~= 83 minutes. + request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE); + request.setRequestsPerSecond(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5); + request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE); @@ -95,6 +101,9 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName())) .mustNot(excludeFilter); request.setQuery(query); + + // _doc is the most efficient sort order and will also disable scoring + request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); return request; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 03238cdfc8815..485d8e9bfa22d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -7,6 +7,8 @@ import org.elasticsearch.action.ActionListener; +import java.util.function.Supplier; + public interface MlDataRemover { - void 
remove(ActionListener listener); + void remove(ActionListener listener, Supplier isTimedOutSupplier); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index 4d2c9b76438ed..c603502afd5d6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; @@ -32,6 +33,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; /** * If for any reason a job is deleted by some of its state documents @@ -51,13 +53,17 @@ public UnusedStateRemover(Client client, ClusterService clusterService) { } @Override - public void remove(ActionListener listener) { + public void remove(ActionListener listener, Supplier isTimedOutSupplier) { try { List unusedStateDocIds = findUnusedStateDocIds(); - if (unusedStateDocIds.size() > 0) { - executeDeleteUnusedStateDocs(unusedStateDocIds, listener); + if (isTimedOutSupplier.get()) { + listener.onResponse(false); } else { - listener.onResponse(true); + if (unusedStateDocIds.size() > 0) { + executeDeleteUnusedStateDocs(unusedStateDocIds, listener); + } else { + listener.onResponse(true); + } } } catch (Exception e) { listener.onFailure(e); @@ -106,6 +112,10 @@ 
private void executeDeleteUnusedStateDocs(List unusedDocIds, ActionListe DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0]))); + + // _doc is the most efficient sort order and will also disable scoring + deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( response -> { if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java new file mode 100644 index 0000000000000..fab6c8ec67ef7 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; +import org.elasticsearch.xpack.ml.job.retention.MlDataRemover; +import org.junit.After; +import org.junit.Before; + +import java.time.Clock; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportDeleteExpiredDataActionTests extends ESTestCase { + + private ThreadPool threadPool; + private TransportDeleteExpiredDataAction transportDeleteExpiredDataAction; + + /** + * A data remover that only checks for timeouts. 
+ */ + private static class DummyDataRemover implements MlDataRemover { + + public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + listener.onResponse(isTimedOutSupplier.get() == false); + } + } + + @Before + public void setup() { + threadPool = new TestThreadPool("TransportDeleteExpiredDataActionTests thread pool"); + TransportService transportService = mock(TransportService.class); + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + when(client.threadPool()).thenReturn(threadPool); + ClusterService clusterService = mock(ClusterService.class); + transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction(threadPool, ThreadPool.Names.SAME, transportService, + new ActionFilters(Collections.emptySet()), client, clusterService, Clock.systemUTC()); + } + + @After + public void teardown() { + threadPool.shutdown(); + } + + public void testDeleteExpiredDataIterationNoTimeout() { + + final int numRemovers = randomIntBetween(2, 5); + + List removers = Stream.generate(DummyDataRemover::new).limit(numRemovers).collect(Collectors.toList()); + + AtomicBoolean succeeded = new AtomicBoolean(); + ActionListener finalListener = ActionListener.wrap( + response -> succeeded.set(response.isDeleted()), + e -> fail(e.getMessage()) + ); + + Supplier isTimedOutSupplier = () -> false; + + transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); + + assertTrue(succeeded.get()); + } + + public void testDeleteExpiredDataIterationWithTimeout() { + + final int numRemovers = randomIntBetween(2, 5); + AtomicInteger removersRemaining = new AtomicInteger(randomIntBetween(0, numRemovers - 1)); + + List removers = Stream.generate(DummyDataRemover::new).limit(numRemovers).collect(Collectors.toList()); + + AtomicBoolean succeeded = new AtomicBoolean(); + ActionListener finalListener = ActionListener.wrap( + response -> succeeded.set(response.isDeleted()), + e -> 
fail(e.getMessage()) + ); + + Supplier isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0); + + transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); + + assertFalse(succeeded.get()); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 53242f07677a8..c5a24fc9e0609 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -86,25 +86,24 @@ private static SearchResponse createSearchResponse(List to return searchResponse; } - @SuppressWarnings("unchecked") public void testRemoveGivenNoJobs() throws IOException { SearchResponse response = createSearchResponse(Collections.emptyList()); + @SuppressWarnings("unchecked") ActionFuture future = mock(ActionFuture.class); when(future.actionGet()).thenReturn(response); when(client.search(any())).thenReturn(future); TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client); - remover.remove(listener); + remover.remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - assertEquals(remover.getRetentionDaysCallCount, 0); + assertEquals(0, remover.getRetentionDaysCallCount); } - @SuppressWarnings("unchecked") - public void testRemoveGivenMulipleBatches() throws IOException { + public void testRemoveGivenMultipleBatches() throws IOException { // This is testing AbstractExpiredJobDataRemover.WrappedBatchedJobsIterator int totalHits = 7; List responses = new ArrayList<>(); @@ -127,18 +126,45 @@ public void testRemoveGivenMulipleBatches() throws IOException { 
AtomicInteger searchCount = new AtomicInteger(0); + @SuppressWarnings("unchecked") ActionFuture future = mock(ActionFuture.class); doAnswer(invocationOnMock -> responses.get(searchCount.getAndIncrement())).when(future).actionGet(); when(client.search(any())).thenReturn(future); TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client); - remover.remove(listener); + remover.remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - assertEquals(searchCount.get(), 3); - assertEquals(remover.getRetentionDaysCallCount, 7); + assertEquals(3, searchCount.get()); + assertEquals(7, remover.getRetentionDaysCallCount); + } + + public void testRemoveGivenTimeOut() throws IOException { + + int totalHits = 3; + SearchResponse response = createSearchResponse(Arrays.asList( + JobTests.buildJobBuilder("job1").build(), + JobTests.buildJobBuilder("job2").build(), + JobTests.buildJobBuilder("job3").build() + ), totalHits); + + final int timeoutAfter = randomIntBetween(0, totalHits - 1); + AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); + + @SuppressWarnings("unchecked") + ActionFuture future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + when(client.search(any())).thenReturn(future); + + TestListener listener = new TestListener(); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client); + remover.remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + + listener.waitToCompletion(); + assertThat(listener.success, is(false)); + assertEquals(timeoutAfter, remover.getRetentionDaysCallCount); } static class TestListener implements ActionListener { @@ -157,7 +183,7 @@ public void onFailure(Exception e) { latch.countDown(); } - public void waitToCompletion() { + void waitToCompletion() { try { latch.await(3, TimeUnit.SECONDS); } catch (InterruptedException e) { diff --git 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 977828ac21075..56c2333cae016 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -31,7 +31,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener; import static org.hamcrest.Matchers.equalTo; @@ -69,7 +71,7 @@ public void setUpTests() { } @After - public void shutdownThreadPool() throws InterruptedException { + public void shutdownThreadPool() { terminate(threadPool); } @@ -80,7 +82,7 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { JobTests.buildJobBuilder("bar").build() )); - createExpiredModelSnapshotsRemover().remove(listener); + createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -90,9 +92,9 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { givenClientRequestsSucceed(); - givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); + givenJobs(Collections.singletonList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); - createExpiredModelSnapshotsRemover().remove(listener); + createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -110,11 
+112,11 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(listener); + createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -137,6 +139,28 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); } + public void testRemove_GivenTimeout() throws IOException { + givenClientRequestsSucceed(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), + JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() + )); + + List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), + createModelSnapshot("snapshots-1", "snapshots-1_2")); + List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + + final int timeoutAfter = randomIntBetween(0, 1); + AtomicInteger attemptsLeft = new 
AtomicInteger(timeoutAfter); + + createExpiredModelSnapshotsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + + listener.waitToCompletion(); + assertThat(listener.success, is(false)); + } + public void testRemove_GivenClientSearchRequestsFail() throws IOException { givenClientSearchRequestsFail(); givenJobs(Arrays.asList( @@ -147,11 +171,11 @@ public void testRemove_GivenClientSearchRequestsFail() throws IOException { List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(listener); + createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); @@ -173,11 +197,11 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(listener); + 
createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 9c34c44856175..f5acae02b4f87 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; @@ -70,7 +71,7 @@ public void testRemove_GivenNoJobs() throws IOException { givenClientRequestsSucceed(); givenJobs(Collections.emptyList()); - createExpiredResultsRemover().remove(listener); + createExpiredResultsRemover().remove(listener, () -> false); verify(listener).onResponse(true); verify(client).search(any()); @@ -84,7 +85,7 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { JobTests.buildJobBuilder("bar").build() )); - createExpiredResultsRemover().remove(listener); + createExpiredResultsRemover().remove(listener, () -> false); verify(listener).onResponse(true); verify(client).search(any()); @@ -99,7 +100,7 @@ public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().remove(listener); + createExpiredResultsRemover().remove(listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); @@ -109,6 +110,22 @@ public void 
testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception verify(listener).onResponse(true); } + public void testRemove_GivenTimeout() throws Exception { + givenClientRequestsSucceed(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() + )); + + final int timeoutAfter = randomIntBetween(0, 1); + AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); + + createExpiredResultsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + + assertThat(capturedDeleteByQueryRequests.size(), equalTo(timeoutAfter)); + verify(listener).onResponse(false); + } + public void testRemove_GivenClientRequestsFailed() throws IOException { givenClientRequestsFailed(); givenJobs(Arrays.asList( @@ -117,7 +134,7 @@ public void testRemove_GivenClientRequestsFailed() throws IOException { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().remove(listener); + createExpiredResultsRemover().remove(listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(1)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);