[ML] Calculate results and snapshot retention using latest bucket timestamps (#51061) (#51301)

The retention period is calculated relative to the time of the latest bucket result or model snapshot, rather than the wall clock.
davidkyle authored Jan 22, 2020
1 parent 59687a9 commit ca4b90a
Showing 9 changed files with 379 additions and 183 deletions.
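Before the file-by-file diff, a minimal sketch of the behavioural change. TimeValue is real Elasticsearch API; the method names and the latestTimestampMs input are hypothetical illustrations, not code from this commit:

import org.elasticsearch.common.unit.TimeValue;

final class RetentionCutoffSketch {

    // Old behaviour: the cutoff was anchored to the wall clock, so a job that
    // stopped receiving data kept losing results as real time moved on.
    static long wallClockCutoffMs(long retentionDays) {
        return System.currentTimeMillis() - TimeValue.timeValueDays(retentionDays).getMillis();
    }

    // New behaviour: the cutoff is anchored to the newest bucket result or
    // model snapshot, so retention is measured against the data itself.
    static long latestTimestampCutoffMs(long latestTimestampMs, long retentionDays) {
        return latestTimestampMs - TimeValue.timeValueDays(retentionDays).getMillis();
    }
}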
17 changes: 10 additions & 7 deletions docs/reference/ml/ml-shared.asciidoc
@@ -861,9 +861,10 @@ Only the specified `terms` can be viewed when using the Single Metric Viewer.
end::model-plot-config-terms[]

tag::model-snapshot-retention-days[]
-The time in days that model snapshots are retained for the job. Older snapshots
-are deleted. The default value is `1`, which means snapshots are retained for
-one day (twenty-four hours).
+Advanced configuration option. The period of time (in days) that model snapshots are retained.
+Age is calculated relative to the timestamp of the newest model snapshot.
+The default value is `1`, which means snapshots that are one day (twenty-four hours)
+older than the newest snapshot are deleted.
end::model-snapshot-retention-days[]

tag::multivariate-by-fields[]
@@ -961,10 +962,12 @@ is `shared`, which generates an index named `.ml-anomalies-shared`.
end::results-index-name[]

tag::results-retention-days[]
-Advanced configuration option. The number of days for which job results are
-retained. Once per day at 00:30 (server time), results older than this period
-are deleted from {es}. The default value is null, which means results are
-retained.
+Advanced configuration option. The period of time (in days) that results are retained.
+Age is calculated relative to the timestamp of the latest bucket result.
+If this property has a non-null value, once per day at 00:30 (server time),
+results that are the specified number of days older than the latest
+bucket result are deleted from {es}. The default value is null, which means all
+results are retained.
end::results-retention-days[]

tag::retain[]
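For orientation before the test changes, a short sketch of how these two retention options are set programmatically on a job. Job.Builder and both setters exist in the x-pack ML codebase as far as I can tell, but treat the exact names as an assumption:

import org.elasticsearch.xpack.core.ml.job.config.Job;

final class RetentionConfigSketch {
    static Job.Builder withRetention(String jobId) {
        Job.Builder job = new Job.Builder(jobId);
        // Snapshots more than 2 days older than the newest snapshot are deleted.
        job.setModelSnapshotRetentionDays(2L);
        // Results more than 30 days older than the latest bucket result are deleted.
        job.setResultsRetentionDays(30L);
        return job;
    }
}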
DeleteExpiredDataIT.java
@@ -33,10 +33,8 @@
import org.junit.After;
import org.junit.Before;

-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -53,20 +51,20 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
private static final String DATA_INDEX = "delete-expired-data-test-data";

@Before
-public void setUpData() throws IOException {
+public void setUpData() {
client().admin().indices().prepareCreate(DATA_INDEX)
.addMapping(SINGLE_MAPPING_NAME, "time", "type=date,format=epoch_millis")
.get();

-// We are going to create data for last 2 days
-long nowMillis = System.currentTimeMillis();
+// We are going to create 3 days of data ending 1 hr ago
+long latestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis();
int totalBuckets = 3 * 24;
int normalRate = 10;
int anomalousRate = 100;
int anomalousBucket = 30;
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int bucket = 0; bucket < totalBuckets; bucket++) {
-long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
+long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
for (int point = 0; point < bucketRate; point++) {
IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
@@ -122,7 +120,7 @@ public void testDeleteExpiredData() throws Exception {

String datafeedId = job.getId() + "-feed";
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId());
-datafeedConfig.setIndices(Arrays.asList(DATA_INDEX));
+datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX));
DatafeedConfig datafeed = datafeedConfig.build();
registerDatafeed(datafeed);
putDatafeed(datafeed);
@@ -210,7 +208,7 @@ public void testDeleteExpiredData() throws Exception {
assertThat(getModelSnapshots("no-retention").size(), equalTo(2));

List<Bucket> buckets = getBuckets("results-retention");
-assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
+assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-retention").size(), equalTo(0));
@@ -225,7 +223,7 @@ public void testDeleteExpiredData() throws Exception {
assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2));

buckets = getBuckets("results-and-snapshots-retention");
-assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
+assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0));
@@ -278,7 +276,7 @@ public void testDeleteExpiredData() throws Exception {
private static Job.Builder newJobBuilder(String id) {
Detector.Builder detector = new Detector.Builder();
detector.setFunction("count");
-AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
+AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
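A note on the loosened assertions above: with hourly buckets and retention now anchored to the latest bucket rather than the wall clock, the retained window [latest - 24h, latest] spans up to 25 bucket timestamps inclusive, hence lessThanOrEqualTo(25) where 24 used to suffice.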
TransportDeleteExpiredDataAction.java
@@ -81,7 +81,7 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
-new ExpiredResultsRemover(client, auditor),
+new ExpiredResultsRemover(client, auditor, threadPool),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool),
new UnusedStateRemover(client, clusterService)
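ExpiredResultsRemover presumably gains the threadPool argument for the same reason the snapshots remover needs it later in this diff: its cutoff is now derived from a search for the latest bucket timestamp, and the search response must be handed off to the ML utility thread pool rather than processed on the network thread.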
AbstractExpiredJobDataRemover.java
@@ -7,7 +7,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.OriginSettingClient;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -16,12 +15,9 @@
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

-import java.time.Clock;
-import java.time.Instant;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@@ -68,30 +64,37 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<B
removeData(jobIterator, listener, isTimedOutSupplier);
return;
}
-long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
-removeDataBefore(job, cutoffEpochMs,
-    ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));
+calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap(
+    cutoffEpochMs -> {
+        if (cutoffEpochMs == null) {
+            removeData(jobIterator, listener, isTimedOutSupplier);
+        } else {
+            removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(
+                response -> removeData(jobIterator, listener, isTimedOutSupplier),
+                listener::onFailure));
+        }
+    },
+    listener::onFailure
+));
}

private WrappedBatchedJobsIterator newJobIterator() {
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
return new WrappedBatchedJobsIterator(jobsIterator);
}

-private long calcCutoffEpochMs(long retentionDays) {
-    long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
-    return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
-}
+abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener);

-protected abstract Long getRetentionDays(Job job);
+abstract Long getRetentionDays(Job job);

/**
* Template method to allow implementation details of various types of data (e.g. results, model snapshots).
* Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job.
*/
-protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
+abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);

-protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
+static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
return QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"));
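The wall-clock calculation above is replaced by an asynchronous, per-job contract: each remover reports its cutoff through a listener, and a null cutoff means the job has no data yet and is skipped. A standalone sketch of that contract, with every name hypothetical and error handling simplified:

import org.elasticsearch.action.ActionListener;
import java.util.function.LongConsumer;

final class CutoffContractSketch {

    interface CutoffSource {
        // Responds with null when the job has no data to anchor a cutoff to.
        void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener);
    }

    static void removeForJob(CutoffSource source, String jobId, long retentionDays,
                             Runnable nextJob, LongConsumer deleteBefore) {
        source.calcCutoffEpochMs(jobId, retentionDays, ActionListener.wrap(
            cutoff -> {
                if (cutoff == null) {
                    nextJob.run();               // nothing to delete, move on
                } else {
                    deleteBefore.accept(cutoff); // delete data older than the cutoff
                }
            },
            e -> nextJob.run()                   // real code propagates the failure instead
        ));
    }
}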
ExpiredModelSnapshotsRemover.java
@@ -15,10 +15,14 @@
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -27,12 +31,14 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;

/**
* Deletes all model snapshots that have expired the configured retention time
@@ -65,10 +71,59 @@ public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threa
}

@Override
-protected Long getRetentionDays(Job job) {
+Long getRetentionDays(Job job) {
return job.getModelSnapshotRetentionDays();
}

+@Override
+void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
+    ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
+        MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
+
+    latestSnapshotTimeStamp(jobId, ActionListener.wrap(
+        latestTime -> {
+            if (latestTime == null) {
+                threadedActionListener.onResponse(null);
+            } else {
+                long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
+                threadedActionListener.onResponse(cutoff);
+            }
+        },
+        listener::onFailure
+    ));
+}
+
+private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
+    SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
+    QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
+        .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));
+
+    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+    searchSourceBuilder.sort(sortBuilder);
+    searchSourceBuilder.query(snapshotQuery);
+    searchSourceBuilder.size(1);
+    searchSourceBuilder.trackTotalHits(false);
+
+    String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
+    SearchRequest searchRequest = new SearchRequest(indexName);
+    searchRequest.source(searchSourceBuilder);
+    searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
+
+    client.search(searchRequest, ActionListener.wrap(
+        response -> {
+            SearchHit[] hits = response.getHits().getHits();
+            if (hits.length == 0) {
+                // no snapshots found
+                listener.onResponse(null);
+            } else {
+                ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
+                listener.onResponse(snapshot.getTimestamp().getTime());
+            }
+        },
+        listener::onFailure)
+    );
+}
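The snapshot lookup above is deliberately cheap: a single hit, sorted by timestamp descending, with trackTotalHits(false) to skip hit counting, while the existsQuery on snapshot_doc_count restricts the search to genuine model snapshot documents in the job's results index.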

@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
if (job.getModelSnapshotId() == null) {
The diffs for the remaining four changed files did not load in this view.
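One of the unloaded files is ExpiredResultsRemover, whose constructor gained the threadPool argument earlier in this diff. A hedged sketch of what its cutoff lookup plausibly looks like, mirroring latestSnapshotTimeStamp above and assuming the same imports plus the x-pack Bucket and Result classes; none of this is the commit's actual code:

// Find the timestamp of the latest bucket result for the job (sketch).
private void latestBucketTime(String jobId, ActionListener<Long> listener) {
    SortBuilder<?> sortBuilder = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
    QueryBuilder bucketType = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE);

    SearchSourceBuilder source = new SearchSourceBuilder()
        .sort(sortBuilder)
        .query(bucketType)
        .size(1)
        .trackTotalHits(false);

    SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
    searchRequest.source(source);
    searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));

    client.search(searchRequest, ActionListener.wrap(
        response -> {
            SearchHit[] hits = response.getHits().getHits();
            if (hits.length == 0) {
                listener.onResponse(null); // no results yet for this job
            } else {
                // ML result documents store "timestamp" as epoch millis.
                Object millis = hits[0].getSourceAsMap().get(Result.TIMESTAMP.getPreferredName());
                listener.onResponse(((Number) millis).longValue());
            }
        },
        listener::onFailure));
}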
