From b576a6aa7786beee1b4ce39a97c210a520ffb0c4 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 2 Oct 2019 08:58:56 +0100 Subject: [PATCH] [ML] Throttle the delete-by-query of expired results (#47177) Due to #47003 many clusters will have built up a large backlog of expired results. On upgrading to a version where that bug is fixed users could find that the first ML daily maintenance task deletes a very large number of documents. This change introduces throttling to the delete-by-query that the ML daily maintenance uses to delete expired results to limit it to deleting an average of 200 documents per second. (There is no throttling for state/forecast documents as these are expected to be lower volume.) Additionally a rough time limit of 8 hours is applied to the whole delete expired data action. (This is only rough as it won't stop part way through a single operation - it only checks the timeout between operations.) Relates #47103 --- .../ml/action/DeleteExpiredDataAction.java | 4 + .../xpack/ml/MlDailyMaintenanceService.java | 8 +- .../TransportDeleteExpiredDataAction.java | 52 +++++++-- .../ml/job/persistence/JobDataDeleter.java | 11 ++ .../AbstractExpiredJobDataRemover.java | 18 ++- .../retention/ExpiredForecastsRemover.java | 20 +++- .../ExpiredModelSnapshotsRemover.java | 3 +- .../job/retention/ExpiredResultsRemover.java | 10 ++ .../xpack/ml/job/retention/MlDataRemover.java | 4 +- .../ml/job/retention/UnusedStateRemover.java | 17 ++- ...TransportDeleteExpiredDataActionTests.java | 107 ++++++++++++++++++ .../AbstractExpiredJobDataRemoverTests.java | 48 ++++++-- .../ExpiredModelSnapshotsRemoverTests.java | 35 +++++- .../retention/ExpiredResultsRemoverTests.java | 25 +++- 14 files changed, 316 insertions(+), 46 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java index aeacf68fe18c7..d57b3cc3615f8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java @@ -75,6 +75,10 @@ public void readFrom(StreamInput in) throws IOException { deleted = in.readBoolean(); } + public boolean isDeleted() { + return deleted; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index d58ebe5bf9023..bf7f3b70a0c34 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -111,7 +111,13 @@ private void triggerTasks() { LOGGER.info("triggering scheduled [ML] maintenance tasks"); executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), ActionListener.wrap( - response -> LOGGER.info("Successfully completed [ML] maintenance tasks"), + response -> { + if (response.isDeleted()) { + LOGGER.info("Successfully completed [ML] maintenance tasks"); + } else { + LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great"); + } + }, e -> LOGGER.error("An error occurred during maintenance tasks execution", e))); scheduleNext(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index dcf5960a2f0bc..20ed5e05a0087 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -27,33 +27,54 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.function.Supplier; public class TransportDeleteExpiredDataAction extends HandledTransportAction { + // TODO: make configurable in the request + static final Duration MAX_DURATION = Duration.ofHours(8); + + private final String executor; private final Client client; private final ClusterService clusterService; + private final Clock clock; @Inject public TransportDeleteExpiredDataAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client, ClusterService clusterService) { + this(settings, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, indexNameExpressionResolver, + client, clusterService, Clock.systemUTC()); + } + + TransportDeleteExpiredDataAction(Settings settings, ThreadPool threadPool, String executor, TransportService transportService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client, + ClusterService clusterService, Clock clock) { super(settings, DeleteExpiredDataAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, - DeleteExpiredDataAction.Request::new); + DeleteExpiredDataAction.Request::new, executor); + this.executor = executor; this.client = ClientHelper.clientWithOrigin(client, ClientHelper.ML_ORIGIN); this.clusterService = clusterService; + this.clock = clock; } @Override protected void 
doExecute(DeleteExpiredDataAction.Request request, ActionListener listener) { logger.info("Deleting expired data"); - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener)); + Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION); + Supplier isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier)); } - private void deleteExpiredData(ActionListener listener) { + private void deleteExpiredData(ActionListener listener, + Supplier isTimedOutSupplier) { Auditor auditor = new Auditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( new ExpiredResultsRemover(client, clusterService, auditor), @@ -62,25 +83,32 @@ private void deleteExpiredData(ActionListener new UnusedStateRemover(client, clusterService) ); Iterator dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); - deleteExpiredData(dataRemoversIterator, listener); + deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true); } - private void deleteExpiredData(Iterator mlDataRemoversIterator, - ActionListener listener) { - if (mlDataRemoversIterator.hasNext()) { + void deleteExpiredData(Iterator mlDataRemoversIterator, + ActionListener listener, + Supplier isTimedOutSupplier, + boolean haveAllPreviousDeletionsCompleted) { + if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) { MlDataRemover remover = mlDataRemoversIterator.next(); ActionListener nextListener = ActionListener.wrap( - booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), listener::onFailure); + booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse), + listener::onFailure); // Removing expired ML data and artifacts requires multiple operations. 
// These are queued up and executed sequentially in the action listener, // the chained calls must all run the ML utility thread pool NOT the thread // the previous action returned in which in the case of a transport_client_boss // thread is a disaster. - remover.remove(new ThreadedActionListener<>(logger, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, nextListener, - false)); + remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false), + isTimedOutSupplier); } else { - logger.info("Completed deletion of expired data"); - listener.onResponse(new DeleteExpiredDataAction.Response(true)); + if (haveAllPreviousDeletionsCompleted) { + logger.info("Completed deletion of expired ML data"); + } else { + logger.info("Halted deletion of expired ML data until next invocation"); + } + listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted)); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 333d9447ba698..6d5e31bf04a32 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -21,6 +21,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -79,6 +80,9 @@ public void deleteModelSnapshots(List modelSnapshots, ActionListe .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(new 
IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0]))); + // _doc is the most efficient sort order and will also disable scoring + deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + try { executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener); } catch (Exception e) { @@ -101,6 +105,10 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener li .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs)); deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen()); deleteByQueryHolder.dbqRequest.setQuery(query); + + // _doc is the most efficient sort order and will also disable scoring + deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest, ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); } @@ -116,6 +124,9 @@ public void deleteInterimResults() { QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true); deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb)); + // _doc is the most efficient sort order and will also disable scoring + deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get(); } catch (Exception e) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index eda8f9a6a95a5..84c86b136cf9c 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -50,11 +51,12 @@ protected Client getClient() { } @Override - public void remove(ActionListener listener) { - removeData(newJobIterator(), listener); + public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + removeData(newJobIterator(), listener, isTimedOutSupplier); } - private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener listener) { + private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener listener, + Supplier isTimedOutSupplier) { if (jobIterator.hasNext() == false) { listener.onResponse(true); return; @@ -66,13 +68,19 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener removeData(jobIterator, listener), listener::onFailure)); + removeDataBefore(job, cutoffEpochMs, + ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure)); } private WrappedBatchedJobsIterator newJobIterator() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 981d257afa1a0..bce61cb4c23eb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -31,6 +31,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.job.config.Job; import 
org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -43,6 +44,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.Supplier; /** * Removes up to {@link #MAX_FORECASTS} forecasts (stats + forecasts docs) that have expired. @@ -70,10 +72,10 @@ public ExpiredForecastsRemover(Client client, ThreadPool threadPool) { } @Override - public void remove(ActionListener listener) { + public void remove(ActionListener listener, Supplier isTimedOutSupplier) { LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs); ActionListener forecastStatsHandler = ActionListener.wrap( - searchResponse -> deleteForecasts(searchResponse, listener), + searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier), e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e))); SearchSourceBuilder source = new SearchSourceBuilder(); @@ -82,13 +84,16 @@ public void remove(ActionListener listener) { .filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName()))); source.size(MAX_FORECASTS); + // _doc is the most efficient sort order and will also disable scoring + source.sort(ElasticsearchMappings.ES_DOC); + SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN); searchRequest.source(source); client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false)); } - private void deleteForecasts(SearchResponse searchResponse, ActionListener listener) { + private void deleteForecasts(SearchResponse searchResponse, ActionListener 
listener, Supplier isTimedOutSupplier) { List forecastsToDelete; try { forecastsToDelete = findForecastsToDelete(searchResponse); @@ -97,6 +102,11 @@ private void deleteForecasts(SearchResponse searchResponse, ActionListener() { @Override @@ -155,6 +165,10 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); request.setQuery(query); + + // _doc is the most efficient sort order and will also disable scoring + request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + return request; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 0114bd322f95c..7f46f63516d73 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; @@ -87,7 +88,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 64cf7550ee362..afe9aa9fd736b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; @@ -87,6 +88,12 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(5); + // Delete the documents gradually. + // With batch size 1000 and 200 requests per second this implies we spread + // deletion of 1 million documents over 5000 seconds ~= 83 minutes. 
+ request.setBatchSize(1000); + request.setRequestsPerSecond(200); + request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE); @@ -94,6 +101,9 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName())) .mustNot(excludeFilter); request.setQuery(query); + + // _doc is the most efficient sort order and will also disable scoring + request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); return request; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 03238cdfc8815..485d8e9bfa22d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -7,6 +7,8 @@ import org.elasticsearch.action.ActionListener; +import java.util.function.Supplier; + public interface MlDataRemover { - void remove(ActionListener listener); + void remove(ActionListener listener, Supplier isTimedOutSupplier); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index 249d3761b5842..f8c630e271e56 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -33,6 +33,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; 
/** * If for any reason a job is deleted by some of its state documents @@ -52,13 +53,17 @@ public UnusedStateRemover(Client client, ClusterService clusterService) { } @Override - public void remove(ActionListener listener) { + public void remove(ActionListener listener, Supplier isTimedOutSupplier) { try { List unusedStateDocIds = findUnusedStateDocIds(); - if (unusedStateDocIds.size() > 0) { - executeDeleteUnusedStateDocs(unusedStateDocIds, listener); + if (isTimedOutSupplier.get()) { + listener.onResponse(false); } else { - listener.onResponse(true); + if (unusedStateDocIds.size() > 0) { + executeDeleteUnusedStateDocs(unusedStateDocIds, listener); + } else { + listener.onResponse(true); + } } } catch (Exception e) { listener.onFailure(e); @@ -108,6 +113,10 @@ private void executeDeleteUnusedStateDocs(List unusedDocIds, ActionListe .types(ElasticsearchMappings.DOC_TYPE) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0]))); + + // _doc is the most efficient sort order and will also disable scoring + deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( response -> { if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java new file mode 100644 index 0000000000000..3b267e8a32aff --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; +import org.elasticsearch.xpack.ml.job.retention.MlDataRemover; +import org.junit.After; +import org.junit.Before; + +import java.time.Clock; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportDeleteExpiredDataActionTests extends ESTestCase { + + private ThreadPool threadPool; + private TransportDeleteExpiredDataAction transportDeleteExpiredDataAction; + + /** + * A data remover that only checks for timeouts. 
+ */ + private static class DummyDataRemover implements MlDataRemover { + + public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + listener.onResponse(isTimedOutSupplier.get() == false); + } + } + + @Before + public void setup() { + threadPool = new TestThreadPool("TransportDeleteExpiredDataActionTests thread pool"); + TransportService transportService = mock(TransportService.class); + IndexNameExpressionResolver indexNameExpressionResolver = mock(IndexNameExpressionResolver.class); + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + when(client.threadPool()).thenReturn(threadPool); + ClusterService clusterService = mock(ClusterService.class); + transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction(Settings.EMPTY, threadPool, ThreadPool.Names.SAME, + transportService, new ActionFilters(Collections.emptySet()), indexNameExpressionResolver, client, clusterService, + Clock.systemUTC()); + } + + @After + public void teardown() { + threadPool.shutdown(); + } + + public void testDeleteExpiredDataIterationNoTimeout() { + + final int numRemovers = randomIntBetween(2, 5); + + List removers = Stream.generate(DummyDataRemover::new).limit(numRemovers).collect(Collectors.toList()); + + AtomicBoolean succeeded = new AtomicBoolean(); + ActionListener finalListener = ActionListener.wrap( + response -> succeeded.set(response.isDeleted()), + e -> fail(e.getMessage()) + ); + + Supplier isTimedOutSupplier = () -> false; + + transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); + + assertTrue(succeeded.get()); + } + + public void testDeleteExpiredDataIterationWithTimeout() { + + final int numRemovers = randomIntBetween(2, 5); + AtomicInteger removersRemaining = new AtomicInteger(randomIntBetween(0, numRemovers - 1)); + + List removers = Stream.generate(DummyDataRemover::new).limit(numRemovers).collect(Collectors.toList()); + + AtomicBoolean 
succeeded = new AtomicBoolean(); + ActionListener finalListener = ActionListener.wrap( + response -> succeeded.set(response.isDeleted()), + e -> fail(e.getMessage()) + ); + + Supplier isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0); + + transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); + + assertFalse(succeeded.get()); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 7eb05916b07c3..5256cb9cc0e35 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -95,6 +95,7 @@ private static SearchResponse createSearchResponse(List to public void testRemoveGivenNoJobs() throws IOException { SearchResponse response = createSearchResponse(Collections.emptyList()); + @SuppressWarnings("unchecked") ActionFuture future = mock(ActionFuture.class); when(future.actionGet()).thenReturn(response); when(client.search(any())).thenReturn(future); @@ -104,15 +105,14 @@ public void testRemoveGivenNoJobs() throws IOException { TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client, clusterService); - remover.remove(listener); + remover.remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - assertEquals(remover.getRetentionDaysCallCount, 0); + assertEquals(0, remover.getRetentionDaysCallCount); } - - public void testRemoveGivenMulipleBatches() throws IOException { + public void testRemoveGivenMultipleBatches() throws IOException { ClusterState clusterState = ClusterState.builder(new 
ClusterName("_name")).build(); when(clusterService.state()).thenReturn(clusterState); @@ -139,18 +139,48 @@ public void testRemoveGivenMulipleBatches() throws IOException { AtomicInteger searchCount = new AtomicInteger(0); + @SuppressWarnings("unchecked") ActionFuture future = mock(ActionFuture.class); doAnswer(invocationOnMock -> responses.get(searchCount.getAndIncrement())).when(future).actionGet(); when(client.search(any())).thenReturn(future); TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client, clusterService); - remover.remove(listener); + remover.remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); - assertEquals(searchCount.get(), 3); - assertEquals(remover.getRetentionDaysCallCount, 7); + assertEquals(3, searchCount.get()); + assertEquals(7, remover.getRetentionDaysCallCount); + } + + public void testRemoveGivenTimeOut() throws IOException { + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + when(clusterService.state()).thenReturn(clusterState); + + int totalHits = 3; + SearchResponse response = createSearchResponse(Arrays.asList( + JobTests.buildJobBuilder("job1").build(), + JobTests.buildJobBuilder("job2").build(), + JobTests.buildJobBuilder("job3").build() + ), totalHits); + + final int timeoutAfter = randomIntBetween(0, totalHits - 1); + AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); + + @SuppressWarnings("unchecked") + ActionFuture future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + when(client.search(any())).thenReturn(future); + + TestListener listener = new TestListener(); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client, clusterService); + remover.remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + + listener.waitToCompletion(); + assertThat(listener.success, is(false)); + 
assertEquals(timeoutAfter, remover.getRetentionDaysCallCount); } public void testIterateOverClusterStateJobs() throws IOException { @@ -173,7 +203,7 @@ public void testIterateOverClusterStateJobs() throws IOException { TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client, clusterService); - remover.remove(listener); + remover.remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -196,7 +226,7 @@ public void onFailure(Exception e) { latch.countDown(); } - public void waitToCompletion() { + void waitToCompletion() { try { latch.await(3, TimeUnit.SECONDS); } catch (InterruptedException e) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index e5a4b1d14da69..42e97efbe3f13 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -36,6 +36,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener; import static org.hamcrest.Matchers.equalTo; @@ -78,7 +79,7 @@ public void setUpTests() { } @After - public void shutdownThreadPool() throws InterruptedException { + public void shutdownThreadPool() { terminate(threadPool); } @@ -89,7 +90,7 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { JobTests.buildJobBuilder("bar").build() )); - createExpiredModelSnapshotsRemover().remove(listener); + createExpiredModelSnapshotsRemover().remove(listener, () -> false); 
listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -101,7 +102,7 @@ public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { givenClientRequestsSucceed(); givenJobs(Collections.singletonList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); - createExpiredModelSnapshotsRemover().remove(listener); + createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -123,7 +124,7 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(listener); + createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -146,6 +147,28 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); } + public void testRemove_GivenTimeout() throws IOException { + givenClientRequestsSucceed(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(), + JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() + )); + + List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), + createModelSnapshot("snapshots-1", "snapshots-1_2")); + List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + 
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + + final int timeoutAfter = randomIntBetween(0, 1); + AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); + + createExpiredModelSnapshotsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + + listener.waitToCompletion(); + assertThat(listener.success, is(false)); + } + public void testRemove_GivenClientSearchRequestsFail() throws IOException { givenClientSearchRequestsFail(); givenJobs(Arrays.asList( @@ -160,7 +183,7 @@ public void testRemove_GivenClientSearchRequestsFail() throws IOException { searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(listener); + createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); @@ -186,7 +209,7 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(listener); + createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 5caac27368712..068a2393bb064 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; @@ -79,7 +80,7 @@ public void testRemove_GivenNoJobs() throws IOException { givenClientRequestsSucceed(); givenJobs(Collections.emptyList()); - createExpiredResultsRemover().remove(listener); + createExpiredResultsRemover().remove(listener, () -> false); verify(listener).onResponse(true); verify(client).search(any()); @@ -93,7 +94,7 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { JobTests.buildJobBuilder("bar").build() )); - createExpiredResultsRemover().remove(listener); + createExpiredResultsRemover().remove(listener, () -> false); verify(listener).onResponse(true); verify(client).search(any()); @@ -108,7 +109,7 @@ public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().remove(listener); + createExpiredResultsRemover().remove(listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); @@ -118,6 +119,22 @@ public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception verify(listener).onResponse(true); } + public void testRemove_GivenTimeout() throws Exception { + givenClientRequestsSucceed(); + givenJobs(Arrays.asList( + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() + )); + + final int timeoutAfter = randomIntBetween(0, 1); + AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); + + createExpiredResultsRemover().remove(listener, () -> 
(attemptsLeft.getAndDecrement() <= 0)); + + assertThat(capturedDeleteByQueryRequests.size(), equalTo(timeoutAfter)); + verify(listener).onResponse(false); + } + public void testRemove_GivenClientRequestsFailed() throws IOException { givenClientRequestsFailed(); givenJobs(Arrays.asList( @@ -126,7 +143,7 @@ public void testRemove_GivenClientRequestsFailed() throws IOException { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().remove(listener); + createExpiredResultsRemover().remove(listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(1)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);