Skip to content

Commit

Permalink
[ML] Throttle the delete-by-query of expired results (#47177)
Browse files Browse the repository at this point in the history
Due to #47003 many clusters will have built up a
large backlog of expired results. On upgrading to
a version where that bug is fixed users could find
that the first ML daily maintenance task deletes
a very large amount of documents.

This change introduces throttling to the
delete-by-query that the ML daily maintenance uses
to delete expired results to limit it to deleting an
average 200 documents per second. (There is no
throttling for state/forecast documents as these
are expected to be lower volume.)

Additionally a rough time limit of 8 hours is applied
to the whole delete expired data action. (This is only
rough as it won't stop part way through a single
operation - it only checks the timeout between
operations.)

Relates #47103
  • Loading branch information
droberts195 committed Oct 2, 2019
1 parent 6607204 commit b576a6a
Show file tree
Hide file tree
Showing 14 changed files with 316 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public void readFrom(StreamInput in) throws IOException {
deleted = in.readBoolean();
}

public boolean isDeleted() {
return deleted;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,13 @@ private void triggerTasks() {
LOGGER.info("triggering scheduled [ML] maintenance tasks");
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(),
ActionListener.wrap(
response -> LOGGER.info("Successfully completed [ML] maintenance tasks"),
response -> {
if (response.isDeleted()) {
LOGGER.info("Successfully completed [ML] maintenance tasks");
} else {
LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great");
}
},
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
scheduleNext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,54 @@
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class TransportDeleteExpiredDataAction extends HandledTransportAction<DeleteExpiredDataAction.Request,
DeleteExpiredDataAction.Response> {

// TODO: make configurable in the request
static final Duration MAX_DURATION = Duration.ofHours(8);

private final String executor;
private final Client client;
private final ClusterService clusterService;
private final Clock clock;

@Inject
public TransportDeleteExpiredDataAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client, ClusterService clusterService) {
this(settings, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, indexNameExpressionResolver,
client, clusterService, Clock.systemUTC());
}

TransportDeleteExpiredDataAction(Settings settings, ThreadPool threadPool, String executor, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client,
ClusterService clusterService, Clock clock) {
super(settings, DeleteExpiredDataAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
DeleteExpiredDataAction.Request::new);
DeleteExpiredDataAction.Request::new, executor);
this.executor = executor;
this.client = ClientHelper.clientWithOrigin(client, ClientHelper.ML_ORIGIN);
this.clusterService = clusterService;
this.clock = clock;
}

@Override
protected void doExecute(DeleteExpiredDataAction.Request request, ActionListener<DeleteExpiredDataAction.Response> listener) {
logger.info("Deleting expired data");
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener));
Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION);
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier));
}

private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier) {
Auditor auditor = new Auditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
Expand All @@ -62,25 +83,32 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
new UnusedStateRemover(client, clusterService)
);
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
deleteExpiredData(dataRemoversIterator, listener);
deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true);
}

private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
ActionListener<DeleteExpiredDataAction.Response> listener) {
if (mlDataRemoversIterator.hasNext()) {
void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier,
boolean haveAllPreviousDeletionsCompleted) {
if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) {
MlDataRemover remover = mlDataRemoversIterator.next();
ActionListener<Boolean> nextListener = ActionListener.wrap(
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), listener::onFailure);
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse),
listener::onFailure);
// Removing expired ML data and artifacts requires multiple operations.
// These are queued up and executed sequentially in the action listener,
// the chained calls must all run the ML utility thread pool NOT the thread
// the previous action returned in which in the case of a transport_client_boss
// thread is a disaster.
remover.remove(new ThreadedActionListener<>(logger, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, nextListener,
false));
remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false),
isTimedOutSupplier);
} else {
logger.info("Completed deletion of expired data");
listener.onResponse(new DeleteExpiredDataAction.Response(true));
if (haveAllPreviousDeletionsCompleted) {
logger.info("Completed deletion of expired ML data");
} else {
logger.info("Halted deletion of expired ML data until next invocation");
}
listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Result;

Expand Down Expand Up @@ -79,6 +80,9 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0])));

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

try {
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
} catch (Exception e) {
Expand All @@ -101,6 +105,10 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.dbqRequest.setQuery(query);

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
}
Expand All @@ -116,6 +124,9 @@ public void deleteInterimResults() {
QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true);
deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb));

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand All @@ -50,11 +51,12 @@ protected Client getClient() {
}

@Override
public void remove(ActionListener<Boolean> listener) {
removeData(newJobIterator(), listener);
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
removeData(newJobIterator(), listener, isTimedOutSupplier);
}

private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener) {
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier) {
if (jobIterator.hasNext() == false) {
listener.onResponse(true);
return;
Expand All @@ -66,13 +68,19 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
return;
}

if (isTimedOutSupplier.get()) {
listener.onResponse(false);
return;
}

Long retentionDays = getRetentionDays(job);
if (retentionDays == null) {
removeData(jobIterator, listener);
removeData(jobIterator, listener, isTimedOutSupplier);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure));
removeDataBefore(job, cutoffEpochMs,
ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));
}

private WrappedBatchedJobsIterator newJobIterator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result;
Expand All @@ -43,6 +44,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Removes up to {@link #MAX_FORECASTS} forecasts (stats + forecasts docs) that have expired.
Expand Down Expand Up @@ -70,10 +72,10 @@ public ExpiredForecastsRemover(Client client, ThreadPool threadPool) {
}

@Override
public void remove(ActionListener<Boolean> listener) {
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs);
ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap(
searchResponse -> deleteForecasts(searchResponse, listener),
searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier),
e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e)));

SearchSourceBuilder source = new SearchSourceBuilder();
Expand All @@ -82,13 +84,16 @@ public void remove(ActionListener<Boolean> listener) {
.filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
source.size(MAX_FORECASTS);

// _doc is the most efficient sort order and will also disable scoring
source.sort(ElasticsearchMappings.ES_DOC);

SearchRequest searchRequest = new SearchRequest(RESULTS_INDEX_PATTERN);
searchRequest.source(source);
client.execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false));
}

private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener) {
private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
List<ForecastRequestStats> forecastsToDelete;
try {
forecastsToDelete = findForecastsToDelete(searchResponse);
Expand All @@ -97,6 +102,11 @@ private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boole
return;
}

if (isTimedOutSupplier.get()) {
listener.onResponse(false);
return;
}

DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete);
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override
Expand Down Expand Up @@ -155,6 +165,10 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
request.setQuery(query);

// _doc is the most efficient sort order and will also disable scoring
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
Expand Down Expand Up @@ -87,7 +88,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);

searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE));
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));

getClient().execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
Expand Down Expand Up @@ -87,13 +88,22 @@ private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(5);

// Delete the documents gradually.
// With batch size 1000 and 200 requests per second this implies we spread
// deletion of 1 million documents over 5000 seconds ~= 83 minutes.
request.setBatchSize(1000);
request.setRequestsPerSecond(200);

request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.mustNot(excludeFilter);
request.setQuery(query);

// _doc is the most efficient sort order and will also disable scoring
request.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.elasticsearch.action.ActionListener;

import java.util.function.Supplier;

public interface MlDataRemover {
void remove(ActionListener<Boolean> listener);
void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* If for any reason a job is deleted by some of its state documents
Expand All @@ -52,13 +53,17 @@ public UnusedStateRemover(Client client, ClusterService clusterService) {
}

@Override
public void remove(ActionListener<Boolean> listener) {
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
try {
List<String> unusedStateDocIds = findUnusedStateDocIds();
if (unusedStateDocIds.size() > 0) {
executeDeleteUnusedStateDocs(unusedStateDocIds, listener);
if (isTimedOutSupplier.get()) {
listener.onResponse(false);
} else {
listener.onResponse(true);
if (unusedStateDocIds.size() > 0) {
executeDeleteUnusedStateDocs(unusedStateDocIds, listener);
} else {
listener.onResponse(true);
}
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -108,6 +113,10 @@ private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, ActionListe
.types(ElasticsearchMappings.DOC_TYPE)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0])));

// _doc is the most efficient sort order and will also disable scoring
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);

client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
response -> {
if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) {
Expand Down
Loading

0 comments on commit b576a6a

Please sign in to comment.