[ML] Throttle the delete-by-query of expired results (#47177)
Due to #47003, many clusters will have built up a
large backlog of expired results. On upgrading to
a version where that bug is fixed, users could find
that the first ML daily maintenance task deletes
a very large number of documents.

This change introduces throttling to the
delete-by-query that the ML daily maintenance uses
to delete expired results, limiting it to an
average of 200 documents per second. (There is no
throttling for state/forecast documents, as these
are expected to be lower volume.)
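
To put a number on the throttle: delete-by-query rate limiting counts each deleted document against requests_per_second, so a batch size of 1000 combined with a rate of 1000 / 5 caps deletion at an average of 200 documents per second. A minimal sketch of the same configuration, using a hypothetical index name and query (the actual change applies this inside ExpiredResultsRemover):

import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

public class ThrottledDeleteSketch {
    public static DeleteByQueryRequest throttledDelete() {
        // Hypothetical index and query, for illustration only
        DeleteByQueryRequest request = new DeleteByQueryRequest("some-results-index");
        request.setQuery(QueryBuilders.rangeQuery("timestamp").lt("now-30d"));
        // Batches of 1000 documents, throttled to 1000 / 5 = 200 docs/sec;
        // at that rate 1 million documents take ~5000 seconds to delete
        request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE);
        request.setRequestsPerSecond(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5);
        return request;
    }
}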

Additionally, a rough time limit of 8 hours is
applied to the whole delete expired data action.
(This is only rough, as it won't stop partway
through a single operation; it only checks the
timeout between operations.)
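
The time limit works by capturing a deadline when the action starts and threading a check through the removers as a Supplier<Boolean> that is polled between operations. The essence of the mechanism, condensed from the TransportDeleteExpiredDataAction change below:

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

public class TimeoutSketch {
    static final Duration MAX_DURATION = Duration.ofHours(8);

    static Supplier<Boolean> newIsTimedOutSupplier(Clock clock) {
        // The deadline is fixed once per invocation; every poll compares
        // the current time against it, so the check is cheap and lock-free
        Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION);
        return () -> Instant.now(clock).isAfter(timeoutTime);
    }
}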

Relates #47103
droberts195 committed Oct 2, 2019
1 parent 42c5054 commit 4379a3c
Showing 14 changed files with 316 additions and 50 deletions.
@@ -65,6 +65,10 @@ public Response(StreamInput in) throws IOException {
deleted = in.readBoolean();
}

public boolean isDeleted() {
return deleted;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(deleted);
@@ -116,7 +116,13 @@ private void triggerTasks() {
LOGGER.info("triggering scheduled [ML] maintenance tasks");
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(),
ActionListener.wrap(
response -> LOGGER.info("Successfully completed [ML] maintenance tasks"),
response -> {
if (response.isDeleted()) {
LOGGER.info("Successfully completed [ML] maintenance tasks");
} else {
LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great");
}
},
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
scheduleNext();
}
@@ -26,34 +26,54 @@
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class TransportDeleteExpiredDataAction extends HandledTransportAction<DeleteExpiredDataAction.Request,
DeleteExpiredDataAction.Response> {

// TODO: make configurable in the request
static final Duration MAX_DURATION = Duration.ofHours(8);

private final ThreadPool threadPool;
private final String executor;
private final Client client;
private final ClusterService clusterService;
private final Clock clock;

@Inject
public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new);
this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService,
Clock.systemUTC());
}

TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService, Clock clock) {
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
this.threadPool = threadPool;
this.executor = executor;
this.client = ClientHelper.clientWithOrigin(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.clock = clock;
}

@Override
protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
ActionListener<DeleteExpiredDataAction.Response> listener) {
logger.info("Deleting expired data");
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener));
Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION);
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier));
}

private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, auditor),
@@ -62,25 +82,32 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
new UnusedStateRemover(client, clusterService)
);
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
deleteExpiredData(dataRemoversIterator, listener);
deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true);
}

private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
ActionListener<DeleteExpiredDataAction.Response> listener) {
if (mlDataRemoversIterator.hasNext()) {
void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier,
boolean haveAllPreviousDeletionsCompleted) {
if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) {
MlDataRemover remover = mlDataRemoversIterator.next();
ActionListener<Boolean> nextListener = ActionListener.wrap(
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), listener::onFailure);
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse),
listener::onFailure);
// Removing expired ML data and artifacts requires multiple operations.
// These are queued up and executed sequentially in the action listener.
// The chained calls must all run on the ML utility thread pool, NOT the
// thread the previous action returned on, which in the case of a
// transport_client_boss thread would be a disaster.
remover.remove(new ThreadedActionListener<>(logger, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, nextListener,
false));
remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false),
isTimedOutSupplier);
} else {
logger.info("Completed deletion of expired data");
listener.onResponse(new DeleteExpiredDataAction.Response(true));
if (haveAllPreviousDeletionsCompleted) {
logger.info("Completed deletion of expired ML data");
} else {
logger.info("Halted deletion of expired ML data until next invocation");
}
listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted));
}
}
}
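
The package-private constructor above takes a Clock (and an executor name) so tests can control time instead of waiting out the deadline. A sketch of how a fixed clock interacts with the 8-hour limit, using only java.time (the test setup itself is hypothetical):

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

public class ClockSketch {
    public static void main(String[] args) {
        // A fixed clock never advances, so a deadline derived from it is
        // never exceeded; offsetting the clock past the limit flips the check
        Clock start = Clock.fixed(Instant.parse("2019-10-02T00:00:00Z"), ZoneOffset.UTC);
        Instant deadline = Instant.now(start).plus(Duration.ofHours(8)); // mirrors MAX_DURATION
        Clock late = Clock.offset(start, Duration.ofHours(9));
        System.out.println(Instant.now(late).isAfter(deadline)); // true: timed out
    }
}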
@@ -23,6 +23,7 @@
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Result;

@@ -79,6 +80,9 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0])));

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

try {
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
} catch (Exception e) {
@@ -101,6 +105,10 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.dbqRequest.setQuery(query);

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
}
@@ -116,6 +124,9 @@ public void deleteInterimResults() {
QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true);
deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb));

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get();
} catch (Exception e) {
@@ -134,6 +145,9 @@ public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> liste
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(DatafeedTimingStats.documentId(jobId)));

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
}

@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
@@ -40,11 +41,12 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
}

@Override
public void remove(ActionListener<Boolean> listener) {
removeData(newJobIterator(), listener);
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
removeData(newJobIterator(), listener, isTimedOutSupplier);
}

private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener) {
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier) {
if (jobIterator.hasNext() == false) {
listener.onResponse(true);
return;
@@ -56,13 +58,19 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
return;
}

if (isTimedOutSupplier.get()) {
listener.onResponse(false);
return;
}

Long retentionDays = getRetentionDays(job);
if (retentionDays == null) {
removeData(jobIterator, listener);
removeData(jobIterator, listener, isTimedOutSupplier);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure));
removeDataBefore(job, cutoffEpochMs,
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));
}

private WrappedBatchedJobsIterator newJobIterator() {
@@ -32,6 +32,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result;
Expand All @@ -44,6 +45,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Removes up to {@link #MAX_FORECASTS} forecasts (stats + forecasts docs) that have expired.
@@ -71,10 +73,10 @@ public ExpiredForecastsRemover(Client client, ThreadPool threadPool) {
}

@Override
public void remove(ActionListener<Boolean> listener) {
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs);
ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap(
searchResponse -> deleteForecasts(searchResponse, listener),
searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier),
e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e)));

SearchSourceBuilder source = new SearchSourceBuilder();
@@ -84,13 +86,16 @@ public void remove(ActionListener<Boolean> listener) {
source.size(MAX_FORECASTS);
source.trackTotalHits(true);

// _doc is the most efficient sort order and will also disable scoring
source.sort(ElasticsearchMappings.ES_DOC);

SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN);
searchRequest.source(source);
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false));
}

private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener) {
private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
List<ForecastRequestStats> forecastsToDelete;
try {
forecastsToDelete = findForecastsToDelete(searchResponse);
@@ -99,6 +104,11 @@ private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boole
return;
}

if (isTimedOutSupplier.get()) {
listener.onResponse(false);
return;
}

DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete);
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
@@ -157,6 +167,10 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
request.setQuery(query);

// _doc is the most efficient sort order and will also disable scoring
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

return request;
}
}
@@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
@@ -88,7 +89,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);

searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE));
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));

client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));
@@ -19,6 +19,7 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
@@ -88,13 +89,21 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);

// Delete the documents gradually.
// With DEFAULT_SCROLL_SIZE = 1000, this spreads the deletion of 1 million documents over 5000 seconds ~= 83 minutes.
request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE);
request.setRequestsPerSecond(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5);

request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.mustNot(excludeFilter);
request.setQuery(query);

// _doc is the most efficient sort order and will also disable scoring
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
return request;
}

@@ -7,6 +7,8 @@

import org.elasticsearch.action.ActionListener;

import java.util.function.Supplier;

public interface MlDataRemover {
void remove(ActionListener<Boolean> listener);
void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
}
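
With this signature change every remover receives the supplier, and the contract is to poll it between units of work and answer false on the listener (rather than an error) to signal an early halt. A minimal hypothetical implementation of that contract:

// Hypothetical remover, for illustration of the new interface only
class SketchRemover implements MlDataRemover {
    @Override
    public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
        if (isTimedOutSupplier.get()) {
            listener.onResponse(false); // halted early; the caller stops the chain
            return;
        }
        // ... perform one batch of deletion work, re-checking before the next ...
        listener.onResponse(true); // all work for this remover completed
    }
}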